add RET, Q, RSE lint
This commit is contained in:
@@ -104,25 +104,24 @@ class GObjApi:
|
||||
for m_name, m_signature in fn_signatures.items():
|
||||
if m_name not in overwrite_fns:
|
||||
continue
|
||||
else:
|
||||
# check if the signature of the overriden method matches
|
||||
# the implementation signature
|
||||
exp_args = []
|
||||
exp_return = m_signature.return_annotation
|
||||
for param in dict(m_signature.parameters).values():
|
||||
exp_args.append(param.annotation)
|
||||
exp_signature = (tuple(exp_args), exp_return)
|
||||
# check if the signature of the overriden method matches
|
||||
# the implementation signature
|
||||
exp_args = []
|
||||
exp_return = m_signature.return_annotation
|
||||
for param in dict(m_signature.parameters).values():
|
||||
exp_args.append(param.annotation)
|
||||
exp_signature = (tuple(exp_args), exp_return)
|
||||
|
||||
# implementation signature
|
||||
obj = dict(overwrite_fns[m_name].__dict__)
|
||||
obj_type = obj["__orig_bases__"][0]
|
||||
got_signature = obj_type.__args__
|
||||
# implementation signature
|
||||
obj = dict(overwrite_fns[m_name].__dict__)
|
||||
obj_type = obj["__orig_bases__"][0]
|
||||
got_signature = obj_type.__args__
|
||||
|
||||
if exp_signature != got_signature:
|
||||
log.error(f"Expected signature: {exp_signature}")
|
||||
log.error(f"Actual signature: {got_signature}")
|
||||
msg = f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
raise ValueError(msg)
|
||||
if exp_signature != got_signature:
|
||||
log.error(f"Expected signature: {exp_signature}")
|
||||
log.error(f"Actual signature: {got_signature}")
|
||||
msg = f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
raise ValueError(msg)
|
||||
|
||||
def get_obj(self, fn_name: str) -> type[ImplFunc]:
|
||||
result = self._obj_registry.get(fn_name, None)
|
||||
@@ -146,11 +145,10 @@ class GObjApi:
|
||||
self.thread = MethodExecutor(plain_fn, *args, **kwargs)
|
||||
self.thread.start()
|
||||
return GLib.SOURCE_CONTINUE
|
||||
elif self.thread.finished:
|
||||
if self.thread.finished:
|
||||
result = self.thread.result
|
||||
self.returns(method_name=fn_name, result=result)
|
||||
return GLib.SOURCE_REMOVE
|
||||
else:
|
||||
return GLib.SOURCE_CONTINUE
|
||||
return GLib.SOURCE_CONTINUE
|
||||
|
||||
return cast(type[ImplFunc], GenericFnRuntime)
|
||||
|
||||
@@ -67,9 +67,7 @@ class WebExecutor(GObject.Object):
|
||||
if self.content_uri.startswith("http://") and uri.startswith(self.content_uri):
|
||||
log.debug(f"Allow navigation request: {uri}")
|
||||
return False
|
||||
elif self.content_uri.startswith("file://") and uri.startswith(
|
||||
self.content_uri
|
||||
):
|
||||
if self.content_uri.startswith("file://") and uri.startswith(self.content_uri):
|
||||
log.debug(f"Allow navigation request: {uri}")
|
||||
return False
|
||||
log.warning(
|
||||
|
||||
@@ -56,7 +56,8 @@ def get_directory(current_path: str) -> Directory:
|
||||
directory = Directory(path=str(curr_dir))
|
||||
|
||||
if not curr_dir.is_dir():
|
||||
raise ClanError()
|
||||
msg = f"Path {curr_dir} is not a directory"
|
||||
raise ClanError(msg)
|
||||
|
||||
with os.scandir(curr_dir.resolve()) as it:
|
||||
for entry in it:
|
||||
|
||||
@@ -71,16 +71,15 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
|
||||
if not field.name.startswith("_")
|
||||
and getattr(obj, field.name) is not None # type: ignore
|
||||
}
|
||||
elif isinstance(obj, list | tuple):
|
||||
if isinstance(obj, list | tuple):
|
||||
return [_to_dict(item) for item in obj]
|
||||
elif isinstance(obj, dict):
|
||||
if isinstance(obj, dict):
|
||||
return {sanitize_string(k): _to_dict(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, Path):
|
||||
if isinstance(obj, Path):
|
||||
return sanitize_string(str(obj))
|
||||
elif isinstance(obj, str):
|
||||
if isinstance(obj, str):
|
||||
return sanitize_string(obj)
|
||||
else:
|
||||
return obj
|
||||
return obj
|
||||
|
||||
return _to_dict(obj)
|
||||
|
||||
@@ -144,7 +143,7 @@ def construct_value(
|
||||
|
||||
# If the field expects a path
|
||||
# Field_value must be a string
|
||||
elif is_type_in_union(t, Path):
|
||||
if is_type_in_union(t, Path):
|
||||
if not isinstance(field_value, str):
|
||||
msg = (
|
||||
f"Expected string, cannot construct pathlib.Path() from: {field_value} "
|
||||
@@ -157,22 +156,22 @@ def construct_value(
|
||||
return Path(field_value)
|
||||
|
||||
# Trivial values
|
||||
elif t is str:
|
||||
if t is str:
|
||||
if not isinstance(field_value, str):
|
||||
msg = f"Expected string, got {field_value}"
|
||||
raise ClanError(msg, location=f"{loc}")
|
||||
|
||||
return field_value
|
||||
|
||||
elif t is int and not isinstance(field_value, str):
|
||||
if t is int and not isinstance(field_value, str):
|
||||
return int(field_value) # type: ignore
|
||||
elif t is float and not isinstance(field_value, str):
|
||||
if t is float and not isinstance(field_value, str):
|
||||
return float(field_value) # type: ignore
|
||||
elif t is bool and isinstance(field_value, bool):
|
||||
if t is bool and isinstance(field_value, bool):
|
||||
return field_value # type: ignore
|
||||
|
||||
# Union types construct the first non-None type
|
||||
elif is_union_type(t):
|
||||
if is_union_type(t):
|
||||
# Unwrap the union type
|
||||
inner = unwrap_none_type(t)
|
||||
# Construct the field value
|
||||
@@ -181,34 +180,33 @@ def construct_value(
|
||||
# Nested types
|
||||
# list
|
||||
# dict
|
||||
elif get_origin(t) is list:
|
||||
if get_origin(t) is list:
|
||||
if not isinstance(field_value, list):
|
||||
msg = f"Expected list, got {field_value}"
|
||||
raise ClanError(msg, location=f"{loc}")
|
||||
|
||||
return [construct_value(get_args(t)[0], item) for item in field_value]
|
||||
elif get_origin(t) is dict and isinstance(field_value, dict):
|
||||
if get_origin(t) is dict and isinstance(field_value, dict):
|
||||
return {
|
||||
key: construct_value(get_args(t)[1], value)
|
||||
for key, value in field_value.items()
|
||||
}
|
||||
elif get_origin(t) is Literal:
|
||||
if get_origin(t) is Literal:
|
||||
valid_values = get_args(t)
|
||||
if field_value not in valid_values:
|
||||
msg = f"Expected one of {valid_values}, got {field_value}"
|
||||
raise ClanError(msg, location=f"{loc}")
|
||||
return field_value
|
||||
|
||||
elif get_origin(t) is Annotated:
|
||||
if get_origin(t) is Annotated:
|
||||
(base_type,) = get_args(t)
|
||||
return construct_value(base_type, field_value)
|
||||
|
||||
# elif get_origin(t) is Union:
|
||||
|
||||
# Unhandled
|
||||
else:
|
||||
msg = f"Unhandled field type {t} with value {field_value}"
|
||||
raise ClanError(msg)
|
||||
msg = f"Unhandled field type {t} with value {field_value}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def construct_dataclass(
|
||||
@@ -274,5 +272,4 @@ def from_dict(
|
||||
msg = f"{data} is not a dict. Expected {t}"
|
||||
raise ClanError(msg)
|
||||
return construct_dataclass(t, data, path) # type: ignore
|
||||
else:
|
||||
return construct_value(t, data, path)
|
||||
return construct_value(t, data, path)
|
||||
|
||||
@@ -112,7 +112,7 @@ def type_to_dict(
|
||||
"additionalProperties": False,
|
||||
}
|
||||
|
||||
elif type(t) is UnionType:
|
||||
if type(t) is UnionType:
|
||||
return {
|
||||
"oneOf": [type_to_dict(arg, scope, type_map) for arg in t.__args__],
|
||||
}
|
||||
@@ -126,7 +126,7 @@ def type_to_dict(
|
||||
raise JSchemaTypeError(msg)
|
||||
return type_to_dict(type_map.get(t), scope, type_map)
|
||||
|
||||
elif hasattr(t, "__origin__"): # Check if it's a generic type
|
||||
if hasattr(t, "__origin__"): # Check if it's a generic type
|
||||
origin = get_origin(t)
|
||||
args = get_args(t)
|
||||
|
||||
@@ -136,41 +136,40 @@ def type_to_dict(
|
||||
msg = f"{scope} Unhandled Type: "
|
||||
raise JSchemaTypeError(msg, origin)
|
||||
|
||||
elif origin is Literal:
|
||||
if origin is Literal:
|
||||
# Handle Literal values for enums in JSON Schema
|
||||
return {
|
||||
"type": "string",
|
||||
"enum": list(args), # assumes all args are strings
|
||||
}
|
||||
|
||||
elif origin is Annotated:
|
||||
if origin is Annotated:
|
||||
base_type, *metadata = get_args(t)
|
||||
schema = type_to_dict(base_type, scope) # Generate schema for the base type
|
||||
return apply_annotations(schema, metadata)
|
||||
|
||||
elif origin is Union:
|
||||
if origin is Union:
|
||||
union_types = [type_to_dict(arg, scope, type_map) for arg in t.__args__]
|
||||
return {
|
||||
"oneOf": union_types,
|
||||
}
|
||||
|
||||
elif origin in {list, set, frozenset}:
|
||||
if origin in {list, set, frozenset}:
|
||||
return {
|
||||
"type": "array",
|
||||
"items": type_to_dict(t.__args__[0], scope, type_map),
|
||||
}
|
||||
|
||||
elif issubclass(origin, dict):
|
||||
if issubclass(origin, dict):
|
||||
value_type = t.__args__[1]
|
||||
if value_type is Any:
|
||||
return {"type": "object", "additionalProperties": True}
|
||||
else:
|
||||
return {
|
||||
"type": "object",
|
||||
"additionalProperties": type_to_dict(value_type, scope, type_map),
|
||||
}
|
||||
return {
|
||||
"type": "object",
|
||||
"additionalProperties": type_to_dict(value_type, scope, type_map),
|
||||
}
|
||||
# Generic dataclass with type parameters
|
||||
elif dataclasses.is_dataclass(origin):
|
||||
if dataclasses.is_dataclass(origin):
|
||||
# This behavior should mimic the scoping of typeVars in dataclasses
|
||||
# Once type_to_dict() encounters a TypeVar, it will look up the type in the type_map
|
||||
# When type_to_dict() returns the map goes out of scope.
|
||||
@@ -182,7 +181,7 @@ def type_to_dict(
|
||||
msg = f"{scope} - Error api type not yet supported {t!s}"
|
||||
raise JSchemaTypeError(msg)
|
||||
|
||||
elif isinstance(t, type):
|
||||
if isinstance(t, type):
|
||||
if t is str:
|
||||
return {"type": "string"}
|
||||
if t is int:
|
||||
@@ -211,6 +210,5 @@ def type_to_dict(
|
||||
|
||||
msg = f"{scope} - Error primitive type not supported {t!s}"
|
||||
raise JSchemaTypeError(msg)
|
||||
else:
|
||||
msg = f"{scope} - Error type not supported {t!s}"
|
||||
raise JSchemaTypeError(msg)
|
||||
msg = f"{scope} - Error type not supported {t!s}"
|
||||
raise JSchemaTypeError(msg)
|
||||
|
||||
@@ -24,8 +24,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
|
||||
if proc.returncode != 0:
|
||||
msg = "failed to start backup"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
print("successfully started backup")
|
||||
print("successfully started backup")
|
||||
else:
|
||||
if provider not in backup_scripts["providers"]:
|
||||
msg = f"provider {provider} not found"
|
||||
@@ -36,8 +35,7 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
|
||||
if proc.returncode != 0:
|
||||
msg = "failed to start backup"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
print("successfully started backup")
|
||||
print("successfully started backup")
|
||||
|
||||
|
||||
def create_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -30,12 +30,9 @@ def list_provider(machine: Machine, provider: str) -> list[Backup]:
|
||||
# TODO this should be a warning, only raise exception if no providers succeed
|
||||
msg = f"failed to list backups for provider {provider}: {proc.stdout}"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
parsed_json = json.loads(proc.stdout)
|
||||
for archive in parsed_json:
|
||||
results.append(
|
||||
Backup(name=archive["name"], job_name=archive.get("job_name"))
|
||||
)
|
||||
parsed_json = json.loads(proc.stdout)
|
||||
for archive in parsed_json:
|
||||
results.append(Backup(name=archive["name"], job_name=archive.get("job_name")))
|
||||
return results
|
||||
|
||||
|
||||
|
||||
@@ -53,8 +53,7 @@ def show_clan_meta(uri: str | Path) -> Meta:
|
||||
description="Icon path must be a URL or a relative path.",
|
||||
)
|
||||
|
||||
else:
|
||||
icon_path = str((Path(uri) / meta_icon).resolve())
|
||||
icon_path = str((Path(uri) / meta_icon).resolve())
|
||||
else:
|
||||
msg = "Invalid schema"
|
||||
raise ClanError(
|
||||
|
||||
@@ -171,6 +171,5 @@ def run_no_stdout(
|
||||
cwd = Path.cwd()
|
||||
if logging.getLogger(__name__.split(".")[0]).isEnabledFor(logging.DEBUG):
|
||||
return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
|
||||
else:
|
||||
log = Log.NONE
|
||||
return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
|
||||
log = Log.NONE
|
||||
return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
|
||||
|
||||
@@ -22,26 +22,25 @@ log = logging.getLogger(__name__)
|
||||
def map_type(nix_type: str) -> Any:
|
||||
if nix_type == "boolean":
|
||||
return bool
|
||||
elif nix_type in [
|
||||
if nix_type in [
|
||||
"integer",
|
||||
"signed integer",
|
||||
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
||||
]:
|
||||
return int
|
||||
elif nix_type.startswith("string"):
|
||||
if nix_type.startswith("string"):
|
||||
return str
|
||||
elif nix_type.startswith("null or "):
|
||||
if nix_type.startswith("null or "):
|
||||
subtype = nix_type.removeprefix("null or ")
|
||||
return map_type(subtype) | None
|
||||
elif nix_type.startswith("attribute set of"):
|
||||
if nix_type.startswith("attribute set of"):
|
||||
subtype = nix_type.removeprefix("attribute set of ")
|
||||
return dict[str, map_type(subtype)] # type: ignore
|
||||
elif nix_type.startswith("list of"):
|
||||
if nix_type.startswith("list of"):
|
||||
subtype = nix_type.removeprefix("list of ")
|
||||
return list[map_type(subtype)] # type: ignore
|
||||
else:
|
||||
msg = f"Unknown type {nix_type}"
|
||||
raise ClanError(msg)
|
||||
msg = f"Unknown type {nix_type}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
# merge two dicts recursively
|
||||
@@ -77,31 +76,29 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
if isinstance(input_type, bool):
|
||||
if value[0] in ["true", "True", "yes", "y", "1"]:
|
||||
return True
|
||||
elif value[0] in ["false", "False", "no", "n", "0"]:
|
||||
if value[0] in ["false", "False", "no", "n", "0"]:
|
||||
return False
|
||||
else:
|
||||
msg = f"Invalid value {value} for boolean"
|
||||
raise ClanError(msg)
|
||||
msg = f"Invalid value {value} for boolean"
|
||||
raise ClanError(msg)
|
||||
# handle lists
|
||||
elif get_origin(input_type) is list:
|
||||
if get_origin(input_type) is list:
|
||||
subtype = input_type.__args__[0]
|
||||
return [cast([x], subtype, opt_description) for x in value]
|
||||
# handle dicts
|
||||
elif get_origin(input_type) is dict:
|
||||
if get_origin(input_type) is dict:
|
||||
if not isinstance(value, dict):
|
||||
msg = f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||
raise ClanError(msg)
|
||||
subtype = input_type.__args__[1]
|
||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||
elif str(input_type) == "str | None":
|
||||
if str(input_type) == "str | None":
|
||||
if value[0] in ["null", "None"]:
|
||||
return None
|
||||
return value[0]
|
||||
else:
|
||||
if len(value) > 1:
|
||||
msg = f"Too many values for {opt_description}"
|
||||
raise ClanError(msg)
|
||||
return input_type(value[0])
|
||||
if len(value) > 1:
|
||||
msg = f"Too many values for {opt_description}"
|
||||
raise ClanError(msg)
|
||||
return input_type(value[0])
|
||||
except ValueError as e:
|
||||
msg = f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
@@ -41,20 +41,18 @@ def subtype_from_schema(schema: dict[str, Any]) -> type:
|
||||
if "additionalProperties" in schema:
|
||||
sub_type = subtype_from_schema(schema["additionalProperties"])
|
||||
return dict[str, sub_type] # type: ignore
|
||||
elif "properties" in schema:
|
||||
if "properties" in schema:
|
||||
msg = "Nested dicts are not supported"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
msg = "Unknown object type"
|
||||
raise ClanError(msg)
|
||||
elif schema["type"] == "array":
|
||||
msg = "Unknown object type"
|
||||
raise ClanError(msg)
|
||||
if schema["type"] == "array":
|
||||
if "items" not in schema:
|
||||
msg = "Untyped arrays are not supported"
|
||||
raise ClanError(msg)
|
||||
sub_type = subtype_from_schema(schema["items"])
|
||||
return list[sub_type] # type: ignore
|
||||
else:
|
||||
return type_map[schema["type"]]
|
||||
return type_map[schema["type"]]
|
||||
|
||||
|
||||
def type_from_schema_path(
|
||||
@@ -66,19 +64,17 @@ def type_from_schema_path(
|
||||
full_path = path
|
||||
if len(path) == 0:
|
||||
return subtype_from_schema(schema)
|
||||
elif schema["type"] == "object":
|
||||
if schema["type"] == "object":
|
||||
if "properties" in schema:
|
||||
subtype = type_from_schema_path(schema["properties"][path[0]], path[1:])
|
||||
return subtype
|
||||
elif "additionalProperties" in schema:
|
||||
if "additionalProperties" in schema:
|
||||
subtype = type_from_schema_path(schema["additionalProperties"], path[1:])
|
||||
return subtype
|
||||
else:
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
@@ -100,7 +96,7 @@ def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
for sub_name, sub_type in sub_result.items():
|
||||
result[f"{name}.{sub_name}"] = sub_type
|
||||
continue
|
||||
elif type_ == "array":
|
||||
if type_ == "array":
|
||||
if "items" not in value:
|
||||
msg = f"Untyped arrays are not supported (field: {name})"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -47,21 +47,19 @@ def clan_templates() -> Path:
|
||||
template_path = module_root().parent.parent.parent / "templates"
|
||||
if template_path.exists():
|
||||
return template_path
|
||||
else:
|
||||
template_path = module_root() / "templates"
|
||||
if not template_path.exists():
|
||||
msg = f"BUG! clan core not found at {template_path}. This is an issue with packaging the cli"
|
||||
raise ClanError(msg)
|
||||
return template_path
|
||||
template_path = module_root() / "templates"
|
||||
if not template_path.exists():
|
||||
msg = f"BUG! clan core not found at {template_path}. This is an issue with packaging the cli"
|
||||
raise ClanError(msg)
|
||||
return template_path
|
||||
|
||||
|
||||
def user_config_dir() -> Path:
|
||||
if sys.platform == "win32":
|
||||
return Path(os.getenv("APPDATA", os.path.expanduser("~\\AppData\\Roaming\\")))
|
||||
elif sys.platform == "darwin":
|
||||
if sys.platform == "darwin":
|
||||
return Path(os.path.expanduser("~/Library/Application Support/"))
|
||||
else:
|
||||
return Path(os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config")))
|
||||
return Path(os.getenv("XDG_CONFIG_HOME", os.path.expanduser("~/.config")))
|
||||
|
||||
|
||||
def user_data_dir() -> Path:
|
||||
@@ -69,10 +67,9 @@ def user_data_dir() -> Path:
|
||||
return Path(
|
||||
os.getenv("LOCALAPPDATA", os.path.expanduser("~\\AppData\\Local\\"))
|
||||
)
|
||||
elif sys.platform == "darwin":
|
||||
if sys.platform == "darwin":
|
||||
return Path(os.path.expanduser("~/Library/Application Support/"))
|
||||
else:
|
||||
return Path(os.getenv("XDG_DATA_HOME", os.path.expanduser("~/.local/share")))
|
||||
return Path(os.getenv("XDG_DATA_HOME", os.path.expanduser("~/.local/share")))
|
||||
|
||||
|
||||
def user_cache_dir() -> Path:
|
||||
@@ -80,10 +77,9 @@ def user_cache_dir() -> Path:
|
||||
return Path(
|
||||
os.getenv("LOCALAPPDATA", os.path.expanduser("~\\AppData\\Local\\"))
|
||||
)
|
||||
elif sys.platform == "darwin":
|
||||
if sys.platform == "darwin":
|
||||
return Path(os.path.expanduser("~/Library/Caches/"))
|
||||
else:
|
||||
return Path(os.getenv("XDG_CACHE_HOME", os.path.expanduser("~/.cache")))
|
||||
return Path(os.getenv("XDG_CACHE_HOME", os.path.expanduser("~/.cache")))
|
||||
|
||||
|
||||
def user_gcroot_dir() -> Path:
|
||||
|
||||
@@ -24,9 +24,8 @@ class FactStore(FactStoreBase):
|
||||
fact_path.touch()
|
||||
fact_path.write_bytes(value)
|
||||
return fact_path
|
||||
else:
|
||||
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
|
||||
raise ClanError(msg)
|
||||
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
|
||||
raise ClanError(msg)
|
||||
|
||||
def exists(self, service: str, name: str) -> bool:
|
||||
fact_path = (
|
||||
|
||||
@@ -30,8 +30,7 @@ def help_hyperlink(description: str, url: str) -> str:
|
||||
if sys.argv[0].__contains__("docs.py"):
|
||||
return docs_hyperlink(description, url)
|
||||
|
||||
else:
|
||||
return hyperlink_same_text_and_url(url)
|
||||
return hyperlink_same_text_and_url(url)
|
||||
|
||||
|
||||
def docs_hyperlink(description: str, url: str) -> str:
|
||||
|
||||
@@ -9,7 +9,7 @@ class ClanJSONEncoder(json.JSONEncoder):
|
||||
if hasattr(o, "to_json") and callable(o.to_json):
|
||||
return o.to_json()
|
||||
# Check if the object is a dataclass
|
||||
elif dataclasses.is_dataclass(o):
|
||||
if dataclasses.is_dataclass(o):
|
||||
return dataclasses.asdict(o)
|
||||
# Otherwise, use the default serialization
|
||||
return super().default(o)
|
||||
|
||||
@@ -145,7 +145,7 @@ def install_command(args: argparse.Namespace) -> None:
|
||||
if not args.yes:
|
||||
ask = input(f"Install {args.machine} to {target_host}? [y/N] ")
|
||||
if ask != "y":
|
||||
return
|
||||
return None
|
||||
|
||||
return install_machine(
|
||||
InstallOptions(
|
||||
|
||||
@@ -110,11 +110,10 @@ class Machine:
|
||||
def flake_dir(self) -> Path:
|
||||
if self.flake.is_local():
|
||||
return self.flake.path
|
||||
elif self.flake.is_remote():
|
||||
if self.flake.is_remote():
|
||||
return Path(nix_metadata(self.flake.url)["path"])
|
||||
else:
|
||||
msg = f"Unsupported flake url: {self.flake}"
|
||||
raise ClanError(msg)
|
||||
msg = f"Unsupported flake url: {self.flake}"
|
||||
raise ClanError(msg)
|
||||
|
||||
@property
|
||||
def target_host(self) -> Host:
|
||||
@@ -218,12 +217,11 @@ class Machine:
|
||||
if method == "eval":
|
||||
output = run_no_stdout(nix_eval(args)).stdout.strip()
|
||||
return output
|
||||
elif method == "build":
|
||||
if method == "build":
|
||||
outpath = run_no_stdout(nix_build(args)).stdout.strip()
|
||||
return Path(outpath)
|
||||
else:
|
||||
msg = f"Unknown method {method}"
|
||||
raise ValueError(msg)
|
||||
msg = f"Unknown method {method}"
|
||||
raise ValueError(msg)
|
||||
|
||||
def eval_nix(
|
||||
self,
|
||||
@@ -246,9 +244,8 @@ class Machine:
|
||||
if isinstance(output, str):
|
||||
self._eval_cache[attr] = output
|
||||
return output
|
||||
else:
|
||||
msg = "eval_nix returned not a string"
|
||||
raise ClanError(msg)
|
||||
msg = "eval_nix returned not a string"
|
||||
raise ClanError(msg)
|
||||
|
||||
def build_nix(
|
||||
self,
|
||||
@@ -272,6 +269,5 @@ class Machine:
|
||||
if isinstance(output, Path):
|
||||
self._build_cache[attr] = output
|
||||
return output
|
||||
else:
|
||||
msg = "build_nix returned not a Path"
|
||||
raise ClanError(msg)
|
||||
msg = "build_nix returned not a Path"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -40,19 +40,18 @@ def nix_build(flags: list[str], gcroot: Path | None = None) -> list[str]:
|
||||
)
|
||||
+ flags
|
||||
)
|
||||
else:
|
||||
return (
|
||||
nix_command(
|
||||
[
|
||||
"build",
|
||||
"--no-link",
|
||||
"--print-out-paths",
|
||||
"--no-write-lock-file",
|
||||
"--show-trace",
|
||||
]
|
||||
)
|
||||
+ flags
|
||||
return (
|
||||
nix_command(
|
||||
[
|
||||
"build",
|
||||
"--no-link",
|
||||
"--print-out-paths",
|
||||
"--no-write-lock-file",
|
||||
"--show-trace",
|
||||
]
|
||||
)
|
||||
+ flags
|
||||
)
|
||||
|
||||
|
||||
def nix_add_to_gcroots(nix_path: Path, dest: Path) -> None:
|
||||
|
||||
@@ -88,8 +88,7 @@ def trim_path_to_three_levels(path: str) -> str:
|
||||
parts = path.split(os.path.sep)
|
||||
if len(parts) > 4:
|
||||
return os.path.sep.join(parts[-4:])
|
||||
else:
|
||||
return path
|
||||
return path
|
||||
|
||||
|
||||
PROFS = ProfilerStore()
|
||||
@@ -116,5 +115,4 @@ def profile(func: Callable) -> Callable:
|
||||
|
||||
if os.getenv("PERF", "0") == "1":
|
||||
return wrapper
|
||||
else:
|
||||
return func
|
||||
return func
|
||||
|
||||
@@ -96,8 +96,7 @@ def default_sops_key_path() -> Path:
|
||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||
if raw_path:
|
||||
return Path(raw_path)
|
||||
else:
|
||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||
|
||||
|
||||
def ensure_sops_key(flake_dir: Path) -> SopsKey:
|
||||
@@ -107,9 +106,8 @@ def ensure_sops_key(flake_dir: Path) -> SopsKey:
|
||||
path = default_sops_key_path()
|
||||
if path.exists():
|
||||
return ensure_user_or_machine(flake_dir, get_public_key(path.read_text()))
|
||||
else:
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
||||
@@ -348,11 +348,10 @@ class Host:
|
||||
raise subprocess.CalledProcessError(
|
||||
ret, cmd=cmd, output=stdout_data, stderr=stderr_data
|
||||
)
|
||||
else:
|
||||
cmdlog.warning(
|
||||
f"[Command failed: {ret}] {displayed_cmd}",
|
||||
extra={"command_prefix": self.command_prefix},
|
||||
)
|
||||
cmdlog.warning(
|
||||
f"[Command failed: {ret}] {displayed_cmd}",
|
||||
extra={"command_prefix": self.command_prefix},
|
||||
)
|
||||
return subprocess.CompletedProcess(
|
||||
cmd, ret, stdout=stdout_data, stderr=stderr_data
|
||||
)
|
||||
|
||||
@@ -18,8 +18,7 @@ class FactStore(FactStoreBase):
|
||||
def _var_path(self, generator_name: str, name: str, shared: bool) -> Path:
|
||||
if shared:
|
||||
return self.shared_folder / generator_name / name
|
||||
else:
|
||||
return self.per_machine_folder / generator_name / name
|
||||
return self.per_machine_folder / generator_name / name
|
||||
|
||||
def set(
|
||||
self, generator_name: str, name: str, value: bytes, shared: bool = False
|
||||
@@ -30,9 +29,8 @@ class FactStore(FactStoreBase):
|
||||
fact_path.touch()
|
||||
fact_path.write_bytes(value)
|
||||
return fact_path
|
||||
else:
|
||||
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
|
||||
raise ClanError(msg)
|
||||
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
|
||||
raise ClanError(msg)
|
||||
|
||||
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
|
||||
return self._var_path(generator_name, name, shared).exists()
|
||||
|
||||
@@ -21,8 +21,7 @@ class SecretStore(SecretStoreBase):
|
||||
def _var_path(self, generator_name: str, name: str, shared: bool) -> Path:
|
||||
if shared:
|
||||
return Path(f"shared/{generator_name}/{name}")
|
||||
else:
|
||||
return Path(f"machines/{self.machine.name}/{generator_name}/{name}")
|
||||
return Path(f"machines/{self.machine.name}/{generator_name}/{name}")
|
||||
|
||||
def set(
|
||||
self,
|
||||
|
||||
@@ -43,39 +43,38 @@ def graphics_options(vm: VmConfig) -> GraphicOptions:
|
||||
#"-chardev", "socket,id=vgpu,path=/tmp/vgpu.sock",
|
||||
], cid)
|
||||
# fmt: on
|
||||
if not os.path.exists("/run/opengl-driver"):
|
||||
display_options = [
|
||||
"-vga",
|
||||
"none",
|
||||
"-display",
|
||||
"gtk,gl=on",
|
||||
"-device",
|
||||
"virtio-gpu-gl",
|
||||
"-display",
|
||||
"spice-app,gl=on",
|
||||
]
|
||||
else:
|
||||
if not os.path.exists("/run/opengl-driver"):
|
||||
display_options = [
|
||||
"-vga",
|
||||
"none",
|
||||
"-display",
|
||||
"gtk,gl=on",
|
||||
"-device",
|
||||
"virtio-gpu-gl",
|
||||
"-display",
|
||||
"spice-app,gl=on",
|
||||
]
|
||||
else:
|
||||
display_options = ["-display", "spice-app"]
|
||||
display_options = ["-display", "spice-app"]
|
||||
|
||||
# fmt: off
|
||||
return GraphicOptions([
|
||||
*common,
|
||||
*display_options,
|
||||
"-device", "virtio-serial-pci",
|
||||
"-chardev", "spicevmc,id=vdagent0,name=vdagent",
|
||||
"-device", "virtserialport,chardev=vdagent0,name=com.redhat.spice.0",
|
||||
"-device", "qemu-xhci,id=spicepass",
|
||||
"-chardev", "spicevmc,id=usbredirchardev1,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev1,id=usbredirdev1",
|
||||
"-chardev", "spicevmc,id=usbredirchardev2,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev2,id=usbredirdev2",
|
||||
"-chardev", "spicevmc,id=usbredirchardev3,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev3,id=usbredirdev3",
|
||||
"-device", "pci-ohci,id=smartpass",
|
||||
"-device", "usb-ccid",
|
||||
"-chardev", "spicevmc,id=ccid,name=smartcard",
|
||||
], None)
|
||||
# fmt: off
|
||||
return GraphicOptions([
|
||||
*common,
|
||||
*display_options,
|
||||
"-device", "virtio-serial-pci",
|
||||
"-chardev", "spicevmc,id=vdagent0,name=vdagent",
|
||||
"-device", "virtserialport,chardev=vdagent0,name=com.redhat.spice.0",
|
||||
"-device", "qemu-xhci,id=spicepass",
|
||||
"-chardev", "spicevmc,id=usbredirchardev1,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev1,id=usbredirdev1",
|
||||
"-chardev", "spicevmc,id=usbredirchardev2,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev2,id=usbredirdev2",
|
||||
"-chardev", "spicevmc,id=usbredirchardev3,name=usbredir",
|
||||
"-device", "usb-redir,chardev=usbredirchardev3,id=usbredirdev3",
|
||||
"-device", "pci-ohci,id=smartpass",
|
||||
"-device", "usb-ccid",
|
||||
"-chardev", "spicevmc,id=ccid,name=smartcard",
|
||||
], None)
|
||||
# fmt: on
|
||||
|
||||
|
||||
|
||||
@@ -10,9 +10,8 @@ def is_valid_age_key(secret_key: str) -> bool:
|
||||
|
||||
if result.returncode == 0:
|
||||
return True
|
||||
else:
|
||||
msg = f"Invalid age key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
msg = f"Invalid age key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
|
||||
@@ -30,6 +29,5 @@ def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
|
||||
msg = f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
return True
|
||||
else:
|
||||
msg = f"Invalid ssh key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
msg = f"Invalid ssh key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
|
||||
@@ -28,7 +28,6 @@ def test_commit_file(git_repo: Path) -> None:
|
||||
def test_commit_file_outside_git_raises_error(git_repo: Path) -> None:
|
||||
# create a file outside the git (a temporary file)
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
|
||||
# this should not fail but skip the commit
|
||||
with pytest.raises(ClanError):
|
||||
git.commit_file(Path(tmp.name), git_repo, "test commit")
|
||||
|
||||
@@ -175,8 +175,7 @@ class VMObject(GObject.Object):
|
||||
if self.progress_bar.is_visible():
|
||||
self.progress_bar.pulse()
|
||||
return GLib.SOURCE_CONTINUE
|
||||
else:
|
||||
return GLib.SOURCE_REMOVE
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
def __start(self) -> None:
|
||||
with self._create_machine() as machine:
|
||||
|
||||
@@ -19,23 +19,22 @@ def map_json_type(
|
||||
return res
|
||||
if isinstance(json_type, dict):
|
||||
return map_json_type(json_type.get("type"))
|
||||
elif json_type == "string":
|
||||
if json_type == "string":
|
||||
return {"str"}
|
||||
elif json_type == "integer":
|
||||
if json_type == "integer":
|
||||
return {"int"}
|
||||
elif json_type == "boolean":
|
||||
if json_type == "boolean":
|
||||
return {"bool"}
|
||||
elif json_type == "array":
|
||||
if json_type == "array":
|
||||
assert nested_types, f"Array type not found for {parent}"
|
||||
return {f"""list[{" | ".join(nested_types)}]"""}
|
||||
elif json_type == "object":
|
||||
if json_type == "object":
|
||||
assert nested_types, f"dict type not found for {parent}"
|
||||
return {f"""dict[str, {" | ".join(nested_types)}]"""}
|
||||
elif json_type == "null":
|
||||
if json_type == "null":
|
||||
return {"None"}
|
||||
else:
|
||||
msg = f"Python type not found for {json_type}"
|
||||
raise ValueError(msg)
|
||||
msg = f"Python type not found for {json_type}"
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
known_classes = set()
|
||||
@@ -116,12 +115,12 @@ def field_def_from_default_value(
|
||||
field_types=field_types | {"None"},
|
||||
default="None",
|
||||
)
|
||||
elif isinstance(default_value, list):
|
||||
if isinstance(default_value, list):
|
||||
return finalize_field(
|
||||
field_types=field_types,
|
||||
default_factory="list",
|
||||
)
|
||||
elif isinstance(default_value, dict):
|
||||
if isinstance(default_value, dict):
|
||||
serialised_types = " | ".join(field_types)
|
||||
if serialised_types == nested_class_name:
|
||||
return finalize_field(
|
||||
@@ -129,28 +128,26 @@ def field_def_from_default_value(
|
||||
default_factory=nested_class_name,
|
||||
)
|
||||
|
||||
elif "dict[str," in serialised_types:
|
||||
if "dict[str," in serialised_types:
|
||||
return finalize_field(
|
||||
field_types=field_types,
|
||||
default_factory="dict",
|
||||
)
|
||||
else:
|
||||
return finalize_field(
|
||||
field_types=field_types,
|
||||
default_factory="dict",
|
||||
type_apendix=" | dict[str,Any]",
|
||||
)
|
||||
elif default_value == "‹name›":
|
||||
return finalize_field(
|
||||
field_types=field_types,
|
||||
default_factory="dict",
|
||||
type_apendix=" | dict[str,Any]",
|
||||
)
|
||||
if default_value == "‹name›":
|
||||
return None
|
||||
elif isinstance(default_value, str):
|
||||
if isinstance(default_value, str):
|
||||
return finalize_field(
|
||||
field_types=field_types,
|
||||
default=f"'{default_value}'",
|
||||
)
|
||||
else:
|
||||
# Other default values unhandled yet.
|
||||
msg = f"Unhandled default value for field '{field_name}' - default value: {default_value}"
|
||||
raise ValueError(msg)
|
||||
# Other default values unhandled yet.
|
||||
msg = f"Unhandled default value for field '{field_name}' - default value: {default_value}"
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
def get_field_def(
|
||||
|
||||
@@ -39,6 +39,7 @@ def convert_bytearray_to_string(byte_array: str) -> str:
|
||||
if byte_array.startswith('"@ByteArray(') and byte_array.endswith(')"'):
|
||||
byte_array = byte_array[12:-2]
|
||||
return byte_array.replace("\\n", "\n")
|
||||
return byte_array
|
||||
|
||||
|
||||
# this must be created before moonlight is first run
|
||||
|
||||
@@ -101,7 +101,7 @@ def main() -> None:
|
||||
|
||||
if "workspace" in crate_manifest:
|
||||
print(f"{sys.argv[1]} is a workspace manifest, skipping", file=sys.stderr)
|
||||
return
|
||||
return None
|
||||
|
||||
changed = False
|
||||
|
||||
|
||||
@@ -29,10 +29,13 @@ lint.select = [
|
||||
"PIE",
|
||||
"PT",
|
||||
"PYI",
|
||||
"Q",
|
||||
"RET",
|
||||
"RSE",
|
||||
"RUF",
|
||||
"T10",
|
||||
"TID",
|
||||
"U",
|
||||
"YTT",
|
||||
]
|
||||
lint.ignore = ["E501", "E402", "E731", "ANN101", "ANN401", "A003"]
|
||||
lint.ignore = ["E501", "E402", "E731", "ANN101", "ANN401", "A003", "RET504"]
|
||||
|
||||
Reference in New Issue
Block a user