Merge pull request 'enable ASYNC, DTZ, YTT and EM lints' (#2014) from Mic92-type-checking into main

This commit is contained in:
clan-bot
2024-09-02 12:13:31 +00:00
109 changed files with 686 additions and 638 deletions

View File

@@ -14,14 +14,6 @@ find = {}
[tool.setuptools.package-data] [tool.setuptools.package-data]
test_driver = ["py.typed"] test_driver = ["py.typed"]
[tool.ruff]
target-version = "py311"
line-length = 88
lint.select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
lint.ignore = ["E501", "ANN101", "ANN401", "A003"]
[tool.mypy] [tool.mypy]
python_version = "3.11" python_version = "3.11"
warn_redundant_casts = true warn_redundant_casts = true

View File

@@ -32,7 +32,8 @@ def retry(fn: Callable, timeout: int = 900) -> None:
time.sleep(1) time.sleep(1)
if not fn(True): if not fn(True):
raise Exception(f"action timed out after {timeout} seconds") msg = f"action timed out after {timeout} seconds"
raise Exception(msg)
class Machine: class Machine:
@@ -75,7 +76,8 @@ class Machine:
if line.startswith("systemd[1]: Startup finished in"): if line.startswith("systemd[1]: Startup finished in"):
break break
else: else:
raise RuntimeError(f"Failed to start container {self.name}") msg = f"Failed to start container {self.name}"
raise RuntimeError(msg)
childs = ( childs = (
Path(f"/proc/{self.process.pid}/task/{self.process.pid}/children") Path(f"/proc/{self.process.pid}/task/{self.process.pid}/children")
.read_text() .read_text()
@@ -86,8 +88,9 @@ class Machine:
), f"Expected exactly one child process for systemd-nspawn, got {childs}" ), f"Expected exactly one child process for systemd-nspawn, got {childs}"
try: try:
return int(childs[0]) return int(childs[0])
except ValueError: except ValueError as e:
raise RuntimeError(f"Failed to parse child process id {childs[0]}") msg = f"Failed to parse child process id {childs[0]}"
raise RuntimeError(msg) from e
def get_unit_info(self, unit: str) -> dict[str, str]: def get_unit_info(self, unit: str) -> dict[str, str]:
proc = self.systemctl(f'--no-pager show "{unit}"') proc = self.systemctl(f'--no-pager show "{unit}"')
@@ -202,16 +205,16 @@ class Machine:
info = self.get_unit_info(unit) info = self.get_unit_info(unit)
state = info["ActiveState"] state = info["ActiveState"]
if state == "failed": if state == "failed":
raise Exception(f'unit "{unit}" reached state "{state}"') msg = f'unit "{unit}" reached state "{state}"'
raise Exception(msg)
if state == "inactive": if state == "inactive":
proc = self.systemctl("list-jobs --full 2>&1") proc = self.systemctl("list-jobs --full 2>&1")
if "No jobs" in proc.stdout: if "No jobs" in proc.stdout:
info = self.get_unit_info(unit) info = self.get_unit_info(unit)
if info["ActiveState"] == state: if info["ActiveState"] == state:
raise Exception( msg = f'unit "{unit}" is inactive and there are no pending jobs'
f'unit "{unit}" is inactive and there are no pending jobs' raise Exception(msg)
)
return state == "active" return state == "active"
@@ -220,7 +223,8 @@ class Machine:
def succeed(self, command: str, timeout: int | None = None) -> str: def succeed(self, command: str, timeout: int | None = None) -> str:
res = self.execute(command, timeout=timeout) res = self.execute(command, timeout=timeout)
if res.returncode != 0: if res.returncode != 0:
raise RuntimeError(f"Failed to run command {command}") msg = f"Failed to run command {command}"
raise RuntimeError(msg)
return res.stdout return res.stdout
def shutdown(self) -> None: def shutdown(self) -> None:
@@ -260,7 +264,8 @@ class Driver:
for container in containers: for container in containers:
name_match = re.match(r".*-nixos-system-(.+)-(.+)", container.name) name_match = re.match(r".*-nixos-system-(.+)-(.+)", container.name)
if not name_match: if not name_match:
raise ValueError(f"Unable to extract hostname from {container.name}") msg = f"Unable to extract hostname from {container.name}"
raise ValueError(msg)
name = name_match.group(1) name = name_match.group(1)
self.machines.append( self.machines.append(
Machine( Machine(
@@ -276,12 +281,12 @@ class Driver:
machine.start() machine.start()
def test_symbols(self) -> dict[str, Any]: def test_symbols(self) -> dict[str, Any]:
general_symbols = dict( general_symbols = {
start_all=self.start_all, "start_all": self.start_all,
machines=self.machines, "machines": self.machines,
driver=self, "driver": self,
Machine=Machine, # for typing "Machine": Machine, # for typing
) }
machine_symbols = {pythonize_name(m.name): m for m in self.machines} machine_symbols = {pythonize_name(m.name): m for m in self.machines}
# If there's exactly one machine, make it available under the name # If there's exactly one machine, make it available under the name
# "machine", even if it's not called that. # "machine", even if it's not called that.
@@ -289,7 +294,7 @@ class Driver:
(machine_symbols["machine"],) = self.machines (machine_symbols["machine"],) = self.machines
print( print(
"additionally exposed symbols:\n " "additionally exposed symbols:\n "
+ ", ".join(map(lambda m: m.name, self.machines)) + ", ".join(m.name for m in self.machines)
+ ",\n " + ",\n "
+ ", ".join(list(general_symbols.keys())) + ", ".join(list(general_symbols.keys()))
) )
@@ -319,9 +324,11 @@ def writeable_dir(arg: str) -> Path:
""" """
path = Path(arg) path = Path(arg)
if not path.is_dir(): if not path.is_dir():
raise argparse.ArgumentTypeError(f"{path} is not a directory") msg = f"{path} is not a directory"
raise argparse.ArgumentTypeError(msg)
if not os.access(path, os.W_OK): if not os.access(path, os.W_OK):
raise argparse.ArgumentTypeError(f"{path} is not a writeable directory") msg = f"{path} is not a writeable directory"
raise argparse.ArgumentTypeError(msg)
return path return path

View File

@@ -152,12 +152,12 @@ options_head = "\n## Module Options\n"
def produce_clan_core_docs() -> None: def produce_clan_core_docs() -> None:
if not CLAN_CORE_DOCS: if not CLAN_CORE_DOCS:
raise ValueError( msg = f"Environment variables are not set correctly: $CLAN_CORE_DOCS={CLAN_CORE_DOCS}"
f"Environment variables are not set correctly: $CLAN_CORE_DOCS={CLAN_CORE_DOCS}" raise ValueError(msg)
)
if not OUT: if not OUT:
raise ValueError(f"Environment variables are not set correctly: $out={OUT}") msg = f"Environment variables are not set correctly: $out={OUT}"
raise ValueError(msg)
# A mapping of output file to content # A mapping of output file to content
core_outputs: dict[str, str] = {} core_outputs: dict[str, str] = {}
@@ -227,17 +227,18 @@ clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/
def produce_clan_modules_docs() -> None: def produce_clan_modules_docs() -> None:
if not CLAN_MODULES: if not CLAN_MODULES:
raise ValueError( msg = (
f"Environment variables are not set correctly: $CLAN_MODULES={CLAN_MODULES}" f"Environment variables are not set correctly: $CLAN_MODULES={CLAN_MODULES}"
) )
raise ValueError(msg)
if not CLAN_CORE_PATH: if not CLAN_CORE_PATH:
raise ValueError( msg = f"Environment variables are not set correctly: $CLAN_CORE_PATH={CLAN_CORE_PATH}"
f"Environment variables are not set correctly: $CLAN_CORE_PATH={CLAN_CORE_PATH}" raise ValueError(msg)
)
if not OUT: if not OUT:
raise ValueError(f"Environment variables are not set correctly: $out={OUT}") msg = f"Environment variables are not set correctly: $out={OUT}"
raise ValueError(msg)
with open(CLAN_MODULES) as f: with open(CLAN_MODULES) as f:
links: dict[str, str] = json.load(f) links: dict[str, str] = json.load(f)

View File

@@ -71,9 +71,11 @@ class ZerotierController:
self, self,
path: str, path: str,
method: str = "GET", method: str = "GET",
headers: dict[str, str] = {}, headers: dict[str, str] | None = None,
data: dict[str, Any] | None = None, data: dict[str, Any] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
if headers is None:
headers = {}
body = None body = None
headers = headers.copy() headers = headers.copy()
if data is not None: if data is not None:
@@ -88,7 +90,9 @@ class ZerotierController:
def status(self) -> dict[str, Any]: def status(self) -> dict[str, Any]:
return self._http_request("/status") return self._http_request("/status")
def create_network(self, data: dict[str, Any] = {}) -> dict[str, Any]: def create_network(self, data: dict[str, Any] | None = None) -> dict[str, Any]:
if data is None:
data = {}
return self._http_request( return self._http_request(
f"/controller/network/{self.identity.node_id()}______", f"/controller/network/{self.identity.node_id()}______",
method="POST", method="POST",
@@ -104,7 +108,8 @@ def zerotier_controller() -> Iterator[ZerotierController]:
# This check could be racy but it's unlikely in practice # This check could be racy but it's unlikely in practice
controller_port = find_free_port() controller_port = find_free_port()
if controller_port is None: if controller_port is None:
raise ClanError("cannot find a free port for zerotier controller") msg = "cannot find a free port for zerotier controller"
raise ClanError(msg)
with TemporaryDirectory() as d: with TemporaryDirectory() as d:
tempdir = Path(d) tempdir = Path(d)
@@ -128,9 +133,10 @@ def zerotier_controller() -> Iterator[ZerotierController]:
while not try_connect_port(controller_port): while not try_connect_port(controller_port):
status = p.poll() status = p.poll()
if status is not None: if status is not None:
raise ClanError( msg = (
f"zerotier-one has been terminated unexpected with {status}" f"zerotier-one has been terminated unexpected with {status}"
) )
raise ClanError(msg)
time.sleep(0.1) time.sleep(0.1)
print() print()
@@ -209,7 +215,8 @@ def main() -> None:
match args.mode: match args.mode:
case "network": case "network":
if args.network_id is None: if args.network_id is None:
raise ValueError("network_id parameter is required") msg = "network_id parameter is required"
raise ValueError(msg)
controller = create_network_controller() controller = create_network_controller()
identity = controller.identity identity = controller.identity
network_id = controller.networkid network_id = controller.networkid
@@ -218,7 +225,8 @@ def main() -> None:
identity = create_identity() identity = create_identity()
network_id = args.network_id network_id = args.network_id
case _: case _:
raise ValueError(f"unknown mode {args.mode}") msg = f"unknown mode {args.mode}"
raise ValueError(msg)
ip = compute_zerotier_ip(network_id, identity) ip = compute_zerotier_ip(network_id, identity)
args.identity_secret.write_text(identity.private) args.identity_secret.write_text(identity.private)

View File

@@ -47,7 +47,8 @@ class ImplFunc(GObject.Object, Generic[P, B]):
self.connect("returns", fn) self.connect("returns", fn)
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool: def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
raise NotImplementedError("Method 'async_run' must be implemented") msg = "Method 'async_run' must be implemented"
raise NotImplementedError(msg)
def _async_run(self, data: Any) -> bool: def _async_run(self, data: Any) -> bool:
result = GLib.SOURCE_REMOVE result = GLib.SOURCE_REMOVE
@@ -56,7 +57,6 @@ class ImplFunc(GObject.Object, Generic[P, B]):
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
# TODO: send error to js # TODO: send error to js
finally:
return result return result
@@ -93,7 +93,8 @@ class GObjApi:
fn_name = obj.__name__ fn_name = obj.__name__
if fn_name in self._obj_registry: if fn_name in self._obj_registry:
raise ValueError(f"Function '{fn_name}' already registered") msg = f"Function '{fn_name}' already registered"
raise ValueError(msg)
self._obj_registry[fn_name] = obj self._obj_registry[fn_name] = obj
def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None: def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None:
@@ -120,9 +121,8 @@ class GObjApi:
if exp_signature != got_signature: if exp_signature != got_signature:
log.error(f"Expected signature: {exp_signature}") log.error(f"Expected signature: {exp_signature}")
log.error(f"Actual signature: {got_signature}") log.error(f"Actual signature: {got_signature}")
raise ValueError( msg = f"Overwritten method '{m_name}' has different signature than the implementation"
f"Overwritten method '{m_name}' has different signature than the implementation" raise ValueError(msg)
)
def get_obj(self, fn_name: str) -> type[ImplFunc]: def get_obj(self, fn_name: str) -> type[ImplFunc]:
result = self._obj_registry.get(fn_name, None) result = self._obj_registry.get(fn_name, None)
@@ -131,7 +131,8 @@ class GObjApi:
plain_fn = self._methods.get(fn_name, None) plain_fn = self._methods.get(fn_name, None)
if plain_fn is None: if plain_fn is None:
raise ValueError(f"Method '{fn_name}' not found in Api") msg = f"Method '{fn_name}' not found in Api"
raise ValueError(msg)
class GenericFnRuntime(ImplFunc[..., Any]): class GenericFnRuntime(ImplFunc[..., Any]):
def __init__(self) -> None: def __init__(self) -> None:

View File

@@ -27,7 +27,8 @@ class ToastOverlay:
_instance: "None | ToastOverlay" = None _instance: "None | ToastOverlay" = None
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "ToastOverlay": def use(cls: Any) -> "ToastOverlay":

View File

@@ -25,7 +25,8 @@ class ViewStack:
# Make sure the VMS class is used as a singleton # Make sure the VMS class is used as a singleton
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "ViewStack": def use(cls: Any) -> "ViewStack":

View File

@@ -131,15 +131,6 @@ python3.pkgs.buildPythonApplication rec {
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests ${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests
touch $out touch $out
''; '';
clan-app-no-breakpoints = runCommand "clan-app-no-breakpoints" { } ''
if grep --include \*.py -Rq "breakpoint()" ${source}; then
echo "breakpoint() found in ${source}:"
grep --include \*.py -Rn "breakpoint()" ${source}
exit 1
fi
touch $out
'';
}; };
}; };

View File

@@ -39,9 +39,3 @@ no_implicit_optional = true
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = "clan_cli.*" module = "clan_cli.*"
ignore_missing_imports = true ignore_missing_imports = true
[tool.ruff]
target-version = "py312"
line-length = 88
lint.select = ["E", "F", "I", "U", "N", "RUF", "ANN", "A"]
lint.ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]

View File

@@ -17,12 +17,14 @@ class Command:
def run( def run(
self, self,
command: list[str], command: list[str],
extra_env: dict[str, str] = {}, extra_env: dict[str, str] | None = None,
stdin: _FILE = None, stdin: _FILE = None,
stdout: _FILE = None, stdout: _FILE = None,
stderr: _FILE = None, stderr: _FILE = None,
workdir: Path | None = None, workdir: Path | None = None,
) -> subprocess.Popen[str]: ) -> subprocess.Popen[str]:
if extra_env is None:
extra_env = {}
env = os.environ.copy() env = os.environ.copy()
env.update(extra_env) env.update(extra_env)
# We start a new session here so that we can than more reliably kill all childs as well # We start a new session here so that we can than more reliably kill all childs as well

View File

@@ -1,9 +1,8 @@
import logging import logging
import shlex import shlex
from clan_cli.custom_logger import get_caller
from clan_app import main from clan_app import main
from clan_cli.custom_logger import get_caller
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@@ -85,8 +85,7 @@ class MethodRegistry:
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]: def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
@wraps(fn) @wraps(fn)
def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]: def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]:
raise NotImplementedError( msg = f"""{fn.__name__} - The platform didn't implement this function.
f"""{fn.__name__} - The platform didn't implement this function.
--- ---
# Example # Example
@@ -103,16 +102,18 @@ def open_file(file_request: FileRequest) -> str | None:
API.register(open_file) API.register(open_file)
--- ---
""" """
) raise NotImplementedError(msg)
self.register(wrapper) self.register(wrapper)
return fn return fn
def register(self, fn: Callable[..., T]) -> Callable[..., T]: def register(self, fn: Callable[..., T]) -> Callable[..., T]:
if fn.__name__ in self._registry: if fn.__name__ in self._registry:
raise ValueError(f"Function {fn.__name__} already registered") msg = f"Function {fn.__name__} already registered"
raise ValueError(msg)
if fn.__name__ in self._orig_signature: if fn.__name__ in self._orig_signature:
raise ValueError(f"Function {fn.__name__} already registered") msg = f"Function {fn.__name__} already registered"
raise ValueError(msg)
# make copy of original function # make copy of original function
self._orig_signature[fn.__name__] = signature(fn) self._orig_signature[fn.__name__] = signature(fn)
@@ -158,7 +159,7 @@ API.register(open_file)
"$comment": "An object containing API methods. ", "$comment": "An object containing API methods. ",
"type": "object", "type": "object",
"additionalProperties": False, "additionalProperties": False,
"required": [func_name for func_name in self._registry.keys()], "required": list(self._registry.keys()),
"properties": {}, "properties": {},
} }

View File

@@ -38,12 +38,14 @@ def set_admin_service(
inventory = load_inventory_eval(base_url) inventory = load_inventory_eval(base_url)
if not allowed_keys: if not allowed_keys:
raise ValueError("At least one key must be provided to ensure access") msg = "At least one key must be provided to ensure access"
raise ValueError(msg)
keys = {} keys = {}
for name, keyfile in allowed_keys.items(): for name, keyfile in allowed_keys.items():
if not keyfile.startswith("/"): if not keyfile.startswith("/"):
raise ValueError(f"Keyfile '{keyfile}' must be an absolute path") msg = f"Keyfile '{keyfile}' must be an absolute path"
raise ValueError(msg)
with open(keyfile) as f: with open(keyfile) as f:
pubkey = f.read() pubkey = f.read()
keys[name] = pubkey keys[name] = pubkey

View File

@@ -51,8 +51,9 @@ def extract_frontmatter(readme_content: str, err_scope: str) -> tuple[Frontmatte
# Parse the TOML frontmatter # Parse the TOML frontmatter
frontmatter_parsed = tomllib.loads(frontmatter_raw) frontmatter_parsed = tomllib.loads(frontmatter_raw)
except tomllib.TOMLDecodeError as e: except tomllib.TOMLDecodeError as e:
msg = f"Error parsing TOML frontmatter: {e}"
raise ClanError( raise ClanError(
f"Error parsing TOML frontmatter: {e}", msg,
description=f"Invalid TOML frontmatter. {err_scope}", description=f"Invalid TOML frontmatter. {err_scope}",
location="extract_frontmatter", location="extract_frontmatter",
) from e ) from e
@@ -60,8 +61,9 @@ def extract_frontmatter(readme_content: str, err_scope: str) -> tuple[Frontmatte
return Frontmatter(**frontmatter_parsed), remaining_content return Frontmatter(**frontmatter_parsed), remaining_content
# If no frontmatter is found, raise an error # If no frontmatter is found, raise an error
msg = "Invalid README: Frontmatter not found."
raise ClanError( raise ClanError(
"Invalid README: Frontmatter not found.", msg,
location="extract_frontmatter", location="extract_frontmatter",
description=f"{err_scope} does not contain valid frontmatter.", description=f"{err_scope} does not contain valid frontmatter.",
) )
@@ -98,8 +100,9 @@ def get_modules(base_path: str) -> dict[str, str]:
proc = run_no_stdout(cmd) proc = run_no_stdout(cmd)
res = proc.stdout.strip() res = proc.stdout.strip()
except ClanCmdError as e: except ClanCmdError as e:
msg = "clanInternals might not have clanModules attributes"
raise ClanError( raise ClanError(
"clanInternals might not have clanModules attributes", msg,
location=f"list_modules {base_path}", location=f"list_modules {base_path}",
description="Evaluation failed on clanInternals.clanModules attribute", description="Evaluation failed on clanInternals.clanModules attribute",
) from e ) from e
@@ -127,15 +130,17 @@ def get_module_info(
Retrieves information about a module Retrieves information about a module
""" """
if not module_path: if not module_path:
msg = "Module not found"
raise ClanError( raise ClanError(
"Module not found", msg,
location=f"show_module_info {module_name}", location=f"show_module_info {module_name}",
description="Module does not exist", description="Module does not exist",
) )
module_readme = Path(module_path) / "README.md" module_readme = Path(module_path) / "README.md"
if not module_readme.exists(): if not module_readme.exists():
msg = "Module not found"
raise ClanError( raise ClanError(
"Module not found", msg,
location=f"show_module_info {module_name}", location=f"show_module_info {module_name}",
description="Module does not exist or doesn't have any README.md file", description="Module does not exist or doesn't have any README.md file",
) )
@@ -170,9 +175,8 @@ def set_service_instance(
service_keys = get_type_hints(Service).keys() service_keys = get_type_hints(Service).keys()
if module_name not in service_keys: if module_name not in service_keys:
raise ValueError( msg = f"{module_name} is not a valid Service attribute. Expected one of {', '.join(service_keys)}."
f"{module_name} is not a valid Service attribute. Expected one of {', '.join(service_keys)}." raise ValueError(msg)
)
inventory = load_inventory_json(base_path) inventory = load_inventory_json(base_path)
target_type = get_args(get_type_hints(Service)[module_name])[1] target_type = get_args(get_type_hints(Service)[module_name])[1]

View File

@@ -129,7 +129,8 @@ def construct_value(
if loc is None: if loc is None:
loc = [] loc = []
if t is None and field_value: if t is None and field_value:
raise ClanError(f"Expected None but got: {field_value}", location=f"{loc}") msg = f"Expected None but got: {field_value}"
raise ClanError(msg, location=f"{loc}")
if is_type_in_union(t, type(None)) and field_value is None: if is_type_in_union(t, type(None)) and field_value is None:
# Sometimes the field value is None, which is valid if the type hint allows None # Sometimes the field value is None, which is valid if the type hint allows None
@@ -145,8 +146,11 @@ def construct_value(
# Field_value must be a string # Field_value must be a string
elif is_type_in_union(t, Path): elif is_type_in_union(t, Path):
if not isinstance(field_value, str): if not isinstance(field_value, str):
msg = (
f"Expected string, cannot construct pathlib.Path() from: {field_value} "
)
raise ClanError( raise ClanError(
f"Expected string, cannot construct pathlib.Path() from: {field_value} ", msg,
location=f"{loc}", location=f"{loc}",
) )
@@ -155,7 +159,8 @@ def construct_value(
# Trivial values # Trivial values
elif t is str: elif t is str:
if not isinstance(field_value, str): if not isinstance(field_value, str):
raise ClanError(f"Expected string, got {field_value}", location=f"{loc}") msg = f"Expected string, got {field_value}"
raise ClanError(msg, location=f"{loc}")
return field_value return field_value
@@ -178,7 +183,8 @@ def construct_value(
# dict # dict
elif get_origin(t) is list: elif get_origin(t) is list:
if not isinstance(field_value, list): if not isinstance(field_value, list):
raise ClanError(f"Expected list, got {field_value}", location=f"{loc}") msg = f"Expected list, got {field_value}"
raise ClanError(msg, location=f"{loc}")
return [construct_value(get_args(t)[0], item) for item in field_value] return [construct_value(get_args(t)[0], item) for item in field_value]
elif get_origin(t) is dict and isinstance(field_value, dict): elif get_origin(t) is dict and isinstance(field_value, dict):
@@ -189,9 +195,8 @@ def construct_value(
elif get_origin(t) is Literal: elif get_origin(t) is Literal:
valid_values = get_args(t) valid_values = get_args(t)
if field_value not in valid_values: if field_value not in valid_values:
raise ClanError( msg = f"Expected one of {valid_values}, got {field_value}"
f"Expected one of {valid_values}, got {field_value}", location=f"{loc}" raise ClanError(msg, location=f"{loc}")
)
return field_value return field_value
elif get_origin(t) is Annotated: elif get_origin(t) is Annotated:
@@ -202,7 +207,8 @@ def construct_value(
# Unhandled # Unhandled
else: else:
raise ClanError(f"Unhandled field type {t} with value {field_value}") msg = f"Unhandled field type {t} with value {field_value}"
raise ClanError(msg)
def construct_dataclass( def construct_dataclass(
@@ -215,7 +221,8 @@ def construct_dataclass(
if path is None: if path is None:
path = [] path = []
if not is_dataclass(t): if not is_dataclass(t):
raise ClanError(f"{t.__name__} is not a dataclass") msg = f"{t.__name__} is not a dataclass"
raise ClanError(msg)
# Attempt to create an instance of the data_class# # Attempt to create an instance of the data_class#
field_values: dict[str, Any] = {} field_values: dict[str, Any] = {}
@@ -251,9 +258,8 @@ def construct_dataclass(
for field_name in required: for field_name in required:
if field_name not in field_values: if field_name not in field_values:
formatted_path = " ".join(path) formatted_path = " ".join(path)
raise ClanError( msg = f"Default value missing for: '{field_name}' in {t} {formatted_path}, got Value: {data}"
f"Default value missing for: '{field_name}' in {t} {formatted_path}, got Value: {data}" raise ClanError(msg)
)
return t(**field_values) # type: ignore return t(**field_values) # type: ignore
@@ -265,7 +271,8 @@ def from_dict(
path = [] path = []
if is_dataclass(t): if is_dataclass(t):
if not isinstance(data, dict): if not isinstance(data, dict):
raise ClanError(f"{data} is not a dict. Expected {t}") msg = f"{data} is not a dict. Expected {t}"
raise ClanError(msg)
return construct_dataclass(t, data, path) # type: ignore return construct_dataclass(t, data, path) # type: ignore
else: else:
return construct_value(t, data, path) return construct_value(t, data, path)

View File

@@ -122,9 +122,8 @@ def type_to_dict(
# And return the resolved type instead of the TypeVar # And return the resolved type instead of the TypeVar
resolved = type_map.get(t) resolved = type_map.get(t)
if not resolved: if not resolved:
raise JSchemaTypeError( msg = f"{scope} - TypeVar {t} not found in type_map, map: {type_map}"
f"{scope} - TypeVar {t} not found in type_map, map: {type_map}" raise JSchemaTypeError(msg)
)
return type_to_dict(type_map.get(t), scope, type_map) return type_to_dict(type_map.get(t), scope, type_map)
elif hasattr(t, "__origin__"): # Check if it's a generic type elif hasattr(t, "__origin__"): # Check if it's a generic type
@@ -134,7 +133,8 @@ def type_to_dict(
if origin is None: if origin is None:
# Non-generic user-defined or built-in type # Non-generic user-defined or built-in type
# TODO: handle custom types # TODO: handle custom types
raise JSchemaTypeError(f"{scope} Unhandled Type: ", origin) msg = f"{scope} Unhandled Type: "
raise JSchemaTypeError(msg, origin)
elif origin is Literal: elif origin is Literal:
# Handle Literal values for enums in JSON Schema # Handle Literal values for enums in JSON Schema
@@ -179,7 +179,8 @@ def type_to_dict(
new_map.update(inspect_dataclass_fields(t)) new_map.update(inspect_dataclass_fields(t))
return type_to_dict(origin, scope, new_map) return type_to_dict(origin, scope, new_map)
raise JSchemaTypeError(f"{scope} - Error api type not yet supported {t!s}") msg = f"{scope} - Error api type not yet supported {t!s}"
raise JSchemaTypeError(msg)
elif isinstance(t, type): elif isinstance(t, type):
if t is str: if t is str:
@@ -193,23 +194,23 @@ def type_to_dict(
if t is object: if t is object:
return {"type": "object"} return {"type": "object"}
if t is Any: if t is Any:
raise JSchemaTypeError( msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}"
f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}" raise JSchemaTypeError(msg)
)
if t is pathlib.Path: if t is pathlib.Path:
return { return {
# TODO: maybe give it a pattern for URI # TODO: maybe give it a pattern for URI
"type": "string", "type": "string",
} }
if t is dict: if t is dict:
raise JSchemaTypeError( msg = f"{scope} - Generic 'dict' type not supported. Use dict[str, Any] or any more expressive type."
f"{scope} - Generic 'dict' type not supported. Use dict[str, Any] or any more expressive type." raise JSchemaTypeError(msg)
)
# Optional[T] gets internally transformed Union[T,NoneType] # Optional[T] gets internally transformed Union[T,NoneType]
if t is NoneType: if t is NoneType:
return {"type": "null"} return {"type": "null"}
raise JSchemaTypeError(f"{scope} - Error primitive type not supported {t!s}") msg = f"{scope} - Error primitive type not supported {t!s}"
raise JSchemaTypeError(msg)
else: else:
raise JSchemaTypeError(f"{scope} - Error type not supported {t!s}") msg = f"{scope} - Error type not supported {t!s}"
raise JSchemaTypeError(msg)

View File

@@ -22,24 +22,28 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
[backup_scripts["providers"][provider]["create"]], [backup_scripts["providers"][provider]["create"]],
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError("failed to start backup") msg = "failed to start backup"
raise ClanError(msg)
else: else:
print("successfully started backup") print("successfully started backup")
else: else:
if provider not in backup_scripts["providers"]: if provider not in backup_scripts["providers"]:
raise ClanError(f"provider {provider} not found") msg = f"provider {provider} not found"
raise ClanError(msg)
proc = machine.target_host.run( proc = machine.target_host.run(
[backup_scripts["providers"][provider]["create"]], [backup_scripts["providers"][provider]["create"]],
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError("failed to start backup") msg = "failed to start backup"
raise ClanError(msg)
else: else:
print("successfully started backup") print("successfully started backup")
def create_command(args: argparse.Namespace) -> None: def create_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
machine = Machine(name=args.machine, flake=args.flake) machine = Machine(name=args.machine, flake=args.flake)
create_backup(machine=machine, provider=args.provider) create_backup(machine=machine, provider=args.provider)

View File

@@ -54,7 +54,8 @@ def list_backups(machine: Machine, provider: str | None = None) -> list[Backup]:
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
machine = Machine(name=args.machine, flake=args.flake) machine = Machine(name=args.machine, flake=args.flake)
backups = list_backups(machine=machine, provider=args.provider) backups = list_backups(machine=machine, provider=args.provider)
for backup in backups: for backup in backups:

View File

@@ -32,9 +32,8 @@ def restore_service(machine: Machine, name: str, provider: str, service: str) ->
extra_env=env, extra_env=env,
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError( msg = f"failed to run preRestoreCommand: {pre_restore}, error was: {proc.stdout}"
f"failed to run preRestoreCommand: {pre_restore}, error was: {proc.stdout}" raise ClanError(msg)
)
proc = machine.target_host.run( proc = machine.target_host.run(
[backup_metadata["providers"][provider]["restore"]], [backup_metadata["providers"][provider]["restore"]],
@@ -42,9 +41,8 @@ def restore_service(machine: Machine, name: str, provider: str, service: str) ->
extra_env=env, extra_env=env,
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError( msg = f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}"
f"failed to restore backup: {backup_metadata['providers'][provider]['restore']}" raise ClanError(msg)
)
if post_restore := backup_folders[service]["postRestoreCommand"]: if post_restore := backup_folders[service]["postRestoreCommand"]:
proc = machine.target_host.run( proc = machine.target_host.run(
@@ -53,9 +51,8 @@ def restore_service(machine: Machine, name: str, provider: str, service: str) ->
extra_env=env, extra_env=env,
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError( msg = f"failed to run postRestoreCommand: {post_restore}, error was: {proc.stdout}"
f"failed to run postRestoreCommand: {post_restore}, error was: {proc.stdout}" raise ClanError(msg)
)
def restore_backup( def restore_backup(
@@ -85,7 +82,8 @@ def restore_backup(
def restore_command(args: argparse.Namespace) -> None: def restore_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
machine = Machine(name=args.machine, flake=args.flake) machine = Machine(name=args.machine, flake=args.flake)
restore_backup( restore_backup(
machine=machine, machine=machine,

View File

@@ -50,9 +50,8 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
# Check if the machine exists # Check if the machine exists
machines: list[str] = list_nixos_machines(flake_url) machines: list[str] = list_nixos_machines(flake_url)
if machine_name not in machines: if machine_name not in machines:
raise ClanError( msg = f"Machine {machine_name} not found in {flake_url}. Available machines: {', '.join(machines)}"
f"Machine {machine_name} not found in {flake_url}. Available machines: {', '.join(machines)}" raise ClanError(msg)
)
machine = Machine(machine_name, FlakeId(str(flake_url))) machine = Machine(machine_name, FlakeId(str(flake_url)))
vm = inspect_vm(machine) vm = inspect_vm(machine)

View File

@@ -27,8 +27,9 @@ def show_clan_meta(uri: str | Path) -> Meta:
proc = run_no_stdout(cmd) proc = run_no_stdout(cmd)
res = proc.stdout.strip() res = proc.stdout.strip()
except ClanCmdError as e: except ClanCmdError as e:
msg = "Evaluation failed on meta attribute"
raise ClanError( raise ClanError(
"Evaluation failed on meta attribute", msg,
location=f"show_clan {uri}", location=f"show_clan {uri}",
description=str(e.cmd), description=str(e.cmd),
) from e ) from e
@@ -45,8 +46,9 @@ def show_clan_meta(uri: str | Path) -> Meta:
icon_path = meta_icon icon_path = meta_icon
elif scheme in [""]: elif scheme in [""]:
if Path(meta_icon).is_absolute(): if Path(meta_icon).is_absolute():
msg = "Invalid absolute path"
raise ClanError( raise ClanError(
"Invalid absolute path", msg,
location=f"show_clan {uri}", location=f"show_clan {uri}",
description="Icon path must be a URL or a relative path.", description="Icon path must be a URL or a relative path.",
) )
@@ -54,8 +56,9 @@ def show_clan_meta(uri: str | Path) -> Meta:
else: else:
icon_path = str((Path(uri) / meta_icon).resolve()) icon_path = str((Path(uri) / meta_icon).resolve())
else: else:
msg = "Invalid schema"
raise ClanError( raise ClanError(
"Invalid schema", msg,
location=f"show_clan {uri}", location=f"show_clan {uri}",
description="Icon path must be a URL or a relative path.", description="Icon path must be a URL or a relative path.",
) )

View File

@@ -74,7 +74,8 @@ class ClanURI:
if uri.startswith("clan://"): if uri.startswith("clan://"):
nested_uri = uri[7:] nested_uri = uri[7:]
else: else:
raise ClanError(f"Invalid uri: expected clan://, got {uri}") msg = f"Invalid uri: expected clan://, got {uri}"
raise ClanError(msg)
# Parse the URI into components # Parse the URI into components
# url://netloc/path;parameters?query#fragment # url://netloc/path;parameters?query#fragment

View File

@@ -1,3 +1,4 @@
import datetime
import logging import logging
import os import os
import select import select
@@ -5,7 +6,7 @@ import shlex
import subprocess import subprocess
import sys import sys
import weakref import weakref
from datetime import datetime, timedelta from datetime import timedelta
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import IO, Any from typing import IO, Any
@@ -116,7 +117,7 @@ def run(
) )
else: else:
glog.debug(f"$: {shlex.join(cmd)} \nCaller: {get_caller()}") glog.debug(f"$: {shlex.join(cmd)} \nCaller: {get_caller()}")
tstart = datetime.now() tstart = datetime.datetime.now(tz=datetime.UTC)
# Start the subprocess # Start the subprocess
process = subprocess.Popen( process = subprocess.Popen(
@@ -132,7 +133,7 @@ def run(
process.communicate(input) process.communicate(input)
else: else:
process.wait() process.wait()
tend = datetime.now() tend = datetime.datetime.now(tz=datetime.UTC)
global TIME_TABLE global TIME_TABLE
TIME_TABLE.add(shlex.join(cmd), tend - tstart) TIME_TABLE.add(shlex.join(cmd), tend - tstart)

View File

@@ -40,7 +40,8 @@ def map_type(nix_type: str) -> Any:
subtype = nix_type.removeprefix("list of ") subtype = nix_type.removeprefix("list of ")
return list[map_type(subtype)] # type: ignore return list[map_type(subtype)] # type: ignore
else: else:
raise ClanError(f"Unknown type {nix_type}") msg = f"Unknown type {nix_type}"
raise ClanError(msg)
# merge two dicts recursively # merge two dicts recursively
@@ -79,7 +80,8 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
elif value[0] in ["false", "False", "no", "n", "0"]: elif value[0] in ["false", "False", "no", "n", "0"]:
return False return False
else: else:
raise ClanError(f"Invalid value {value} for boolean") msg = f"Invalid value {value} for boolean"
raise ClanError(msg)
# handle lists # handle lists
elif get_origin(input_type) is list: elif get_origin(input_type) is list:
subtype = input_type.__args__[0] subtype = input_type.__args__[0]
@@ -87,9 +89,8 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
# handle dicts # handle dicts
elif get_origin(input_type) is dict: elif get_origin(input_type) is dict:
if not isinstance(value, dict): if not isinstance(value, dict):
raise ClanError( msg = f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>" raise ClanError(msg)
)
subtype = input_type.__args__[1] subtype = input_type.__args__[1]
return {k: cast(v, subtype, opt_description) for k, v in value.items()} return {k: cast(v, subtype, opt_description) for k, v in value.items()}
elif str(input_type) == "str | None": elif str(input_type) == "str | None":
@@ -98,12 +99,12 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
return value[0] return value[0]
else: else:
if len(value) > 1: if len(value) > 1:
raise ClanError(f"Too many values for {opt_description}") msg = f"Too many values for {opt_description}"
raise ClanError(msg)
return input_type(value[0]) return input_type(value[0])
except ValueError as e: except ValueError as e:
raise ClanError( msg = f"Invalid type for option {opt_description} (expected {input_type.__name__})"
f"Invalid type for option {opt_description} (expected {input_type.__name__})" raise ClanError(msg) from e
) from e
def options_for_machine( def options_for_machine(
@@ -237,7 +238,8 @@ def find_option(
# element of the option path and find matching parent option # element of the option path and find matching parent option
# (see examples above for why this is needed) # (see examples above for why this is needed)
if len(option_path) == 1: if len(option_path) == 1:
raise ClanError(f"Option {option_description} not found") msg = f"Option {option_description} not found"
raise ClanError(msg)
option_path_parent = option_path[:-1] option_path_parent = option_path[:-1]
attr_prefix = option_path[-1] attr_prefix = option_path[-1]
return find_option( return find_option(

View File

@@ -89,7 +89,8 @@ def config_for_machine(flake_dir: Path, machine_name: str) -> dict:
def set_config_for_machine(flake_dir: Path, machine_name: str, config: dict) -> None: def set_config_for_machine(flake_dir: Path, machine_name: str, config: dict) -> None:
hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$" hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
if not re.match(hostname_regex, machine_name): if not re.match(hostname_regex, machine_name):
raise ClanError("Machine name must be a valid hostname") msg = "Machine name must be a valid hostname"
raise ClanError(msg)
if "networking" in config and "hostName" in config["networking"]: if "networking" in config and "hostName" in config["networking"]:
if machine_name != config["networking"]["hostName"]: if machine_name != config["networking"]["hostName"]:
raise ClanHttpError( raise ClanHttpError(

View File

@@ -42,12 +42,15 @@ def subtype_from_schema(schema: dict[str, Any]) -> type:
sub_type = subtype_from_schema(schema["additionalProperties"]) sub_type = subtype_from_schema(schema["additionalProperties"])
return dict[str, sub_type] # type: ignore return dict[str, sub_type] # type: ignore
elif "properties" in schema: elif "properties" in schema:
raise ClanError("Nested dicts are not supported") msg = "Nested dicts are not supported"
raise ClanError(msg)
else: else:
raise ClanError("Unknown object type") msg = "Unknown object type"
raise ClanError(msg)
elif schema["type"] == "array": elif schema["type"] == "array":
if "items" not in schema: if "items" not in schema:
raise ClanError("Untyped arrays are not supported") msg = "Untyped arrays are not supported"
raise ClanError(msg)
sub_type = subtype_from_schema(schema["items"]) sub_type = subtype_from_schema(schema["items"])
return list[sub_type] # type: ignore return list[sub_type] # type: ignore
else: else:
@@ -71,9 +74,11 @@ def type_from_schema_path(
subtype = type_from_schema_path(schema["additionalProperties"], path[1:]) subtype = type_from_schema_path(schema["additionalProperties"], path[1:])
return subtype return subtype
else: else:
raise ClanError(f"Unknown type for path {path}") msg = f"Unknown type for path {path}"
raise ClanError(msg)
else: else:
raise ClanError(f"Unknown type for path {path}") msg = f"Unknown type for path {path}"
raise ClanError(msg)
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]: def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
@@ -86,9 +91,8 @@ def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
if "additionalProperties" in value: if "additionalProperties" in value:
sub_type = value["additionalProperties"].get("type") sub_type = value["additionalProperties"].get("type")
if sub_type not in type_map: if sub_type not in type_map:
raise ClanError( msg = f"Unsupported object type {sub_type} (field {name})"
f"Unsupported object type {sub_type} (field {name})" raise ClanError(msg)
)
result[f"{name}.<name>"] = type_map[sub_type] result[f"{name}.<name>"] = type_map[sub_type]
continue continue
# handle properties # handle properties
@@ -98,10 +102,12 @@ def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
continue continue
elif type_ == "array": elif type_ == "array":
if "items" not in value: if "items" not in value:
raise ClanError(f"Untyped arrays are not supported (field: {name})") msg = f"Untyped arrays are not supported (field: {name})"
raise ClanError(msg)
sub_type = value["items"].get("type") sub_type = value["items"].get("type")
if sub_type not in type_map: if sub_type not in type_map:
raise ClanError(f"Unsupported list type {sub_type} (field {name})") msg = f"Unsupported list type {sub_type} (field {name})"
raise ClanError(msg)
sub_type_: type = type_map[sub_type] sub_type_: type = type_map[sub_type]
result[name] = list[sub_type_] # type: ignore result[name] = list[sub_type_] # type: ignore
continue continue

View File

@@ -110,5 +110,6 @@ def machine_schema(
env=env, env=env,
) )
if proc.returncode != 0: if proc.returncode != 0:
raise ClanError(f"Failed to read schema:\n{proc.stderr}") msg = f"Failed to read schema:\n{proc.stderr}"
raise ClanError(msg)
return json.loads(proc.stdout) return json.loads(proc.stdout)

View File

@@ -161,9 +161,8 @@ def _generate_facts_for_machine(
if service and service not in machine.facts_data: if service and service not in machine.facts_data:
services = list(machine.facts_data.keys()) services = list(machine.facts_data.keys())
raise ClanError( msg = f"Could not find service with name: {service}. The following services are available: {services}"
f"Could not find service with name: {service}. The following services are available: {services}" raise ClanError(msg)
)
if service: if service:
machine_service_facts = {service: machine.facts_data[service]} machine_service_facts = {service: machine.facts_data[service]}

View File

@@ -30,7 +30,7 @@ def get_command(args: argparse.Namespace) -> None:
# the raw_facts are bytestrings making them not json serializable # the raw_facts are bytestrings making them not json serializable
raw_facts = get_all_facts(machine) raw_facts = get_all_facts(machine)
facts = dict() facts = {}
for key in raw_facts["TODO"]: for key in raw_facts["TODO"]:
facts[key] = raw_facts["TODO"][key].decode("utf8") facts[key] = raw_facts["TODO"][key].decode("utf8")

View File

@@ -25,9 +25,8 @@ class FactStore(FactStoreBase):
fact_path.write_bytes(value) fact_path.write_bytes(value)
return fact_path return fact_path
else: else:
raise ClanError( msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
f"in_flake fact storage is only supported for local flakes: {self.machine.flake}" raise ClanError(msg)
)
def exists(self, service: str, name: str) -> bool: def exists(self, service: str, name: str) -> bool:
fact_path = ( fact_path = (

View File

@@ -32,7 +32,8 @@ class FactStore(FactStoreBase):
fact_path = self.dir / service / name fact_path = self.dir / service / name
if fact_path.exists(): if fact_path.exists():
return fact_path.read_bytes() return fact_path.read_bytes()
raise ClanError(f"Fact {name} for service {service} not found") msg = f"Fact {name} for service {service} not found"
raise ClanError(msg)
# get all facts # get all facts
def get_all(self) -> dict[str, dict[str, bytes]]: def get_all(self) -> dict[str, dict[str, bytes]]:

View File

@@ -44,7 +44,8 @@ def list_possible_keymaps() -> list[str]:
keymaps_dir = Path(result.stdout.strip()) / "share" / "keymaps" keymaps_dir = Path(result.stdout.strip()) / "share" / "keymaps"
if not keymaps_dir.exists(): if not keymaps_dir.exists():
raise FileNotFoundError(f"Keymaps directory '{keymaps_dir}' does not exist.") msg = f"Keymaps directory '{keymaps_dir}' does not exist."
raise FileNotFoundError(msg)
keymap_files = [] keymap_files = []
@@ -65,7 +66,8 @@ def list_possible_languages() -> list[str]:
locale_file = Path(result.stdout.strip()) / "share" / "i18n" / "SUPPORTED" locale_file = Path(result.stdout.strip()) / "share" / "i18n" / "SUPPORTED"
if not locale_file.exists(): if not locale_file.exists():
raise FileNotFoundError(f"Locale file '{locale_file}' does not exist.") msg = f"Locale file '{locale_file}' does not exist."
raise FileNotFoundError(msg)
with locale_file.open() as f: with locale_file.open() as f:
lines = f.readlines() lines = f.readlines()
@@ -107,29 +109,30 @@ def flash_machine(
if system_config.language: if system_config.language:
if system_config.language not in list_possible_languages(): if system_config.language not in list_possible_languages():
raise ClanError( msg = (
f"Language '{system_config.language}' is not a valid language. " f"Language '{system_config.language}' is not a valid language. "
f"Run 'clan flash --list-languages' to see a list of possible languages." f"Run 'clan flash --list-languages' to see a list of possible languages."
) )
raise ClanError(msg)
system_config_nix["i18n"] = {"defaultLocale": system_config.language} system_config_nix["i18n"] = {"defaultLocale": system_config.language}
if system_config.keymap: if system_config.keymap:
if system_config.keymap not in list_possible_keymaps(): if system_config.keymap not in list_possible_keymaps():
raise ClanError( msg = (
f"Keymap '{system_config.keymap}' is not a valid keymap. " f"Keymap '{system_config.keymap}' is not a valid keymap. "
f"Run 'clan flash --list-keymaps' to see a list of possible keymaps." f"Run 'clan flash --list-keymaps' to see a list of possible keymaps."
) )
raise ClanError(msg)
system_config_nix["console"] = {"keyMap": system_config.keymap} system_config_nix["console"] = {"keyMap": system_config.keymap}
if system_config.ssh_keys_path: if system_config.ssh_keys_path:
root_keys = [] root_keys = []
for key_path in map(lambda x: Path(x), system_config.ssh_keys_path): for key_path in (Path(x) for x in system_config.ssh_keys_path):
try: try:
root_keys.append(key_path.read_text()) root_keys.append(key_path.read_text())
except OSError as e: except OSError as e:
raise ClanError( msg = f"Cannot read SSH public key file: {key_path}: {e}"
f"Cannot read SSH public key file: {key_path}: {e}" raise ClanError(msg) from e
) from e
system_config_nix["users"] = { system_config_nix["users"] = {
"users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}} "users": {"root": {"openssh": {"authorizedKeys": {"keys": root_keys}}}}
} }
@@ -153,9 +156,8 @@ def flash_machine(
if os.geteuid() != 0: if os.geteuid() != 0:
if shutil.which("sudo") is None: if shutil.which("sudo") is None:
raise ClanError( msg = "sudo is required to run disko-install as a non-root user"
"sudo is required to run disko-install as a non-root user" raise ClanError(msg)
)
wrapper = 'set -x; disko_install=$(command -v disko-install); exec sudo "$disko_install" "$@"' wrapper = 'set -x; disko_install=$(command -v disko-install); exec sudo "$disko_install" "$@"'
disko_install.extend(["bash", "-c", wrapper]) disko_install.extend(["bash", "-c", wrapper])

View File

@@ -32,7 +32,8 @@ def commit_files(
# check that the file is in the git repository # check that the file is in the git repository
for file_path in file_paths: for file_path in file_paths:
if not Path(file_path).resolve().is_relative_to(repo_dir.resolve()): if not Path(file_path).resolve().is_relative_to(repo_dir.resolve()):
raise ClanError(f"File {file_path} is not in the git repository {repo_dir}") msg = f"File {file_path} is not in the git repository {repo_dir}"
raise ClanError(msg)
# generate commit message if not provided # generate commit message if not provided
if commit_message is None: if commit_message is None:
commit_message = "" commit_message = ""

View File

@@ -54,7 +54,8 @@ def list_history() -> list[HistoryEntry]:
parsed[i] = _merge_dicts(p, p.get("settings", {})) parsed[i] = _merge_dicts(p, p.get("settings", {}))
logs = [HistoryEntry(**p) for p in parsed] logs = [HistoryEntry(**p) for p in parsed]
except (json.JSONDecodeError, TypeError) as ex: except (json.JSONDecodeError, TypeError) as ex:
raise ClanError(f"History file at {user_history_file()} is corrupted") from ex msg = f"History file at {user_history_file()} is corrupted"
raise ClanError(msg) from ex
return logs return logs
@@ -63,7 +64,7 @@ def new_history_entry(url: str, machine: str) -> HistoryEntry:
flake = inspect_flake(url, machine) flake = inspect_flake(url, machine)
return HistoryEntry( return HistoryEntry(
flake=flake, flake=flake,
last_used=datetime.datetime.now().isoformat(), last_used=datetime.datetime.now(tz=datetime.UTC).isoformat(),
) )
@@ -93,7 +94,7 @@ def _add_maschine_to_history_list(
new_entry.flake.flake_url == str(uri_path) new_entry.flake.flake_url == str(uri_path)
and new_entry.flake.flake_attr == uri_machine and new_entry.flake.flake_attr == uri_machine
): ):
new_entry.last_used = datetime.datetime.now().isoformat() new_entry.last_used = datetime.datetime.now(tz=datetime.UTC).isoformat()
return new_entry return new_entry
new_entry = new_history_entry(uri_path, uri_machine) new_entry = new_history_entry(uri_path, uri_machine)

View File

@@ -33,7 +33,8 @@ def update_history() -> list[HistoryEntry]:
flake = inspect_flake(uri.get_url(), uri.machine_name) flake = inspect_flake(uri.get_url(), uri.machine_name)
flake.flake_url = flake.flake_url flake.flake_url = flake.flake_url
entry = HistoryEntry( entry = HistoryEntry(
flake=flake, last_used=datetime.datetime.now().isoformat() flake=flake,
last_used=datetime.datetime.now(tz=datetime.UTC).isoformat(),
) )
write_history_file(logs) write_history_file(logs)

View File

@@ -114,7 +114,8 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
inventory = from_dict(Inventory, data) inventory = from_dict(Inventory, data)
return inventory return inventory
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ClanError(f"Error decoding inventory from flake: {e}") from e msg = f"Error decoding inventory from flake: {e}"
raise ClanError(msg) from e
def load_inventory_json( def load_inventory_json(
@@ -134,7 +135,8 @@ def load_inventory_json(
inventory = from_dict(Inventory, res) inventory = from_dict(Inventory, res)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
# Error decoding the inventory file # Error decoding the inventory file
raise ClanError(f"Error decoding inventory file: {e}") from e msg = f"Error decoding inventory file: {e}"
raise ClanError(msg) from e
if not inventory_file.exists(): if not inventory_file.exists():
# Copy over the meta from the flake if the inventory is not initialized # Copy over the meta from the flake if the inventory is not initialized

View File

@@ -20,16 +20,16 @@ log = logging.getLogger(__name__)
def create_machine(flake: FlakeId, machine: Machine) -> None: def create_machine(flake: FlakeId, machine: Machine) -> None:
hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$" hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
if not re.match(hostname_regex, machine.name): if not re.match(hostname_regex, machine.name):
raise ClanError( msg = "Machine name must be a valid hostname"
"Machine name must be a valid hostname", location="Create Machine" raise ClanError(msg, location="Create Machine")
)
inventory = load_inventory_json(flake.path) inventory = load_inventory_json(flake.path)
full_inventory = load_inventory_eval(flake.path) full_inventory = load_inventory_eval(flake.path)
if machine.name in full_inventory.machines.keys(): if machine.name in full_inventory.machines.keys():
raise ClanError(f"Machine with the name {machine.name} already exists") msg = f"Machine with the name {machine.name} already exists"
raise ClanError(msg)
print(f"Define machine {machine.name}", machine) print(f"Define machine {machine.name}", machine)

View File

@@ -15,7 +15,8 @@ def delete_machine(flake: FlakeId, name: str) -> None:
machine = inventory.machines.pop(name, None) machine = inventory.machines.pop(name, None)
if machine is None: if machine is None:
raise ClanError(f"Machine {name} does not exist") msg = f"Machine {name} does not exist"
raise ClanError(msg)
save_inventory(inventory, flake.path, f"Delete machine {name}") save_inventory(inventory, flake.path, f"Delete machine {name}")

View File

@@ -159,7 +159,8 @@ def generate_machine_hardware_info(
if out.returncode != 0: if out.returncode != 0:
log.error(f"Failed to inspect {machine_name}. Address: {hostname}") log.error(f"Failed to inspect {machine_name}. Address: {hostname}")
log.error(out) log.error(out)
raise ClanError(f"Failed to inspect {machine_name}. Address: {hostname}") msg = f"Failed to inspect {machine_name}. Address: {hostname}"
raise ClanError(msg)
hw_file = Path( hw_file = Path(
f"{clan_dir}/machines/{machine_name}/{hw_nix_file if report_type == 'nixos-generate-config' else facter_file}" f"{clan_dir}/machines/{machine_name}/{hw_nix_file if report_type == 'nixos-generate-config' else facter_file}"
@@ -170,8 +171,9 @@ def generate_machine_hardware_info(
is_template = hw_file.exists() and "throw" in hw_file.read_text() is_template = hw_file.exists() and "throw" in hw_file.read_text()
if hw_file.exists() and not force and not is_template: if hw_file.exists() and not force and not is_template:
msg = "File exists."
raise ClanError( raise ClanError(
"File exists.", msg,
description="Hardware file already exists. To force overwrite the existing configuration use '--force'.", description="Hardware file already exists. To force overwrite the existing configuration use '--force'.",
location=f"{__name__} {hw_file}", location=f"{__name__} {hw_file}",
) )
@@ -205,8 +207,9 @@ def generate_machine_hardware_info(
backup_file.replace(hw_file) backup_file.replace(hw_file)
# TODO: Undo the commit # TODO: Undo the commit
msg = "Invalid hardware-configuration.nix file"
raise ClanError( raise ClanError(
"Invalid hardware-configuration.nix file", msg,
description="The hardware-configuration.nix file is invalid. Please check the file and try again.", description="The hardware-configuration.nix file is invalid. Please check the file and try again.",
location=f"{__name__} {hw_file}", location=f"{__name__} {hw_file}",
) from e ) from e

View File

@@ -132,7 +132,8 @@ def install_command(args: argparse.Namespace) -> None:
json_ssh_deploy = json.loads(qrcode_scan(args.png)) json_ssh_deploy = json.loads(qrcode_scan(args.png))
if not json_ssh_deploy and not args.target_host: if not json_ssh_deploy and not args.target_host:
raise ClanError("No target host provided, please provide a target host.") msg = "No target host provided, please provide a target host."
raise ClanError(msg)
if json_ssh_deploy: if json_ssh_deploy:
target_host = f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}" target_host = f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}"
@@ -171,13 +172,12 @@ def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str:
host = addr host = addr
break break
if not host: if not host:
raise ClanError( msg = f"""
f"""
Could not reach any of the host addresses provided in the json string. Could not reach any of the host addresses provided in the json string.
Please doublecheck if they are reachable from your machine. Please doublecheck if they are reachable from your machine.
Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']} Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']}
""" """
) raise ClanError(msg)
return host return host

View File

@@ -44,7 +44,8 @@ def get_inventory_machine_details(
inventory = load_inventory_eval(flake_url) inventory = load_inventory_eval(flake_url)
machine = inventory.machines.get(machine_name) machine = inventory.machines.get(machine_name)
if machine is None: if machine is None:
raise ClanError(f"Machine {machine_name} not found in inventory") msg = f"Machine {machine_name} not found in inventory"
raise ClanError(msg)
hw_config_path = ( hw_config_path = (
Path(flake_url) / "machines" / Path(machine_name) / "hardware-configuration.nix" Path(flake_url) / "machines" / Path(machine_name) / "hardware-configuration.nix"
@@ -73,7 +74,8 @@ def list_nixos_machines(flake_url: str | Path) -> list[str]:
data = json.loads(res) data = json.loads(res)
return data return data
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ClanError(f"Error decoding machines from flake: {e}") from e msg = f"Error decoding machines from flake: {e}"
raise ClanError(msg) from e
@dataclass @dataclass
@@ -88,12 +90,14 @@ def check_machine_online(
) -> Literal["Online", "Offline"]: ) -> Literal["Online", "Offline"]:
machine = load_inventory_eval(flake_url).machines.get(machine_name) machine = load_inventory_eval(flake_url).machines.get(machine_name)
if not machine: if not machine:
raise ClanError(f"Machine {machine_name} not found in inventory") msg = f"Machine {machine_name} not found in inventory"
raise ClanError(msg)
hostname = machine.deploy.targetHost hostname = machine.deploy.targetHost
if not hostname: if not hostname:
raise ClanError(f"Machine {machine_name} does not specify a targetHost") msg = f"Machine {machine_name} does not specify a targetHost"
raise ClanError(msg)
timeout = opts.timeout if opts and opts.timeout else 20 timeout = opts.timeout if opts and opts.timeout else 20

View File

@@ -10,7 +10,7 @@ T = TypeVar("T")
class MachineGroup: class MachineGroup:
def __init__(self, machines: list[Machine]) -> None: def __init__(self, machines: list[Machine]) -> None:
self.group = HostGroup(list(m.target_host for m in machines)) self.group = HostGroup([m.target_host for m in machines])
def run_function( def run_function(
self, func: Callable[[Machine], T], check: bool = True self, func: Callable[[Machine], T], check: bool = True

View File

@@ -102,7 +102,8 @@ class Machine:
elif self.flake.is_remote(): elif self.flake.is_remote():
return Path(nix_metadata(self.flake.url)["path"]) return Path(nix_metadata(self.flake.url)["path"])
else: else:
raise ClanError(f"Unsupported flake url: {self.flake}") msg = f"Unsupported flake url: {self.flake}"
raise ClanError(msg)
@property @property
def target_host(self) -> Host: def target_host(self) -> Host:
@@ -144,7 +145,7 @@ class Machine:
config = nix_config() config = nix_config()
system = config["system"] system = config["system"]
file_info = dict() file_info = {}
with NamedTemporaryFile(mode="w") as config_json: with NamedTemporaryFile(mode="w") as config_json:
if extra_config is not None: if extra_config is not None:
json.dump(extra_config, config_json, indent=2) json.dump(extra_config, config_json, indent=2)
@@ -210,7 +211,8 @@ class Machine:
outpath = run_no_stdout(nix_build(args)).stdout.strip() outpath = run_no_stdout(nix_build(args)).stdout.strip()
return Path(outpath) return Path(outpath)
else: else:
raise ValueError(f"Unknown method {method}") msg = f"Unknown method {method}"
raise ValueError(msg)
def eval_nix( def eval_nix(
self, self,
@@ -234,7 +236,8 @@ class Machine:
self._eval_cache[attr] = output self._eval_cache[attr] = output
return output return output
else: else:
raise ClanError("eval_nix returned not a string") msg = "eval_nix returned not a string"
raise ClanError(msg)
def build_nix( def build_nix(
self, self,
@@ -259,4 +262,5 @@ class Machine:
self._build_cache[attr] = output self._build_cache[attr] = output
return output return output
else: else:
raise ClanError("build_nix returned not a Path") msg = "build_nix returned not a Path"
raise ClanError(msg)

View File

@@ -12,11 +12,9 @@ def validate_hostname(hostname: str) -> bool:
def machine_name_type(arg_value: str) -> str: def machine_name_type(arg_value: str) -> str:
if len(arg_value) > 63: if len(arg_value) > 63:
raise argparse.ArgumentTypeError( msg = "Machine name must be less than 63 characters long"
"Machine name must be less than 63 characters long" raise argparse.ArgumentTypeError(msg)
)
if not VALID_HOSTNAME.match(arg_value): if not VALID_HOSTNAME.match(arg_value):
raise argparse.ArgumentTypeError( msg = "Invalid character in machine name. Allowed characters are a-z, 0-9, ., and -. Must not start with a number"
"Invalid character in machine name. Allowed characters are a-z, 0-9, ., and -. Must not start with a number" raise argparse.ArgumentTypeError(msg)
)
return arg_value return arg_value

View File

@@ -80,9 +80,8 @@ def upload_sources(
try: try:
return json.loads(proc.stdout)["path"] return json.loads(proc.stdout)["path"]
except (json.JSONDecodeError, OSError) as e: except (json.JSONDecodeError, OSError) as e:
raise ClanError( msg = f"failed to parse output of {shlex.join(cmd)}: {e}\nGot: {proc.stdout}"
f"failed to parse output of {shlex.join(cmd)}: {e}\nGot: {proc.stdout}" raise ClanError(msg) from e
) from e
@API.register @API.register
@@ -96,7 +95,8 @@ def update_machines(base_path: str, machines: list[InventoryMachine]) -> None:
flake=FlakeId(base_path), flake=FlakeId(base_path),
) )
if not machine.deploy.targetHost: if not machine.deploy.targetHost:
raise ClanError(f"'TargetHost' is not set for machine '{machine.name}'") msg = f"'TargetHost' is not set for machine '{machine.name}'"
raise ClanError(msg)
# Copy targetHost to machine # Copy targetHost to machine
m.target_host_address = machine.deploy.targetHost m.target_host_address = machine.deploy.targetHost
group_machines.append(m) group_machines.append(m)
@@ -161,7 +161,8 @@ def deploy_machine(machines: MachineGroup) -> None:
def update(args: argparse.Namespace) -> None: def update(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
machines = [] machines = []
if len(args.machines) == 1 and args.target_host is not None: if len(args.machines) == 1 and args.target_host is not None:
machine = Machine( machine = Machine(

View File

@@ -145,7 +145,8 @@ class Programs:
def run_cmd(programs: list[str], cmd: list[str]) -> list[str]: def run_cmd(programs: list[str], cmd: list[str]) -> list[str]:
for program in programs: for program in programs:
if not Programs.is_allowed(program): if not Programs.is_allowed(program):
raise ValueError(f"Program not allowed: {program}") msg = f"Program not allowed: {program}"
raise ValueError(msg)
if os.environ.get("IN_NIX_SANDBOX"): if os.environ.get("IN_NIX_SANDBOX"):
return cmd return cmd
missing_packages = [ missing_packages = [

View File

@@ -58,7 +58,8 @@ class QgaSession:
self.sock.send(status_payload) self.sock.send(status_payload)
result = self.get_response() result = self.get_response()
if "error" in result and result["error"]["desc"].startswith("PID"): if "error" in result and result["error"]["desc"].startswith("PID"):
raise Exception("PID could not be found") msg = "PID could not be found"
raise Exception(msg)
if result["return"]["exited"]: if result["return"]["exited"]:
break break
sleep(0.1) sleep(0.1)
@@ -75,7 +76,6 @@ class QgaSession:
else base64.b64decode(result["return"]["err-data"]).decode("utf-8") else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
) )
if check and exitcode != 0: if check and exitcode != 0:
raise Exception( msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}"
f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}" raise Exception(msg)
)
return exitcode, stdout, stderr return exitcode, stdout, stderr

View File

@@ -139,11 +139,14 @@ class QEMUMonitorProtocol:
try: try:
ret = self.__json_read(only_event=True) ret = self.__json_read(only_event=True)
except TimeoutError as e: except TimeoutError as e:
raise QMPTimeoutError("Timeout waiting for event") from e msg = "Timeout waiting for event"
raise QMPTimeoutError(msg) from e
except OSError as e: except OSError as e:
raise QMPConnectError("Error while reading from socket") from e msg = "Error while reading from socket"
raise QMPConnectError(msg) from e
if ret is None: if ret is None:
raise QMPConnectError("Error while reading from socket") msg = "Error while reading from socket"
raise QMPConnectError(msg)
self.__sock.settimeout(None) self.__sock.settimeout(None)
def __enter__(self) -> "QEMUMonitorProtocol": def __enter__(self) -> "QEMUMonitorProtocol":

View File

@@ -39,7 +39,8 @@ def remove_object(path: Path, name: str) -> list[Path]:
shutil.rmtree(path / name) shutil.rmtree(path / name)
paths_to_commit.append(path / name) paths_to_commit.append(path / name)
except FileNotFoundError as e: except FileNotFoundError as e:
raise ClanError(f"{name} not found in {path}") from e msg = f"{name} not found in {path}"
raise ClanError(msg) from e
if not os.listdir(path): if not os.listdir(path):
os.rmdir(path) os.rmdir(path)
return paths_to_commit return paths_to_commit

View File

@@ -103,7 +103,7 @@ def update_group_keys(flake_dir: Path, group: str) -> list[Path]:
if (secret / "groups" / group).is_symlink(): if (secret / "groups" / group).is_symlink():
updated_paths += update_keys( updated_paths += update_keys(
secret, secret,
list(sorted(secrets.collect_keys_for_path(secret))), sorted(secrets.collect_keys_for_path(secret)),
) )
return updated_paths return updated_paths
@@ -120,9 +120,8 @@ def add_member(
user_target = group_folder / name user_target = group_folder / name
if user_target.exists(): if user_target.exists():
if not user_target.is_symlink(): if not user_target.is_symlink():
raise ClanError( msg = f"Cannot add user {name}. {user_target} exists but is not a symlink"
f"Cannot add user {name}. {user_target} exists but is not a symlink" raise ClanError(msg)
)
os.remove(user_target) os.remove(user_target)
user_target.symlink_to(os.path.relpath(source, user_target.parent)) user_target.symlink_to(os.path.relpath(source, user_target.parent))
return update_group_keys(flake_dir, group_folder.parent.name) return update_group_keys(flake_dir, group_folder.parent.name)

View File

@@ -23,7 +23,8 @@ def import_sops(args: argparse.Namespace) -> None:
try: try:
file.read_text() file.read_text()
except OSError as e: except OSError as e:
raise ClanError(f"Could not read file {file}: {e}") from e msg = f"Could not read file {file}: {e}"
raise ClanError(msg) from e
if file_type == ".yaml": if file_type == ".yaml":
cmd = ["sops"] cmd = ["sops"]
if args.input_type: if args.input_type:

View File

@@ -23,13 +23,14 @@ def extract_public_key(filepath: Path) -> str:
# Extract and return the public key part after the prefix # Extract and return the public key part after the prefix
return line.strip().split(": ")[1] return line.strip().split(": ")[1]
except FileNotFoundError as e: except FileNotFoundError as e:
raise ClanError(f"The file at {filepath} was not found.") from e msg = f"The file at {filepath} was not found."
raise ClanError(msg) from e
except OSError as e: except OSError as e:
raise ClanError( msg = f"An error occurred while extracting the public key: {e}"
f"An error occurred while extracting the public key: {e}" raise ClanError(msg) from e
) from e
raise ClanError(f"Could not find the public key in the file at {filepath}.") msg = f"Could not find the public key in the file at {filepath}."
raise ClanError(msg)
def generate_key() -> str: def generate_key() -> str:

View File

@@ -96,7 +96,8 @@ def remove_secret(flake_dir: Path, machine: str, secret: str) -> None:
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
lst = list_sops_machines(args.flake.path) lst = list_sops_machines(args.flake.path)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))
@@ -104,31 +105,36 @@ def list_command(args: argparse.Namespace) -> None:
def add_command(args: argparse.Namespace) -> None: def add_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_machine(args.flake.path, args.machine, args.key, args.force) add_machine(args.flake.path, args.machine, args.key, args.force)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
print(get_machine(args.flake.path, args.machine)) print(get_machine(args.flake.path, args.machine))
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_machine(args.flake.path, args.machine) remove_machine(args.flake.path, args.machine)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_secret(args.flake.path, args.machine, args.secret) add_secret(args.flake.path, args.machine, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_secret(args.flake.path, args.machine, args.secret) remove_secret(args.flake.path, args.machine, args.secret)

View File

@@ -42,7 +42,7 @@ def update_secrets(
changed_files.extend( changed_files.extend(
update_keys( update_keys(
secret_path, secret_path,
list(sorted(collect_keys_for_path(secret_path))), sorted(collect_keys_for_path(secret_path)),
) )
) )
return changed_files return changed_files
@@ -69,7 +69,7 @@ def collect_keys_for_type(folder: Path) -> set[str]:
def collect_keys_for_path(path: Path) -> set[str]: def collect_keys_for_path(path: Path) -> set[str]:
keys = set([]) keys = set()
keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users")) keys.update(collect_keys_for_type(path / "users"))
groups = path / "groups" groups = path / "groups"
@@ -99,7 +99,7 @@ def encrypt_secret(
if add_users is None: if add_users is None:
add_users = [] add_users = []
key = ensure_sops_key(flake_dir) key = ensure_sops_key(flake_dir)
recipient_keys = set([]) recipient_keys = set()
files_to_commit = [] files_to_commit = []
for user in add_users: for user in add_users:
@@ -146,7 +146,7 @@ def encrypt_secret(
) )
secret_path = secret_path / "secret" secret_path = secret_path / "secret"
encrypt_file(secret_path, value, list(sorted(recipient_keys)), meta) encrypt_file(secret_path, value, sorted(recipient_keys), meta)
files_to_commit.append(secret_path) files_to_commit.append(secret_path)
commit_files( commit_files(
files_to_commit, files_to_commit,
@@ -158,7 +158,8 @@ def encrypt_secret(
def remove_secret(flake_dir: Path, secret: str) -> None: def remove_secret(flake_dir: Path, secret: str) -> None:
path = sops_secrets_folder(flake_dir) / secret path = sops_secrets_folder(flake_dir) / secret
if not path.exists(): if not path.exists():
raise ClanError(f"Secret '{secret}' does not exist") msg = f"Secret '{secret}' does not exist"
raise ClanError(msg)
shutil.rmtree(path) shutil.rmtree(path)
commit_files( commit_files(
[path], [path],
@@ -215,9 +216,8 @@ def allow_member(
user_target = group_folder / name user_target = group_folder / name
if user_target.exists(): if user_target.exists():
if not user_target.is_symlink(): if not user_target.is_symlink():
raise ClanError( msg = f"Cannot add user '{name}' to {group_folder.parent.name} secret. {user_target} exists but is not a symlink"
f"Cannot add user '{name}' to {group_folder.parent.name} secret. {user_target} exists but is not a symlink" raise ClanError(msg)
)
os.remove(user_target) os.remove(user_target)
user_target.symlink_to(os.path.relpath(source, user_target.parent)) user_target.symlink_to(os.path.relpath(source, user_target.parent))
@@ -226,7 +226,7 @@ def allow_member(
changed.extend( changed.extend(
update_keys( update_keys(
group_folder.parent, group_folder.parent,
list(sorted(collect_keys_for_path(group_folder.parent))), sorted(collect_keys_for_path(group_folder.parent)),
) )
) )
return changed return changed
@@ -242,9 +242,8 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
keys = collect_keys_for_path(group_folder.parent) keys = collect_keys_for_path(group_folder.parent)
if len(keys) < 2: if len(keys) < 2:
raise ClanError( msg = f"Cannot remove {name} from {group_folder.parent.name}. No keys left. Use 'clan secrets remove {name}' to remove the secret."
f"Cannot remove {name} from {group_folder.parent.name}. No keys left. Use 'clan secrets remove {name}' to remove the secret." raise ClanError(msg)
)
os.remove(target) os.remove(target)
if len(os.listdir(group_folder)) == 0: if len(os.listdir(group_folder)) == 0:
@@ -254,7 +253,7 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
os.rmdir(group_folder.parent) os.rmdir(group_folder.parent)
return update_keys( return update_keys(
target.parent.parent, list(sorted(collect_keys_for_path(group_folder.parent))) target.parent.parent, sorted(collect_keys_for_path(group_folder.parent))
) )
@@ -295,7 +294,8 @@ def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
ensure_sops_key(flake_dir) ensure_sops_key(flake_dir)
path = secret_path / "secret" path = secret_path / "secret"
if not path.exists(): if not path.exists():
raise ClanError(f"Secret '{secret_path!s}' does not exist") msg = f"Secret '{secret_path!s}' does not exist"
raise ClanError(msg)
return decrypt_file(path) return decrypt_file(path)
@@ -332,9 +332,11 @@ def rename_command(args: argparse.Namespace) -> None:
old_path = sops_secrets_folder(flake_dir) / args.secret old_path = sops_secrets_folder(flake_dir) / args.secret
new_path = sops_secrets_folder(flake_dir) / args.new_name new_path = sops_secrets_folder(flake_dir) / args.new_name
if not old_path.exists(): if not old_path.exists():
raise ClanError(f"Secret '{args.secret}' does not exist") msg = f"Secret '{args.secret}' does not exist"
raise ClanError(msg)
if new_path.exists(): if new_path.exists():
raise ClanError(f"Secret '{args.new_name}' already exists") msg = f"Secret '{args.new_name}' already exists"
raise ClanError(msg)
os.rename(old_path, new_path) os.rename(old_path, new_path)
commit_files( commit_files(
[old_path, new_path], [old_path, new_path],

View File

@@ -30,9 +30,8 @@ def get_public_key(privkey: str) -> str:
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
) )
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise ClanError( msg = "Failed to get public key for age private key. Is the key malformed?"
"Failed to get public key for age private key. Is the key malformed?" raise ClanError(msg) from e
) from e
return res.stdout.strip() return res.stdout.strip()
@@ -49,15 +48,18 @@ def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
if not line.startswith("#"): if not line.startswith("#"):
private_key = line private_key = line
if not pubkey: if not pubkey:
raise ClanError("Could not find public key in age-keygen output") msg = "Could not find public key in age-keygen output"
raise ClanError(msg)
if not private_key: if not private_key:
raise ClanError("Could not find private key in age-keygen output") msg = "Could not find private key in age-keygen output"
raise ClanError(msg)
if out_file: if out_file:
out_file.parent.mkdir(parents=True, exist_ok=True) out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_text(res) out_file.write_text(res)
return private_key, pubkey return private_key, pubkey
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise ClanError("Failed to generate private sops key") from e msg = "Failed to generate private sops key"
raise ClanError(msg) from e
def get_user_name(flake_dir: Path, user: str) -> str: def get_user_name(flake_dir: Path, user: str) -> str:
@@ -86,9 +88,8 @@ def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
key.username = user.name key.username = user.name
return key return key
raise ClanError( msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" raise ClanError(msg)
)
def default_sops_key_path() -> Path: def default_sops_key_path() -> Path:
@@ -107,16 +108,15 @@ def ensure_sops_key(flake_dir: Path) -> SopsKey:
if path.exists(): if path.exists():
return ensure_user_or_machine(flake_dir, get_public_key(path.read_text())) return ensure_user_or_machine(flake_dir, get_public_key(path.read_text()))
else: else:
raise ClanError( msg = "No sops key found. Please generate one with 'clan secrets key generate'."
"No sops key found. Please generate one with 'clan secrets key generate'." raise ClanError(msg)
)
@contextmanager @contextmanager
def sops_manifest(keys: list[str]) -> Iterator[Path]: def sops_manifest(keys: list[str]) -> Iterator[Path]:
with NamedTemporaryFile(delete=False, mode="w") as manifest: with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump( json.dump(
dict(creation_rules=[dict(key_groups=[dict(age=keys)])]), manifest, indent=2 {"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
) )
manifest.flush() manifest.flush()
yield Path(manifest.name) yield Path(manifest.name)
@@ -164,9 +164,10 @@ def encrypt_file(
p = subprocess.run(cmd, check=False) p = subprocess.run(cmd, check=False)
# returns 200 if the file is changed # returns 200 if the file is changed
if p.returncode != 0 and p.returncode != 200: if p.returncode != 0 and p.returncode != 200:
raise ClanError( msg = (
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}" f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
) )
raise ClanError(msg)
return return
# hopefully /tmp is written to an in-memory file to avoid leaking secrets # hopefully /tmp is written to an in-memory file to avoid leaking secrets
@@ -182,7 +183,8 @@ def encrypt_file(
with open(f.name, "w") as fd: with open(f.name, "w") as fd:
shutil.copyfileobj(content, fd) shutil.copyfileobj(content, fd)
else: else:
raise ClanError(f"Invalid content type: {type(content)}") msg = f"Invalid content type: {type(content)}"
raise ClanError(msg)
# we pass an empty manifest to pick up existing configuration of the user # we pass an empty manifest to pick up existing configuration of the user
args = ["sops", "--config", str(manifest)] args = ["sops", "--config", str(manifest)]
args.extend(["-i", "--encrypt", str(f.name)]) args.extend(["-i", "--encrypt", str(f.name)])
@@ -228,9 +230,8 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
flags |= os.O_EXCL flags |= os.O_EXCL
fd = os.open(path / "key.json", flags) fd = os.open(path / "key.json", flags)
except FileExistsError as e: except FileExistsError as e:
raise ClanError( msg = f"{path.name} already exists in {path}. Use --force to overwrite."
f"{path.name} already exists in {path}. Use --force to overwrite." raise ClanError(msg) from e
) from e
with os.fdopen(fd, "w") as f: with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2) json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
@@ -240,12 +241,13 @@ def read_key(path: Path) -> str:
try: try:
key = json.load(f) key = json.load(f)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ClanError(f"Failed to decode {path.name}: {e}") from e msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
if key["type"] != "age": if key["type"] != "age":
raise ClanError( msg = f"{path.name} is not an age key but {key['type']}. This is not supported"
f"{path.name} is not an age key but {key['type']}. This is not supported" raise ClanError(msg)
)
publickey = key.get("publickey") publickey = key.get("publickey")
if not publickey: if not publickey:
raise ClanError(f"{path.name} does not contain a public key") msg = f"{path.name} does not contain a public key"
raise ClanError(msg)
return publickey return publickey

View File

@@ -14,9 +14,8 @@ VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
def secret_name_type(arg_value: str) -> str: def secret_name_type(arg_value: str) -> str:
if not VALID_SECRET_NAME.match(arg_value): if not VALID_SECRET_NAME.match(arg_value):
raise argparse.ArgumentTypeError( msg = "Invalid character in secret name. Allowed characters are a-z, A-Z, 0-9, ., -, and _"
"Invalid character in secret name. Allowed characters are a-z, A-Z, 0-9, ., -, and _" raise argparse.ArgumentTypeError(msg)
)
return arg_value return arg_value
@@ -28,22 +27,19 @@ def public_or_private_age_key_type(arg_value: str) -> str:
if arg_value.startswith("AGE-SECRET-KEY-"): if arg_value.startswith("AGE-SECRET-KEY-"):
return get_public_key(arg_value) return get_public_key(arg_value)
if not arg_value.startswith("age1"): if not arg_value.startswith("age1"):
raise ClanError( msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
f"Please provide an age key starting with age1, got: '{arg_value}'" raise ClanError(msg)
)
return arg_value return arg_value
def group_or_user_name_type(what: str) -> Callable[[str], str]: def group_or_user_name_type(what: str) -> Callable[[str], str]:
def name_type(arg_value: str) -> str: def name_type(arg_value: str) -> str:
if len(arg_value) > 32: if len(arg_value) > 32:
raise argparse.ArgumentTypeError( msg = f"{what.capitalize()} name must be less than 32 characters long"
f"{what.capitalize()} name must be less than 32 characters long" raise argparse.ArgumentTypeError(msg)
)
if not VALID_USER_NAME.match(arg_value): if not VALID_USER_NAME.match(arg_value):
raise argparse.ArgumentTypeError( msg = f"Invalid character in {what} name. Allowed characters are a-z, 0-9, -, and _. Must start with a letter or _"
f"Invalid character in {what} name. Allowed characters are a-z, 0-9, -, and _. Must start with a letter or _" raise argparse.ArgumentTypeError(msg)
)
return arg_value return arg_value
return name_type return name_type

View File

@@ -84,7 +84,8 @@ def remove_secret(flake_dir: Path, user: str, secret: str) -> None:
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
lst = list_users(args.flake.path) lst = list_users(args.flake.path)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))
@@ -92,31 +93,36 @@ def list_command(args: argparse.Namespace) -> None:
def add_command(args: argparse.Namespace) -> None: def add_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_user(args.flake.path, args.user, args.key, args.force) add_user(args.flake.path, args.user, args.key, args.force)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
print(get_user(args.flake.path, args.user)) print(get_user(args.flake.path, args.user))
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_user(args.flake.path, args.user) remove_user(args.flake.path, args.user)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_secret(args.flake.path, args.user, args.secret) add_secret(args.flake.path, args.user, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
raise ClanError("Could not find clan flake toplevel directory") msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_secret(args.flake.path, args.user, args.secret) remove_secret(args.flake.path, args.user, args.secret)

View File

@@ -222,12 +222,12 @@ class Host:
for line in lines: for line in lines:
if not is_err: if not is_err:
cmdlog.info( cmdlog.info(
line, extra=dict(command_prefix=self.command_prefix) line, extra={"command_prefix": self.command_prefix}
) )
pass pass
else: else:
cmdlog.error( cmdlog.error(
line, extra=dict(command_prefix=self.command_prefix) line, extra={"command_prefix": self.command_prefix}
) )
print_buf = "" print_buf = ""
last_output = time.time() last_output = time.time()
@@ -248,7 +248,7 @@ class Host:
elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed)) elapsed_msg = time.strftime("%H:%M:%S", time.gmtime(elapsed))
cmdlog.warn( cmdlog.warn(
f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)", f"still waiting for '{displayed_cmd}' to finish... ({elapsed_msg} elapsed)",
extra=dict(command_prefix=self.command_prefix), extra={"command_prefix": self.command_prefix},
) )
def handle_fd(fd: IO[Any] | None, readlist: list[IO[Any]]) -> str: def handle_fd(fd: IO[Any] | None, readlist: list[IO[Any]]) -> str:
@@ -295,7 +295,8 @@ class Host:
elif stdout == subprocess.PIPE: elif stdout == subprocess.PIPE:
stdout_read, stdout_write = stack.enter_context(_pipe()) stdout_read, stdout_write = stack.enter_context(_pipe())
else: else:
raise ClanError(f"unsupported value for stdout parameter: {stdout}") msg = f"unsupported value for stdout parameter: {stdout}"
raise ClanError(msg)
if stderr is None: if stderr is None:
stderr_read = None stderr_read = None
@@ -303,7 +304,8 @@ class Host:
elif stderr == subprocess.PIPE: elif stderr == subprocess.PIPE:
stderr_read, stderr_write = stack.enter_context(_pipe()) stderr_read, stderr_write = stack.enter_context(_pipe())
else: else:
raise ClanError(f"unsupported value for stderr parameter: {stderr}") msg = f"unsupported value for stderr parameter: {stderr}"
raise ClanError(msg)
env = os.environ.copy() env = os.environ.copy()
env.update(extra_env) env.update(extra_env)
@@ -350,12 +352,13 @@ class Host:
else: else:
cmdlog.warning( cmdlog.warning(
f"[Command failed: {ret}] {displayed_cmd}", f"[Command failed: {ret}] {displayed_cmd}",
extra=dict(command_prefix=self.command_prefix), extra={"command_prefix": self.command_prefix},
) )
return subprocess.CompletedProcess( return subprocess.CompletedProcess(
cmd, ret, stdout=stdout_data, stderr=stderr_data cmd, ret, stdout=stdout_data, stderr=stderr_data
) )
raise RuntimeError("unreachable") msg = "unreachable"
raise RuntimeError(msg)
def run_local( def run_local(
self, self,
@@ -386,9 +389,7 @@ class Host:
cmd = [cmd] cmd = [cmd]
shell = True shell = True
displayed_cmd = " ".join(cmd) displayed_cmd = " ".join(cmd)
cmdlog.info( cmdlog.info(f"$ {displayed_cmd}", extra={"command_prefix": self.command_prefix})
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
return self._run( return self._run(
cmd, cmd,
displayed_cmd, displayed_cmd,
@@ -446,9 +447,7 @@ class Host:
displayed_cmd += " ".join(cmd) displayed_cmd += " ".join(cmd)
else: else:
displayed_cmd += cmd displayed_cmd += cmd
cmdlog.info( cmdlog.info(f"$ {displayed_cmd}", extra={"command_prefix": self.command_prefix})
f"$ {displayed_cmd}", extra=dict(command_prefix=self.command_prefix)
)
bash_cmd = export_cmd bash_cmd = export_cmd
bash_args = [] bash_args = []
@@ -624,13 +623,12 @@ class HostGroup:
if e: if e:
cmdlog.error( cmdlog.error(
f"failed with: {e}", f"failed with: {e}",
extra=dict(command_prefix=result.host.command_prefix), extra={"command_prefix": result.host.command_prefix},
) )
errors += 1 errors += 1
if errors > 0: if errors > 0:
raise ClanError( msg = f"{errors} hosts failed with an error. Check the logs above"
f"{errors} hosts failed with an error. Check the logs above" raise ClanError(msg)
)
def _run( def _run(
self, self,
@@ -653,19 +651,19 @@ class HostGroup:
fn = self._run_local if local else self._run_remote fn = self._run_local if local else self._run_remote
thread = Thread( thread = Thread(
target=fn, target=fn,
kwargs=dict( kwargs={
results=results, "results": results,
cmd=cmd, "cmd": cmd,
host=host, "host": host,
stdout=stdout, "stdout": stdout,
stderr=stderr, "stderr": stderr,
extra_env=extra_env, "extra_env": extra_env,
cwd=cwd, "cwd": cwd,
check=check, "check": check,
timeout=timeout, "timeout": timeout,
verbose_ssh=verbose_ssh, "verbose_ssh": verbose_ssh,
tty=tty, "tty": tty,
), },
) )
thread.start() thread.start()
threads.append(thread) threads.append(thread)
@@ -806,7 +804,8 @@ def parse_deployment_address(
options[k] = v options[k] = v
result = urllib.parse.urlsplit("//" + hostname) result = urllib.parse.urlsplit("//" + hostname)
if not result.hostname: if not result.hostname:
raise Exception(f"Invalid hostname: {hostname}") msg = f"Invalid hostname: {hostname}"
raise Exception(msg)
hostname = result.hostname hostname = result.hostname
port = result.port port = result.port
meta = meta.copy() meta = meta.copy()

View File

@@ -34,8 +34,9 @@ def list_state_folders(machine: str, service: None | str = None) -> None:
proc = run_no_stdout(cmd) proc = run_no_stdout(cmd)
res = proc.stdout.strip() res = proc.stdout.strip()
except ClanCmdError as e: except ClanCmdError as e:
msg = "Clan might not have meta attributes"
raise ClanError( raise ClanError(
"Clan might not have meta attributes", msg,
location=f"show_clan {uri}", location=f"show_clan {uri}",
description="Evaluation failed on clanInternals.meta attribute", description="Evaluation failed on clanInternals.meta attribute",
) from e ) from e
@@ -45,8 +46,9 @@ def list_state_folders(machine: str, service: None | str = None) -> None:
if state_info := state.get(service): if state_info := state.get(service):
state = {service: state_info} state = {service: state_info}
else: else:
msg = f"Service {service} isn't configured for this machine."
raise ClanError( raise ClanError(
f"Service {service} isn't configured for this machine.", msg,
location=f"clan state list {machine} --service {service}", location=f"clan state list {machine} --service {service}",
description=f"The service: {service} needs to be configured for the machine.", description=f"The service: {service} needs to be configured for the machine.",
) )

View File

@@ -195,7 +195,8 @@ def prompt_func(description: str, input_type: str) -> str:
elif input_type == "hidden": elif input_type == "hidden":
result = getpass(f"Enter the value for {description} (hidden): ") result = getpass(f"Enter the value for {description} (hidden): ")
else: else:
raise ClanError(f"Unknown input type: {input_type} for prompt {description}") msg = f"Unknown input type: {input_type} for prompt {description}"
raise ClanError(msg)
log.info("Input received. Processing...") log.info("Input received. Processing...")
return result return result
@@ -226,9 +227,8 @@ def _generate_vars_for_machine(
if generator_name and generator_name not in machine.vars_generators: if generator_name and generator_name not in machine.vars_generators:
generators = list(machine.vars_generators.keys()) generators = list(machine.vars_generators.keys())
raise ClanError( msg = f"Could not find generator with name: {generator_name}. The following generators are available: {generators}"
f"Could not find generator with name: {generator_name}. The following generators are available: {generators}" raise ClanError(msg)
)
graph = { graph = {
gen_name: set(generator["dependencies"]) gen_name: set(generator["dependencies"])
@@ -243,9 +243,8 @@ def _generate_vars_for_machine(
for gen_name, dependencies in graph.items(): for gen_name, dependencies in graph.items():
for dep in dependencies: for dep in dependencies:
if dep not in graph: if dep not in graph:
raise ClanError( msg = f"Generator {gen_name} has a dependency on {dep}, which does not exist"
f"Generator {gen_name} has a dependency on {dep}, which does not exist" raise ClanError(msg)
)
# process generators in topological order # process generators in topological order
sorter = TopologicalSorter(graph) sorter = TopologicalSorter(graph)
@@ -280,9 +279,8 @@ def generate_vars(
log.error(f"Failed to generate facts for {machine.name}: {exc}") log.error(f"Failed to generate facts for {machine.name}: {exc}")
errors += [exc] errors += [exc]
if len(errors) > 0: if len(errors) > 0:
raise ClanError( msg = f"Failed to generate facts for {len(errors)} hosts. Check the logs above"
f"Failed to generate facts for {len(errors)} hosts. Check the logs above" raise ClanError(msg) from errors[0]
) from errors[0]
if not was_regenerated: if not was_regenerated:
print("All secrets and facts are already up to date") print("All secrets and facts are already up to date")

View File

@@ -31,9 +31,8 @@ class FactStore(FactStoreBase):
fact_path.write_bytes(value) fact_path.write_bytes(value)
return fact_path return fact_path
else: else:
raise ClanError( msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
f"in_flake fact storage is only supported for local flakes: {self.machine.flake}" raise ClanError(msg)
)
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
return self._var_path(generator_name, name, shared).exists() return self._var_path(generator_name, name, shared).exists()

View File

@@ -34,4 +34,5 @@ class FactStore(FactStoreBase):
fact_path = self.dir / service / name fact_path = self.dir / service / name
if fact_path.exists(): if fact_path.exists():
return fact_path.read_bytes() return fact_path.read_bytes()
raise ClanError(f"Fact {name} for service {service} not found") msg = f"Fact {name} for service {service} not found"
raise ClanError(msg)

View File

@@ -67,9 +67,9 @@ class SecretStore(SecretStoreBase):
value, value,
add_machines=[self.machine.name], add_machines=[self.machine.name],
add_groups=groups, add_groups=groups,
meta=dict( meta={
deploy=deployed, "deploy": deployed,
), },
) )
return path return path

View File

@@ -169,7 +169,8 @@ class QMPWrapper:
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]: def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
rpath = self._qmp_socket.resolve() rpath = self._qmp_socket.resolve()
if not rpath.exists(): if not rpath.exists():
raise ClanError(f"qmp socket {rpath} does not exist. Is the VM running?") msg = f"qmp socket {rpath} does not exist. Is the VM running?"
raise ClanError(msg)
qmp = QEMUMonitorProtocol(str(rpath)) qmp = QEMUMonitorProtocol(str(rpath))
qmp.connect() qmp.connect()
try: try:

View File

@@ -60,7 +60,8 @@ def build_vm(
vm_data["secrets_dir"] = str(secrets_dir) vm_data["secrets_dir"] = str(secrets_dir)
return vm_data return vm_data
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ClanError(f"Failed to parse vm config: {e}") from e msg = f"Failed to parse vm config: {e}"
raise ClanError(msg) from e
def get_secrets( def get_secrets(
@@ -208,7 +209,7 @@ def run_command(
vm: VmConfig = inspect_vm(machine=machine_obj) vm: VmConfig = inspect_vm(machine=machine_obj)
portmap = [(h, g) for h, g in (p.split(":") for p in args.publish)] portmap = [p.split(":") for p in args.publish]
run_vm(vm, nix_options=args.option, portmap=portmap) run_vm(vm, nix_options=args.option, portmap=portmap)

View File

@@ -151,15 +151,6 @@ python3.pkgs.buildPythonApplication {
${pythonWithTestDeps}/bin/python -m pytest -m "not impure and with_core" ./tests ${pythonWithTestDeps}/bin/python -m pytest -m "not impure and with_core" ./tests
touch $out touch $out
''; '';
check-for-breakpoints = runCommand "breakpoints" { } ''
if grep --include \*.py -Rq "breakpoint()" ${source}; then
echo "breakpoint() found in ${source}:"
grep --include \*.py -Rn "breakpoint()" ${source}
exit 1
fi
touch $out
'';
}; };
passthru.nixpkgs = nixpkgs'; passthru.nixpkgs = nixpkgs';

View File

@@ -64,19 +64,3 @@ ignore_missing_imports = true
module = "setuptools.*" module = "setuptools.*"
ignore_missing_imports = true ignore_missing_imports = true
[tool.ruff]
target-version = "py311"
line-length = 88
lint.select = [
"A",
"ANN",
"B",
"E",
"F",
"I",
"N",
"RUF",
"TID",
"U",
]
lint.ignore = ["E501", "E402", "E731", "ANN101", "ANN401", "A003"]

View File

@@ -2,7 +2,6 @@ import subprocess
from pathlib import Path from pathlib import Path
import pytest import pytest
from clan_cli.custom_logger import setup_logging from clan_cli.custom_logger import setup_logging
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell

View File

@@ -9,9 +9,8 @@ from pathlib import Path
from typing import NamedTuple from typing import NamedTuple
import pytest import pytest
from root import CLAN_CORE
from clan_cli.dirs import nixpkgs_source from clan_cli.dirs import nixpkgs_source
from root import CLAN_CORE
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -226,9 +225,8 @@ def test_flake(
) )
if git_proc.returncode != 0: if git_proc.returncode != 0:
log.error(git_proc.stderr.decode()) log.error(git_proc.stderr.decode())
raise Exception( msg = "git diff on ./sops is not empty. This should not happen as all changes should be committed"
"git diff on ./sops is not empty. This should not happen as all changes should be committed" raise Exception(msg)
)
@pytest.fixture @pytest.fixture
@@ -236,9 +234,8 @@ def test_flake_with_core(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( msg = "clan-core flake not found. This test requires the clan-core flake to be present"
"clan-core flake not found. This test requires the clan-core flake to be present" raise Exception(msg)
)
yield from create_flake( yield from create_flake(
temporary_home, temporary_home,
"test_flake_with_core", "test_flake_with_core",
@@ -252,14 +249,14 @@ def test_local_democlan(
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
democlan = os.getenv(key="DEMOCLAN_ROOT") democlan = os.getenv(key="DEMOCLAN_ROOT")
if democlan is None: if democlan is None:
raise Exception( msg = (
"DEMOCLAN_ROOT not set. This test requires the democlan flake to be present" "DEMOCLAN_ROOT not set. This test requires the democlan flake to be present"
) )
raise Exception(msg)
democlan_p = Path(democlan).resolve() democlan_p = Path(democlan).resolve()
if not democlan_p.is_dir(): if not democlan_p.is_dir():
raise Exception( msg = f"DEMOCLAN_ROOT ({democlan_p}) is not a directory. This test requires the democlan directory to be present"
f"DEMOCLAN_ROOT ({democlan_p}) is not a directory. This test requires the democlan directory to be present" raise Exception(msg)
)
yield FlakeForTest(democlan_p) yield FlakeForTest(democlan_p)
@@ -269,9 +266,8 @@ def test_flake_with_core_and_pass(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( msg = "clan-core flake not found. This test requires the clan-core flake to be present"
"clan-core flake not found. This test requires the clan-core flake to be present" raise Exception(msg)
)
yield from create_flake( yield from create_flake(
temporary_home, temporary_home,
"test_flake_with_core_and_pass", "test_flake_with_core_and_pass",
@@ -284,9 +280,8 @@ def test_flake_minimal(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( msg = "clan-core flake not found. This test requires the clan-core flake to be present"
"clan-core flake not found. This test requires the clan-core flake to be present" raise Exception(msg)
)
yield from create_flake( yield from create_flake(
temporary_home, temporary_home,
CLAN_CORE / "templates" / "minimal", CLAN_CORE / "templates" / "minimal",

View File

@@ -11,7 +11,8 @@ def is_valid_age_key(secret_key: str) -> bool:
if result.returncode == 0: if result.returncode == 0:
return True return True
else: else:
raise ValueError(f"Invalid age key: {secret_key}") msg = f"Invalid age key: {secret_key}"
raise ValueError(msg)
def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool: def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
@@ -26,9 +27,9 @@ def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
if result.returncode == 0: if result.returncode == 0:
if result.stdout != ssh_pub: if result.stdout != ssh_pub:
raise ValueError( msg = f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}"
f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}" raise ValueError(msg)
)
return True return True
else: else:
raise ValueError(f"Invalid ssh key: {secret_key}") msg = f"Invalid ssh key: {secret_key}"
raise ValueError(msg)

View File

@@ -49,9 +49,8 @@ def wait_vm_up(machine_name: str, flake_url: str | None = None) -> None:
timeout: float = 600 timeout: float = 600
while True: while True:
if timeout <= 0: if timeout <= 0:
raise TimeoutError( msg = f"qmp socket {socket_file} not found. Is the VM running?"
f"qmp socket {socket_file} not found. Is the VM running?" raise TimeoutError(msg)
)
if socket_file.exists(): if socket_file.exists():
break break
sleep(0.1) sleep(0.1)
@@ -66,9 +65,8 @@ def wait_vm_down(machine_name: str, flake_url: str | None = None) -> None:
timeout: float = 300 timeout: float = 300
while socket_file.exists(): while socket_file.exists():
if timeout <= 0: if timeout <= 0:
raise TimeoutError( msg = f"qmp socket {socket_file} still exists. Is the VM down?"
f"qmp socket {socket_file} still exists. Is the VM down?" raise TimeoutError(msg)
)
sleep(0.1) sleep(0.1)
timeout -= 0.1 timeout -= 0.1

View File

@@ -2,9 +2,8 @@ import os
import pwd import pwd
import pytest import pytest
from sshd import Sshd
from clan_cli.ssh import Host, HostGroup, HostKeyCheck from clan_cli.ssh import Host, HostGroup, HostKeyCheck
from sshd import Sshd
@pytest.fixture @pytest.fixture

View File

@@ -42,7 +42,7 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
host_key = test_root / "data" / "ssh_host_ed25519_key" host_key = test_root / "data" / "ssh_host_ed25519_key"
host_key.chmod(0o600) host_key.chmod(0o600)
template = (test_root / "data" / "sshd_config").read_text() template = (test_root / "data" / "sshd_config").read_text()
content = string.Template(template).substitute(dict(host_key=host_key)) content = string.Template(template).substitute({"host_key": host_key})
config = tmpdir / "sshd_config" config = tmpdir / "sshd_config"
config.write_text(content) config.write_text(content)
login_shell = tmpdir / "shell" login_shell = tmpdir / "shell"
@@ -100,10 +100,10 @@ def sshd(
sshd = shutil.which("sshd") sshd = shutil.which("sshd")
assert sshd is not None, "no sshd binary found" assert sshd is not None, "no sshd binary found"
env = {} env = {}
env = dict( env = {
LD_PRELOAD=str(sshd_config.preload_lib), "LD_PRELOAD": str(sshd_config.preload_lib),
LOGIN_SHELL=str(sshd_config.login_shell), "LOGIN_SHELL": str(sshd_config.login_shell),
) }
proc = command.run( proc = command.run(
[sshd, "-f", str(sshd_config.path), "-D", "-p", str(port)], extra_env=env [sshd, "-f", str(sshd_config.path), "-D", "-p", str(port)], extra_env=env
) )
@@ -133,5 +133,6 @@ def sshd(
else: else:
rc = proc.poll() rc = proc.poll()
if rc is not None: if rc is not None:
raise Exception(f"sshd processes was terminated with {rc}") msg = f"sshd processes was terminated with {rc}"
raise Exception(msg)
time.sleep(0.1) time.sleep(0.1)

View File

@@ -82,14 +82,17 @@ def load_dataclass_from_file(
sys.path.insert(0, root_dir) sys.path.insert(0, root_dir)
spec = importlib.util.spec_from_file_location(module_name, file_path) spec = importlib.util.spec_from_file_location(module_name, file_path)
if not spec: if not spec:
raise ClanError(f"Could not load spec from file: {file_path}") msg = f"Could not load spec from file: {file_path}"
raise ClanError(msg)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
if not module: if not module:
raise ClanError(f"Could not create module: {file_path}") msg = f"Could not create module: {file_path}"
raise ClanError(msg)
if not spec.loader: if not spec.loader:
raise ClanError(f"Could not load loader from spec: {spec}") msg = f"Could not load loader from spec: {spec}"
raise ClanError(msg)
spec.loader.exec_module(module) spec.loader.exec_module(module)
@@ -100,7 +103,8 @@ def load_dataclass_from_file(
if dataclass_type and is_dataclass(dataclass_type): if dataclass_type and is_dataclass(dataclass_type):
return dataclass_type return dataclass_type
raise ClanError(f"Could not load dataclass {class_name} from file: {file_path}") msg = f"Could not load dataclass {class_name} from file: {file_path}"
raise ClanError(msg)
def test_all_dataclasses() -> None: def test_all_dataclasses() -> None:
@@ -132,8 +136,7 @@ def test_all_dataclasses() -> None:
type_to_dict(dclass) type_to_dict(dclass)
except JSchemaTypeError as e: except JSchemaTypeError as e:
print(f"Error loading dataclass {dataclass} from {file}: {e}") print(f"Error loading dataclass {dataclass} from {file}: {e}")
raise ClanError( msg = f"""
f"""
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
Error converting dataclass 'class {dataclass}()' from {file} Error converting dataclass 'class {dataclass}()' from {file}
@@ -144,6 +147,8 @@ Help:
- Converting public fields to PRIVATE by prefixing them with underscore ('_') - Converting public fields to PRIVATE by prefixing them with underscore ('_')
- Ensure all private fields are initialized the API wont provide initial values for them. - Ensure all private fields are initialized the API wont provide initial values for them.
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
""", """
raise ClanError(
msg,
location=__file__, location=__file__,
) from e ) from e

View File

@@ -1,7 +1,6 @@
from pathlib import Path from pathlib import Path
import pytest import pytest
from clan_cli import config from clan_cli import config
from clan_cli.config import parsing from clan_cli.config import parsing
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -10,21 +9,21 @@ example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
def test_walk_jsonschema_all_types() -> None: def test_walk_jsonschema_all_types() -> None:
schema = dict( schema = {
type="object", "type": "object",
properties=dict( "properties": {
array=dict( "array": {
type="array", "type": "array",
items=dict( "items": {
type="string", "type": "string",
), },
), },
boolean=dict(type="boolean"), "boolean": {"type": "boolean"},
integer=dict(type="integer"), "integer": {"type": "integer"},
number=dict(type="number"), "number": {"type": "number"},
string=dict(type="string"), "string": {"type": "string"},
), },
) }
expected = { expected = {
"array": list[str], "array": list[str],
"boolean": bool, "boolean": bool,
@@ -36,19 +35,19 @@ def test_walk_jsonschema_all_types() -> None:
def test_walk_jsonschema_nested() -> None: def test_walk_jsonschema_nested() -> None:
schema = dict( schema = {
type="object", "type": "object",
properties=dict( "properties": {
name=dict( "name": {
type="object", "type": "object",
properties=dict( "properties": {
first=dict(type="string"), "first": {"type": "string"},
last=dict(type="string"), "last": {"type": "string"},
), },
), },
age=dict(type="integer"), "age": {"type": "integer"},
), },
) }
expected = { expected = {
"age": int, "age": int,
"name.first": str, "name.first": str,
@@ -59,16 +58,16 @@ def test_walk_jsonschema_nested() -> None:
# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties") # test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
def test_walk_jsonschema_dynamic_attrs() -> None: def test_walk_jsonschema_dynamic_attrs() -> None:
schema = dict( schema = {
type="object", "type": "object",
properties=dict( "properties": {
age=dict(type="integer"), "age": {"type": "integer"},
users=dict( "users": {
type="object", "type": "object",
additionalProperties=dict(type="string"), "additionalProperties": {"type": "string"},
), },
), },
) }
expected = { expected = {
"age": int, "age": int,
"users.<name>": str, # <name> is a placeholder for any string "users.<name>": str, # <name> is a placeholder for any string
@@ -77,41 +76,41 @@ def test_walk_jsonschema_dynamic_attrs() -> None:
def test_type_from_schema_path_simple() -> None: def test_type_from_schema_path_simple() -> None:
schema = dict( schema = {
type="boolean", "type": "boolean",
) }
assert parsing.type_from_schema_path(schema, []) is bool assert parsing.type_from_schema_path(schema, []) is bool
def test_type_from_schema_path_nested() -> None: def test_type_from_schema_path_nested() -> None:
schema = dict( schema = {
type="object", "type": "object",
properties=dict( "properties": {
name=dict( "name": {
type="object", "type": "object",
properties=dict( "properties": {
first=dict(type="string"), "first": {"type": "string"},
last=dict(type="string"), "last": {"type": "string"},
), },
), },
age=dict(type="integer"), "age": {"type": "integer"},
), },
) }
assert parsing.type_from_schema_path(schema, ["age"]) is int assert parsing.type_from_schema_path(schema, ["age"]) is int
assert parsing.type_from_schema_path(schema, ["name", "first"]) is str assert parsing.type_from_schema_path(schema, ["name", "first"]) is str
def test_type_from_schema_path_dynamic_attrs() -> None: def test_type_from_schema_path_dynamic_attrs() -> None:
schema = dict( schema = {
type="object", "type": "object",
properties=dict( "properties": {
age=dict(type="integer"), "age": {"type": "integer"},
users=dict( "users": {
type="object", "type": "object",
additionalProperties=dict(type="string"), "additionalProperties": {"type": "string"},
), },
), },
) }
assert parsing.type_from_schema_path(schema, ["age"]) is int assert parsing.type_from_schema_path(schema, ["age"]) is int
assert parsing.type_from_schema_path(schema, ["users", "foo"]) is str assert parsing.type_from_schema_path(schema, ["users", "foo"]) is str

View File

@@ -3,7 +3,6 @@ import tempfile
from pathlib import Path from pathlib import Path
import pytest import pytest
from clan_cli import git from clan_cli import git
from clan_cli.errors import ClanError from clan_cli.errors import ClanError

View File

@@ -2,13 +2,12 @@ import json
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from clan_cli.dirs import user_history_file
from clan_cli.history.add import HistoryEntry
from fixtures_flakes import FlakeForTest from fixtures_flakes import FlakeForTest
from helpers import cli from helpers import cli
from stdout import CaptureOutput from stdout import CaptureOutput
from clan_cli.dirs import user_history_file
from clan_cli.history.add import HistoryEntry
if TYPE_CHECKING: if TYPE_CHECKING:
pass pass

View File

@@ -2,8 +2,6 @@ import json
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from fixtures_flakes import FlakeForTest
from clan_cli.api.modules import list_modules from clan_cli.api.modules import list_modules
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.inventory import ( from clan_cli.inventory import (
@@ -19,13 +17,13 @@ from clan_cli.inventory import (
) )
from clan_cli.machines.create import create_machine from clan_cli.machines.create import create_machine
from clan_cli.nix import nix_eval, run_no_stdout from clan_cli.nix import nix_eval, run_no_stdout
from fixtures_flakes import FlakeForTest
if TYPE_CHECKING: if TYPE_CHECKING:
from age_keys import KeyPair from age_keys import KeyPair
from helpers import cli
from clan_cli.machines.facts import machine_get_fact from clan_cli.machines.facts import machine_get_fact
from helpers import cli
@pytest.mark.with_core @pytest.mark.with_core

View File

@@ -5,12 +5,11 @@ from contextlib import contextmanager
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from clan_cli.errors import ClanError
from fixtures_flakes import FlakeForTest from fixtures_flakes import FlakeForTest
from helpers import cli from helpers import cli
from stdout import CaptureOutput from stdout import CaptureOutput
from clan_cli.errors import ClanError
if TYPE_CHECKING: if TYPE_CHECKING:
from age_keys import KeyPair from age_keys import KeyPair

View File

@@ -2,15 +2,14 @@ import ipaddress
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from helpers.validator import is_valid_age_key, is_valid_ssh_key
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.facts.secret_modules.sops import SecretStore from clan_cli.facts.secret_modules.sops import SecretStore
from clan_cli.machines.facts import machine_get_fact from clan_cli.machines.facts import machine_get_fact
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder from clan_cli.secrets.folders import sops_secrets_folder
from fixtures_flakes import FlakeForTest
from helpers import cli
from helpers.validator import is_valid_age_key, is_valid_ssh_key
if TYPE_CHECKING: if TYPE_CHECKING:
from age_keys import KeyPair from age_keys import KeyPair

View File

@@ -2,16 +2,15 @@ import subprocess
from pathlib import Path from pathlib import Path
import pytest import pytest
from fixtures_flakes import FlakeForTest
from helpers import cli
from helpers.validator import is_valid_ssh_key
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.facts.secret_modules.password_store import SecretStore from clan_cli.facts.secret_modules.password_store import SecretStore
from clan_cli.machines.facts import machine_get_fact from clan_cli.machines.facts import machine_get_fact
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from clan_cli.ssh import HostGroup from clan_cli.ssh import HostGroup
from fixtures_flakes import FlakeForTest
from helpers import cli
from helpers.validator import is_valid_ssh_key
@pytest.mark.impure @pytest.mark.impure

View File

@@ -1,11 +1,10 @@
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from clan_cli.ssh import HostGroup
from fixtures_flakes import FlakeForTest from fixtures_flakes import FlakeForTest
from helpers import cli from helpers import cli
from clan_cli.ssh import HostGroup
if TYPE_CHECKING: if TYPE_CHECKING:
from age_keys import KeyPair from age_keys import KeyPair

View File

@@ -1,14 +1,13 @@
import os import os
import sys import sys
import clan_cli
import pytest import pytest
import pytest_subprocess.fake_process import pytest_subprocess.fake_process
from clan_cli.ssh import cli
from pytest_subprocess import utils from pytest_subprocess import utils
from stdout import CaptureOutput from stdout import CaptureOutput
import clan_cli
from clan_cli.ssh import cli
def test_no_args( def test_no_args(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,

View File

@@ -7,13 +7,11 @@ hosts = HostGroup([Host("some_host")])
def test_run_environment() -> None: def test_run_environment() -> None:
p2 = hosts.run_local( p2 = hosts.run_local(
"echo $env_var", extra_env=dict(env_var="true"), stdout=subprocess.PIPE "echo $env_var", extra_env={"env_var": "true"}, stdout=subprocess.PIPE
) )
assert p2[0].result.stdout == "true\n" assert p2[0].result.stdout == "true\n"
p3 = hosts.run_local( p3 = hosts.run_local(["env"], extra_env={"env_var": "true"}, stdout=subprocess.PIPE)
["env"], extra_env=dict(env_var="true"), stdout=subprocess.PIPE
)
assert "env_var=true" in p3[0].result.stdout assert "env_var=true" in p3[0].result.stdout
@@ -27,7 +25,8 @@ def test_timeout() -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised TimeoutExpired") msg = "should have raised TimeoutExpired"
raise AssertionError(msg)
def test_run_function() -> None: def test_run_function() -> None:
@@ -45,7 +44,8 @@ def test_run_exception() -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised Exception") msg = "should have raised Exception"
raise AssertionError(msg)
def test_run_function_exception() -> None: def test_run_function_exception() -> None:
@@ -57,7 +57,8 @@ def test_run_function_exception() -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised Exception") msg = "should have raised Exception"
raise AssertionError(msg)
def test_run_local_non_shell() -> None: def test_run_local_non_shell() -> None:

View File

@@ -19,10 +19,10 @@ def test_run(host_group: HostGroup) -> None:
def test_run_environment(host_group: HostGroup) -> None: def test_run_environment(host_group: HostGroup) -> None:
p1 = host_group.run( p1 = host_group.run(
"echo $env_var", stdout=subprocess.PIPE, extra_env=dict(env_var="true") "echo $env_var", stdout=subprocess.PIPE, extra_env={"env_var": "true"}
) )
assert p1[0].result.stdout == "true\n" assert p1[0].result.stdout == "true\n"
p2 = host_group.run(["env"], stdout=subprocess.PIPE, extra_env=dict(env_var="true")) p2 = host_group.run(["env"], stdout=subprocess.PIPE, extra_env={"env_var": "true"})
assert "env_var=true" in p2[0].result.stdout assert "env_var=true" in p2[0].result.stdout
@@ -46,7 +46,8 @@ def test_timeout(host_group: HostGroup) -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised TimeoutExpired") msg = "should have raised TimeoutExpired"
raise AssertionError(msg)
def test_run_exception(host_group: HostGroup) -> None: def test_run_exception(host_group: HostGroup) -> None:
@@ -58,7 +59,8 @@ def test_run_exception(host_group: HostGroup) -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised Exception") msg = "should have raised Exception"
raise AssertionError(msg)
def test_run_function_exception(host_group: HostGroup) -> None: def test_run_function_exception(host_group: HostGroup) -> None:
@@ -70,4 +72,5 @@ def test_run_function_exception(host_group: HostGroup) -> None:
except Exception: except Exception:
pass pass
else: else:
raise AssertionError("should have raised Exception") msg = "should have raised Exception"
raise AssertionError(msg)

View File

@@ -5,11 +5,6 @@ from tempfile import TemporaryDirectory
import pytest import pytest
from age_keys import SopsSetup from age_keys import SopsSetup
from fixtures_flakes import generate_flake
from helpers import cli
from helpers.nixos_config import nested_dict
from root import CLAN_CORE
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
@@ -17,17 +12,21 @@ from clan_cli.vars.check import check_vars
from clan_cli.vars.list import stringify_all_vars from clan_cli.vars.list import stringify_all_vars
from clan_cli.vars.public_modules import in_repo from clan_cli.vars.public_modules import in_repo
from clan_cli.vars.secret_modules import password_store, sops from clan_cli.vars.secret_modules import password_store, sops
from fixtures_flakes import generate_flake
from helpers import cli
from helpers.nixos_config import nested_dict
from root import CLAN_CORE
def test_get_subgraph() -> None: def test_get_subgraph() -> None:
from clan_cli.vars.generate import _get_subgraph from clan_cli.vars.generate import _get_subgraph
graph = dict( graph = {
a={"b", "c"}, "a": {"b", "c"},
b={"c"}, "b": {"c"},
c=set(), "c": set(),
d=set(), "d": set(),
) }
assert _get_subgraph(graph, "a") == { assert _get_subgraph(graph, "a") == {
"a": {"b", "c"}, "a": {"b", "c"},
"b": {"c"}, "b": {"c"},
@@ -39,16 +38,16 @@ def test_get_subgraph() -> None:
def test_dependencies_as_files() -> None: def test_dependencies_as_files() -> None:
from clan_cli.vars.generate import dependencies_as_dir from clan_cli.vars.generate import dependencies_as_dir
decrypted_dependencies = dict( decrypted_dependencies = {
gen_1=dict( "gen_1": {
var_1a=b"var_1a", "var_1a": b"var_1a",
var_1b=b"var_1b", "var_1b": b"var_1b",
), },
gen_2=dict( "gen_2": {
var_2a=b"var_2a", "var_2a": b"var_2a",
var_2b=b"var_2b", "var_2b": b"var_2b",
), },
) }
with TemporaryDirectory() as tmpdir: with TemporaryDirectory() as tmpdir:
dep_tmpdir = Path(tmpdir) dep_tmpdir = Path(tmpdir)
dependencies_as_dir(decrypted_dependencies, dep_tmpdir) dependencies_as_dir(decrypted_dependencies, dep_tmpdir)
@@ -76,7 +75,7 @@ def test_generate_public_var(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
machine = Machine(name="my_machine", flake=FlakeId(str(flake.path))) machine = Machine(name="my_machine", flake=FlakeId(str(flake.path)))
@@ -105,7 +104,7 @@ def test_generate_secret_var_sops(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init() sops_setup.init()
@@ -140,7 +139,7 @@ def test_generate_secret_var_sops_with_default_group(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init() sops_setup.init()
@@ -170,7 +169,7 @@ def test_generate_secret_var_password_store(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
gnupghome = temporary_home / "gpg" gnupghome = temporary_home / "gpg"
@@ -237,7 +236,7 @@ def test_generate_secret_for_multiple_machines(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(machine1=machine1_config, machine2=machine2_config), machine_configs={"machine1": machine1_config, "machine2": machine2_config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init() sops_setup.init()
@@ -282,7 +281,7 @@ def test_dependant_generators(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
@@ -320,7 +319,7 @@ def test_prompt(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
monkeypatch.setattr("sys.stdin", StringIO(input_value)) monkeypatch.setattr("sys.stdin", StringIO(input_value))
@@ -358,7 +357,7 @@ def test_share_flag(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init() sops_setup.init()

View File

@@ -3,14 +3,13 @@ from pathlib import Path
import pytest import pytest
from age_keys import SopsSetup from age_keys import SopsSetup
from clan_cli.nix import nix_eval, run
from fixtures_flakes import generate_flake from fixtures_flakes import generate_flake
from helpers import cli from helpers import cli
from helpers.nixos_config import nested_dict from helpers.nixos_config import nested_dict
from helpers.vms import qga_connect, run_vm_in_thread, wait_vm_down from helpers.vms import qga_connect, run_vm_in_thread, wait_vm_down
from root import CLAN_CORE from root import CLAN_CORE
from clan_cli.nix import nix_eval, run
@pytest.mark.impure @pytest.mark.impure
def test_vm_deployment( def test_vm_deployment(
@@ -42,7 +41,7 @@ def test_vm_deployment(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config), machine_configs={"my_machine": config},
) )
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init() sops_setup.init()
@@ -57,7 +56,7 @@ def test_vm_deployment(
) )
).stdout.strip() ).stdout.strip()
) )
assert sops_secrets != dict() assert sops_secrets != {}
my_secret_path = run( my_secret_path = run(
nix_eval( nix_eval(
[ [

View File

@@ -65,15 +65,15 @@ def test_vm_qmp(
flake = generate_flake( flake = generate_flake(
temporary_home, temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal", flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict( machine_configs={
my_machine=dict( "my_machine": {
clan=dict( "clan": {
virtualisation=dict(graphics=False), "virtualisation": {"graphics": False},
networking=dict(targetHost="client"), "networking": {"targetHost": "client"},
), },
services=dict(getty=dict(autologinUser="root")), "services": {"getty": {"autologinUser": "root"}},
) }
), },
) )
# 'clan vms run' must be executed from within the flake # 'clan vms run' must be executed from within the flake

View File

@@ -29,7 +29,7 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
self.gtype = gtype self.gtype = gtype
self.key_gen = key_gen self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default # From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = dict() self._items: dict[K, V] = {}
################################## ##################################
# # # #
@@ -76,9 +76,11 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
) )
key = self.key_gen(item) key = self.key_gen(item)
if key in self._items: if key in self._items:
raise ValueError("Key already exists in the dictionary") msg = "Key already exists in the dictionary"
raise ValueError(msg)
if position < 0 or position > len(self._items): if position < 0 or position > len(self._items):
raise IndexError("Index out of range") msg = "Index out of range"
raise IndexError(msg)
# Temporary storage for items to be reinserted # Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]] temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
@@ -91,7 +93,7 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
self._items[key] = item self._items[key] = item
# Reinsert the items # Reinsert the items
for i, (k, v) in enumerate(temp_list): for _i, (k, v) in enumerate(temp_list):
self._items[k] = v self._items[k] = v
# Notify the model of the changes # Notify the model of the changes
@@ -100,7 +102,8 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
def insert_sorted( def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None: ) -> None:
raise NotImplementedError("insert_sorted is not implemented in GKVStore") msg = "insert_sorted is not implemented in GKVStore"
raise NotImplementedError(msg)
def remove(self, position: int) -> None: def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items(): if position < 0 or position >= self.get_n_items():
@@ -114,10 +117,12 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
self.items_changed(0, len(self._items), 0) self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None: def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
raise NotImplementedError("sort is not implemented in GKVStore") msg = "sort is not implemented in GKVStore"
raise NotImplementedError(msg)
def splice(self, position: int, n_removals: int, additions: list[V]) -> None: def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("splice is not implemented in GKVStore") msg = "splice is not implemented in GKVStore"
raise NotImplementedError(msg)
################################## ##################################
# # # #

View File

@@ -350,9 +350,8 @@ class StatusNotifierImplementation(BaseImplementation):
) )
if not registration_id: if not registration_id:
raise GLib.Error( msg = f"Failed to register object with path {self._object_path}"
f"Failed to register object with path {self._object_path}" raise GLib.Error(msg)
)
self._registration_id = registration_id self._registration_id = registration_id
@@ -582,9 +581,8 @@ class StatusNotifierImplementation(BaseImplementation):
except GLib.Error as error: except GLib.Error as error:
self.unload() self.unload()
raise ImplUnavailableError( msg = f"StatusNotifier implementation not available: {error}"
f"StatusNotifier implementation not available: {error}" raise ImplUnavailableError(msg) from error
) from error
self.update_menu() self.update_menu()

View File

@@ -1,3 +1,4 @@
import datetime
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os import os
@@ -7,7 +8,6 @@ import time
import weakref import weakref
from collections.abc import Callable, Generator from collections.abc import Callable, Generator
from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import IO, ClassVar from typing import IO, ClassVar
@@ -181,7 +181,7 @@ class VMObject(GObject.Object):
def __start(self) -> None: def __start(self) -> None:
with self._create_machine() as machine: with self._create_machine() as machine:
# Start building VM # Start building VM
tstart = datetime.now() tstart = datetime.datetime.now(tz=datetime.UTC)
log.info(f"Building VM {self.get_id()}") log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name)) log_dir = Path(str(self.log_dir.name))
@@ -219,7 +219,7 @@ class VMObject(GObject.Object):
# Wait for the build to finish then hide the progress bar # Wait for the build to finish then hide the progress bar
self.build_process.proc.join() self.build_process.proc.join()
tend = datetime.now() tend = datetime.datetime.now(tz=datetime.UTC)
log.info(f"VM {self.get_id()} build took {tend - tstart}s") log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide() self.progress_bar.hide()
@@ -312,9 +312,9 @@ class VMObject(GObject.Object):
def __stop(self) -> None: def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}") log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.now() start_time = datetime.datetime.now(tz=datetime.UTC)
while self.is_running(): while self.is_running():
diff = datetime.now() - start_time diff = datetime.datetime.now(tz=datetime.UTC) - start_time
if diff.seconds > self.KILL_TIMEOUT: if diff.seconds > self.KILL_TIMEOUT:
log.error( log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it" f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"

View File

@@ -30,7 +30,8 @@ class ToastOverlay:
_instance: "None | ToastOverlay" = None _instance: "None | ToastOverlay" = None
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "ToastOverlay": def use(cls: Any) -> "ToastOverlay":

View File

@@ -55,7 +55,8 @@ class JoinList:
# Make sure the VMS class is used as a singleton # Make sure the VMS class is used as a singleton
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "JoinList": def use(cls: Any) -> "JoinList":

View File

@@ -25,7 +25,8 @@ class ViewStack:
# Make sure the VMS class is used as a singleton # Make sure the VMS class is used as a singleton
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "ViewStack": def use(cls: Any) -> "ViewStack":

View File

@@ -44,7 +44,8 @@ class ClanStore:
# Make sure the VMS class is used as a singleton # Make sure the VMS class is used as a singleton
def __init__(self) -> None: def __init__(self) -> None:
raise RuntimeError("Call use() instead") msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod @classmethod
def use(cls: Any) -> "ClanStore": def use(cls: Any) -> "ClanStore":

View File

@@ -258,14 +258,16 @@ class ClanList(Gtk.Box):
def show_vm_build_logs(self, target: str) -> None: def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target) vm = ClanStore.use().set_logging_vm(target)
if vm is None: if vm is None:
raise ValueError(f"VM {target} not found") msg = f"VM {target} not found"
raise ValueError(msg)
views = ViewStack.use().view views = ViewStack.use().view
# Reset the logs view # Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None: if logs is None:
raise ValueError("Logs view not found") msg = "Logs view not found"
raise ValueError(msg)
name = vm.machine.name if vm.machine else "Unknown" name = vm.machine.name if vm.machine else "Unknown"

View File

@@ -121,15 +121,6 @@ python3.pkgs.buildPythonApplication rec {
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests ${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests
touch $out touch $out
''; '';
clan-vm-manager-no-breakpoints = runCommand "clan-vm-manager-no-breakpoints" { } ''
if grep --include \*.py -Rq "breakpoint()" ${source}; then
echo "breakpoint() found in ${source}:"
grep --include \*.py -Rn "breakpoint()" ${source}
exit 1
fi
touch $out
'';
}; };
}; };

View File

@@ -39,10 +39,3 @@ no_implicit_optional = true
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = "argcomplete.*" module = "argcomplete.*"
ignore_missing_imports = true ignore_missing_imports = true
[tool.ruff]
target-version = "py311"
line-length = 88
lint.select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
lint.ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]

View File

@@ -17,12 +17,14 @@ class Command:
def run( def run(
self, self,
command: list[str], command: list[str],
extra_env: dict[str, str] = {}, extra_env: dict[str, str] | None = None,
stdin: _FILE = None, stdin: _FILE = None,
stdout: _FILE = None, stdout: _FILE = None,
stderr: _FILE = None, stderr: _FILE = None,
workdir: Path | None = None, workdir: Path | None = None,
) -> subprocess.Popen[str]: ) -> subprocess.Popen[str]:
if extra_env is None:
extra_env = {}
env = os.environ.copy() env = os.environ.copy()
env.update(extra_env) env.update(extra_env)
# We start a new session here so that we can than more reliably kill all childs as well # We start a new session here so that we can than more reliably kill all childs as well

View File

@@ -2,7 +2,6 @@ import logging
import shlex import shlex
from clan_cli.custom_logger import get_caller from clan_cli.custom_logger import get_caller
from clan_vm_manager import main from clan_vm_manager import main
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

Some files were not shown because too many files have changed in this diff Show More