diff --git a/lib/test/container-test-driver/test_driver/__init__.py b/lib/test/container-test-driver/test_driver/__init__.py index 67a336a4a..a2e4c5e4f 100644 --- a/lib/test/container-test-driver/test_driver/__init__.py +++ b/lib/test/container-test-driver/test_driver/__init__.py @@ -218,8 +218,12 @@ class Machine: self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, env=env) def get_systemd_process(self) -> int: - assert self.process is not None, "Machine not started" - assert self.process.stdout is not None, "Machine has no stdout" + if self.process is None: + msg = "Machine not started" + raise RuntimeError(msg) + if self.process.stdout is None: + msg = "Machine has no stdout" + raise RuntimeError(msg) for line in self.process.stdout: print(line, end="") @@ -236,9 +240,9 @@ class Machine: .read_text() .split() ) - assert len(childs) == 1, ( - f"Expected exactly one child process for systemd-nspawn, got {childs}" - ) + if len(childs) != 1: + msg = f"Expected exactly one child process for systemd-nspawn, got {childs}" + raise RuntimeError(msg) try: return int(childs[0]) except ValueError as e: @@ -258,7 +262,9 @@ class Machine: def tuple_from_line(line: str) -> tuple[str, str]: match = line_pattern.match(line) - assert match is not None + if match is None: + msg = f"Failed to parse line: {line}" + raise RuntimeError(msg) return match[1], match[2] return dict( @@ -575,7 +581,9 @@ class Driver: # We lauch a sleep here, so we can pgrep the process cmdline for # the uuid sleep = shutil.which("sleep") - assert sleep is not None, "sleep command not found" + if sleep is None: + msg = "sleep command not found" + raise RuntimeError(msg) machine.execute( f"systemd-run /bin/sh -c '{sleep} 999999999 && echo {nspawn_uuid}'", ) diff --git a/nixosModules/clanCore/zerotier/generate.py b/nixosModules/clanCore/zerotier/generate.py index 710b70670..e4386b46d 100644 --- a/nixosModules/clanCore/zerotier/generate.py +++ b/nixosModules/clanCore/zerotier/generate.py @@ -55,9 +55,9 @@ class Identity: def node_id(self) -> str: nid = self.public.split(":")[0] - assert len(nid) == 10, ( - f"node_id must be 10 characters long, got {len(nid)}: {nid}" - ) + if len(nid) != 10: + msg = f"node_id must be 10 characters long, got {len(nid)}: {nid}" + raise ClanError(msg) return nid @@ -172,9 +172,9 @@ def create_identity() -> Identity: def compute_zerotier_ip(network_id: str, identity: Identity) -> ipaddress.IPv6Address: - assert len(network_id) == 16, ( - f"network_id must be 16 characters long, got '{network_id}'" - ) + if len(network_id) != 16: + msg = f"network_id must be 16 characters long, got '{network_id}'" + raise ClanError(msg) nwid = int(network_id, 16) node_id = int(identity.node_id(), 16) addr_parts = bytearray( diff --git a/pkgs/clan-app/clan_app/deps/webview/webview.py b/pkgs/clan-app/clan_app/deps/webview/webview.py index 824313509..a9c28ff0c 100644 --- a/pkgs/clan-app/clan_app/deps/webview/webview.py +++ b/pkgs/clan-app/clan_app/deps/webview/webview.py @@ -99,7 +99,9 @@ class Webview: """Get the bridge, creating it if necessary.""" if self._bridge is None: self.create_bridge() - assert self._bridge is not None, "Bridge should be created" + if self._bridge is None: + msg = "Bridge should be created" + raise RuntimeError(msg) return self._bridge def api_wrapper( diff --git a/pkgs/clan-cli/clan_cli/arg_actions.py b/pkgs/clan-cli/clan_cli/arg_actions.py index ade2ebe66..3869342ed 100644 --- a/pkgs/clan-cli/clan_cli/arg_actions.py +++ b/pkgs/clan-cli/clan_cli/arg_actions.py @@ -3,6 +3,8 @@ import logging from collections.abc import Sequence from typing import Any +from clan_lib.errors import ClanError + log = logging.getLogger(__name__) @@ -19,6 +21,8 @@ class AppendOptionAction(argparse.Action): ) -> None: lst = getattr(namespace, self.dest) lst.append("--option") - assert isinstance(values, list), "values must be a list" + if not values or not hasattr(values, "__getitem__"): + msg = "values must be indexable" + raise ClanError(msg) lst.append(values[0]) lst.append(values[1]) diff --git a/pkgs/clan-cli/clan_cli/templates/apply_disk.py b/pkgs/clan-cli/clan_cli/templates/apply_disk.py index e57707d1f..4cbc53b6d 100644 --- a/pkgs/clan-cli/clan_cli/templates/apply_disk.py +++ b/pkgs/clan-cli/clan_cli/templates/apply_disk.py @@ -3,6 +3,7 @@ import logging from collections.abc import Sequence from typing import Any +from clan_lib.errors import ClanError from clan_lib.machines.machines import Machine from clan_lib.templates.disk import set_machine_disk_schema @@ -27,7 +28,9 @@ class AppendSetAction(argparse.Action): option_string: str | None = None, ) -> None: lst = getattr(namespace, self.dest) - assert isinstance(values, list), "values must be a list" + if not values or not hasattr(values, "__getitem__"): + msg = "values must be indexable" + raise ClanError(msg) lst.append((values[0], values[1])) diff --git a/pkgs/clan-cli/clan_cli/vars/generator.py b/pkgs/clan-cli/clan_cli/vars/generator.py index e0298caa3..30eeb7176 100644 --- a/pkgs/clan-cli/clan_cli/vars/generator.py +++ b/pkgs/clan-cli/clan_cli/vars/generator.py @@ -4,6 +4,7 @@ from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING +from clan_lib.errors import ClanError from clan_lib.nix import nix_test_store from .check import check_vars @@ -64,8 +65,12 @@ class Generator: @cached_property def exists(self) -> bool: - assert self.machine is not None - assert self._flake is not None + if self.machine is None: + msg = "Machine cannot be None" + raise ClanError(msg) + if self._flake is None: + msg = "Flake cannot be None" + raise ClanError(msg) return check_vars(self.machine, self._flake, generator_name=self.name) @classmethod @@ -174,8 +179,12 @@ class Generator: return None def final_script(self) -> Path: - assert self.machine is not None - assert self._flake is not None + if self.machine is None: + msg = "Machine cannot be None" + raise ClanError(msg) + if self._flake is None: + msg = "Flake cannot be None" + raise ClanError(msg) from clan_lib.machines.machines import Machine machine = Machine(name=self.machine, flake=self._flake) @@ -189,8 +198,12 @@ class Generator: return output def validation(self) -> str | None: - assert self.machine is not None - assert self._flake is not None + if self.machine is None: + msg = "Machine cannot be None" + raise ClanError(msg) + if self._flake is None: + msg = "Flake cannot be None" + raise ClanError(msg) from clan_lib.machines.machines import Machine machine = Machine(name=self.machine, flake=self._flake) diff --git a/pkgs/clan-cli/clan_cli/vars/var.py b/pkgs/clan-cli/clan_cli/vars/var.py index 6651920aa..aa88dbb9e 100644 --- a/pkgs/clan-cli/clan_cli/vars/var.py +++ b/pkgs/clan-cli/clan_cli/vars/var.py @@ -2,6 +2,8 @@ from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING +from clan_lib.errors import ClanError + if TYPE_CHECKING: from clan_cli.vars.generator import Generator @@ -31,8 +33,12 @@ class Var: @property def value(self) -> bytes: - assert self._store is not None - assert self._generator is not None + if self._store is None: + msg = "Store cannot be None" + raise ClanError(msg) + if self._generator is None: + msg = "Generator cannot be None" + raise ClanError(msg) if not self._store.exists(self._generator, self.name): msg = f"Var {self.id} has not been generated yet" raise ValueError(msg) @@ -47,14 +53,22 @@ class Var: return "" def set(self, value: bytes) -> list[Path]: - assert self._store is not None - assert self._generator is not None + if self._store is None: + msg = "Store cannot be None" + raise ClanError(msg) + if self._generator is None: + msg = "Generator cannot be None" + raise ClanError(msg) return self._store.set(self._generator, self, value) @property def exists(self) -> bool: - assert self._store is not None - assert self._generator is not None + if self._store is None: + msg = "Store cannot be None" + raise ClanError(msg) + if self._generator is None: + msg = "Generator cannot be None" + raise ClanError(msg) return self._store.exists(self._generator, self.name) def __str__(self) -> str: diff --git a/pkgs/clan-cli/clan_lib/api/tasks.py b/pkgs/clan-cli/clan_lib/api/tasks.py index 33c7ce144..0a2bd2e3d 100644 --- a/pkgs/clan-cli/clan_lib/api/tasks.py +++ b/pkgs/clan-cli/clan_lib/api/tasks.py @@ -5,6 +5,7 @@ from dataclasses import dataclass from clan_lib.api import API from clan_lib.async_run import get_async_ctx, is_async_cancelled +from clan_lib.errors import ClanError log = logging.getLogger(__name__) @@ -21,7 +22,9 @@ BAKEND_THREADS: dict[str, WebThread] | None = None @API.register def delete_task(task_id: str) -> None: """Cancel a task by its op_key.""" - assert BAKEND_THREADS is not None, "Backend threads not initialized" + if BAKEND_THREADS is None: + msg = "Backend threads not initialized" + raise ClanError(msg) future = BAKEND_THREADS.get(task_id) log.debug(f"Thread ID: {threading.get_ident()}") @@ -53,5 +56,7 @@ def run_task_blocking(somearg: str) -> str: @API.register def list_tasks() -> list[str]: """List all tasks.""" - assert BAKEND_THREADS is not None, "Backend threads not initialized" + if BAKEND_THREADS is None: + msg = "Backend threads not initialized" + raise ClanError(msg) return list(BAKEND_THREADS.keys()) diff --git a/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py b/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py index ba3840b1f..0ea6b2d47 100644 --- a/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py +++ b/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py @@ -21,6 +21,7 @@ from typing import ( ) from clan_lib.api.serde import dataclass_to_dict +from clan_lib.errors import ClanError class JSchemaTypeError(Exception): @@ -123,9 +124,9 @@ def type_to_dict( for f in fields: if f.name.startswith("_"): continue - assert not isinstance(f.type, str), ( - f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" - ) + if isinstance(f.type, str): + msg = f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" + raise ClanError(msg) properties[f.metadata.get("alias", f.name)] = type_to_dict( f.type, f"{scope} {t.__name__}.{f.name}", # type: ignore diff --git a/pkgs/clan-cli/clan_lib/cmd/__init__.py b/pkgs/clan-cli/clan_lib/cmd/__init__.py index b7de80cb2..d768d2a11 100644 --- a/pkgs/clan-cli/clan_lib/cmd/__init__.py +++ b/pkgs/clan-cli/clan_lib/cmd/__init__.py @@ -159,7 +159,9 @@ def handle_io( if process.stdin in writelist: if input_bytes: try: - assert process.stdin is not None + if process.stdin is None: + msg = "Process stdin is unexpectedly None" + raise ClanError(msg) written = os.write(process.stdin.fileno(), input_bytes) except BrokenPipeError: wlist.remove(process.stdin) diff --git a/pkgs/clan-cli/clan_lib/dirs/__init__.py b/pkgs/clan-cli/clan_lib/dirs/__init__.py index 814237dd1..e64ecf53e 100644 --- a/pkgs/clan-cli/clan_lib/dirs/__init__.py +++ b/pkgs/clan-cli/clan_lib/dirs/__init__.py @@ -157,7 +157,9 @@ def machines_dir(flake: "Flake") -> Path: return flake.path / "machines" store_path = flake.store_path - assert store_path is not None, "Invalid flake object. Doesn't have a store path" + if store_path is None: + msg = "Invalid flake object. Doesn't have a store path" + raise ClanError(msg) return Path(store_path) / "machines" diff --git a/pkgs/clan-cli/clan_lib/flake/flake.py b/pkgs/clan-cli/clan_lib/flake/flake.py index a4c3b3a6c..f3b58ad92 100644 --- a/pkgs/clan-cli/clan_lib/flake/flake.py +++ b/pkgs/clan-cli/clan_lib/flake/flake.py @@ -138,7 +138,9 @@ class Selector: def as_dict(self) -> dict[str, Any]: if self.type == SelectorType.SET: - assert isinstance(self.value, list) + if not isinstance(self.value, list): + msg = f"Expected list for SET selector, got {type(self.value)}" + raise ClanError(msg) return { "type": self.type.value, "value": [asdict(selector) for selector in self.value], @@ -146,10 +148,14 @@ class Selector: if self.type == SelectorType.ALL: return {"type": self.type.value} if self.type == SelectorType.STR: - assert isinstance(self.value, str) + if not isinstance(self.value, str): + msg = f"Expected str for STR selector, got {type(self.value)}" + raise ClanError(msg) return {"type": self.type.value, "value": self.value} if self.type == SelectorType.MAYBE: - assert isinstance(self.value, str) + if not isinstance(self.value, str): + msg = f"Expected str for MAYBE selector, got {type(self.value)}" + raise ClanError(msg) return {"type": self.type.value, "value": self.value} msg = f"Invalid selector type: {self.type}" raise ValueError(msg) @@ -385,8 +391,12 @@ class FlakeCacheEntry: # if we have a string selector, that means we are usually on a dict or a list, since we cannot walk down scalar values # so we passthrough the value to the next level if selector.type == SelectorType.STR: - assert isinstance(selector.value, str) - assert isinstance(self.value, dict) + if not isinstance(selector.value, str): + msg = f"Expected str for STR selector value, got {type(selector.value)}" + raise ClanError(msg) + if not isinstance(self.value, dict): + msg = f"Expected dict for cache value, got {type(self.value)}" + raise ClanError(msg) if selector.value not in self.value: self.value[selector.value] = FlakeCacheEntry() self.value[selector.value].insert(value, selectors[1:]) @@ -395,9 +405,17 @@ class FlakeCacheEntry: # otherwise we just insert the value into the current dict # we can skip creating the non existing entry if we already fetched all keys elif selector.type == SelectorType.MAYBE: - assert isinstance(self.value, dict) - assert isinstance(value, dict) - assert isinstance(selector.value, str) + if not isinstance(self.value, dict): + msg = f"Expected dict for cache value in MAYBE, got {type(self.value)}" + raise ClanError(msg) + if not isinstance(value, dict): + msg = f"Expected dict for value in MAYBE, got {type(value)}" + raise ClanError(msg) + if not isinstance(selector.value, str): + msg = ( + f"Expected str for MAYBE selector value, got {type(selector.value)}" + ) + raise ClanError(msg) if selector.value in value: if selector.value not in self.value: self.value[selector.value] = FlakeCacheEntry() @@ -409,7 +427,9 @@ class FlakeCacheEntry: # insert a dict is pretty straight forward elif isinstance(value, dict): - assert isinstance(self.value, dict) + if not isinstance(self.value, dict): + msg = f"Expected dict for cache value in dict insert, got {type(self.value)}" + raise ClanError(msg) for key, value_ in value.items(): if key not in self.value: self.value[key] = FlakeCacheEntry() @@ -421,21 +441,31 @@ class FlakeCacheEntry: fetched_indices: list[str] = [] # if we are in a set, we take all the selectors if selector.type == SelectorType.SET: - assert isinstance(selector.value, list) + if not isinstance(selector.value, list): + msg = f"Expected list for SET selector value, got {type(selector.value)}" + raise ClanError(msg) for subselector in selector.value: fetched_indices.append(subselector.value) # if it's just a str, that is the index elif selector.type == SelectorType.STR: - assert isinstance(selector.value, str) + if not isinstance(selector.value, str): + msg = f"Expected str for STR selector value, got {type(selector.value)}" + raise ClanError(msg) fetched_indices = [selector.value] # otherwise we just take all the indices, which is the length of the list elif selector.type == SelectorType.ALL: fetched_indices = list(map(str, range(len(value)))) # insert is the same is insert a dict - assert isinstance(self.value, dict) + if not isinstance(self.value, dict): + msg = f"Expected dict for cache value in list insert, got {type(self.value)}" + raise ClanError(msg) for i, requested_index in enumerate(fetched_indices): - assert isinstance(requested_index, str) + if not isinstance(requested_index, str): + msg = ( + f"Expected str for requested index, got {type(requested_index)}" + ) + raise ClanError(msg) if requested_index not in self.value: self.value[requested_index] = FlakeCacheEntry() self.value[requested_index].insert(value[i], selectors[1:]) @@ -444,13 +474,17 @@ class FlakeCacheEntry: # if they are, we store them as a dict with the outPath key # this is to mirror nix behavior, where the outPath of an attrset is used if no further key is specified elif isinstance(value, str) and is_pure_store_path(value): - assert selectors == [] + if selectors != []: + msg = "Expected empty selectors for pure store path" + raise ClanError(msg) self.value = {"outPath": FlakeCacheEntry(value)} # if we have a normal scalar, we check if it conflicts with a maybe already store value # since an empty attrset is the default value, we cannot check that, so we just set it to the value elif isinstance(value, float | int | str) or value is None: - assert selectors == [] + if selectors != []: + msg = "Expected empty selectors for scalar value" + raise ClanError(msg) if self.value == {}: self.value = value # Only check for outPath wrapping conflicts for strings (store paths) @@ -491,7 +525,9 @@ class FlakeCacheEntry: # we just fetch all subkeys, so we need to check of we inserted all keys at this level before if selector.type == SelectorType.ALL: - assert isinstance(self.value, dict) + if not isinstance(self.value, dict): + msg = f"Expected dict for ALL selector caching, got {type(self.value)}" + raise ClanError(msg) if self.fetched_all: result = all( self.value[sel].is_cached(selectors[1:]) for sel in self.value @@ -520,7 +556,9 @@ class FlakeCacheEntry: if (selector.type in (SelectorType.STR, SelectorType.MAYBE)) and isinstance( self.value, dict ): - assert isinstance(selector.value, str) + if not isinstance(selector.value, str): + msg = f"Expected str for STR/MAYBE selector value in caching, got {type(selector.value)}" + raise ClanError(msg) val = selector.value if val not in self.value: # if we fetched all keys and we are not in there, refetching won't help, so we can assume we are cached @@ -548,12 +586,16 @@ class FlakeCacheEntry: # if we fetch a specific key, we return the recurse into that value in the dict if selector.type == SelectorType.STR and isinstance(self.value, dict): - assert isinstance(selector.value, str) + if not isinstance(selector.value, str): + msg = f"Expected str for STR selector value in select, got {type(selector.value)}" + raise ClanError(msg) return self.value[selector.value].select(selectors[1:]) # if we are a MAYBE selector, we check if the key exists in the dict if selector.type == SelectorType.MAYBE: - assert isinstance(selector.value, str) + if not isinstance(selector.value, str): + msg = f"Expected str for MAYBE selector value in select, got {type(selector.value)}" + raise ClanError(msg) if isinstance(self.value, dict): if selector.value in self.value: if self.value[selector.value].exists: @@ -570,7 +612,11 @@ class FlakeCacheEntry: # Handle SET selector on non-dict values if selector.type == SelectorType.SET and not isinstance(self.value, dict): - assert isinstance(selector.value, list) + if not isinstance(selector.value, list): + msg = ( + f"Expected list for SET selector value, got {type(selector.value)}" + ) + raise ClanError(msg) # Empty set or all sub-selectors are MAYBE if len(selector.value) == 0: # Empty set, return empty dict @@ -595,7 +641,9 @@ class FlakeCacheEntry: # if we want to select a set of keys, we take the keys from the selector if selector.type == SelectorType.SET: - assert isinstance(selector.value, list) + if not isinstance(selector.value, list): + msg = f"Expected list for SET selector value in select, got {type(selector.value)}" + raise ClanError(msg) for subselector in selector.value: # make sure the keys actually exist if we have a maybe selector if subselector.type == SetSelectorType.MAYBE: @@ -634,12 +682,16 @@ class FlakeCacheEntry: str_selector = "*" elif selector.type == SelectorType.SET: subselectors: list[str] = [] - assert isinstance(selector.value, list) + if not isinstance(selector.value, list): + msg = f"Expected list for SET selector value in error handling, got {type(selector.value)}" + raise ClanError(msg) for subselector in selector.value: subselectors.append(subselector.value) str_selector = "{" + ",".join(subselectors) + "}" else: - assert isinstance(selector.value, str) + if not isinstance(selector.value, str): + msg = f"Expected str for selector value in error handling, got {type(selector.value)}" + raise ClanError(msg) str_selector = selector.value raise KeyError(str_selector) @@ -769,7 +821,9 @@ class Flake: def is_local(self) -> bool: if self._is_local is None: self.invalidate_cache() - assert isinstance(self._is_local, bool) + if not isinstance(self._is_local, bool): + msg = f"Expected bool for is_local, got {type(self._is_local)}" + raise ClanError(msg) return self._is_local def get_input_names(self) -> list[str]: @@ -781,7 +835,9 @@ class Flake: def path(self) -> Path: if self._path is None: self.invalidate_cache() - assert isinstance(self._path, Path) + if not isinstance(self._path, Path): + msg = f"Expected Path for path, got {type(self._path)}" + raise ClanError(msg) return self._path def load_cache(self) -> None: @@ -847,7 +903,9 @@ class Flake: self.prefetch() self._cache = FlakeCache() - assert self.hash is not None + if self.hash is None: + msg = "Hash cannot be None" + raise ClanError(msg) hashed_hash = sha1(self.hash.encode()).hexdigest() self.flake_cache_path = ( Path(user_cache_dir()) / "clan" / "flakes-v2" / hashed_hash @@ -867,7 +925,9 @@ class Flake: self._path = Path(self.flake_metadata["original"]["path"]) else: self._is_local = False - assert self.store_path is not None + if self.store_path is None: + msg = "Store path cannot be None" + raise ClanError(msg) self._path = Path(self.store_path) def get_from_nix( @@ -901,7 +961,9 @@ class Flake: if self._cache is None: self.invalidate_cache() - assert self._cache is not None + if self._cache is None: + msg = "Cache cannot be None after invalidation" + raise ClanError(msg) nix_options = self.nix_options[:] if self.nix_options is not None else [] @@ -915,9 +977,9 @@ class Flake: if not select_hash.startswith("sha256-"): select_flake = Flake(str(select_source()), nix_options=nix_options) select_flake.invalidate_cache() - assert select_flake.hash is not None, ( - "this should be impossible as invalidate_cache() should always set `hash`" - ) + if select_flake.hash is None: + msg = "this should be impossible as invalidate_cache() should always set `hash`" + raise ClanError(msg) select_hash = select_flake.hash # fmt: off @@ -1011,8 +1073,12 @@ class Flake: """ if self._cache is None: self.invalidate_cache() - assert self._cache is not None - assert self.flake_cache_path is not None + if self._cache is None: + msg = "Cache cannot be None after invalidation" + raise ClanError(msg) + if self.flake_cache_path is None: + msg = "Flake cache path cannot be None" + raise ClanError(msg) not_fetched_selectors = [] for selector in selectors: if not self._cache.is_cached(selector): @@ -1034,8 +1100,12 @@ class Flake: """ if self._cache is None: self.invalidate_cache() - assert self._cache is not None - assert self.flake_cache_path is not None + if self._cache is None: + msg = "Cache cannot be None after invalidation" + raise ClanError(msg) + if self.flake_cache_path is None: + msg = "Flake cache path cannot be None" + raise ClanError(msg) if not self._cache.is_cached(selector): log.debug(f"(cached) $ clan select {shlex.quote(selector)}") diff --git a/pkgs/clan-cli/clan_lib/import_utils/__init__.py b/pkgs/clan-cli/clan_lib/import_utils/__init__.py index fdebd2774..149717f27 100644 --- a/pkgs/clan-cli/clan_lib/import_utils/__init__.py +++ b/pkgs/clan-cli/clan_lib/import_utils/__init__.py @@ -5,6 +5,8 @@ from dataclasses import dataclass from pathlib import Path from typing import Any, TypeVar, cast +from clan_lib.errors import ClanError + T = TypeVar("T") @@ -76,7 +78,9 @@ def import_with_source[T]( # Get the file path file_path_str = module.__file__ - assert file_path_str is not None, f"Module {module_name} file path cannot be None" + if file_path_str is None: + msg = f"Module {module_name} file path cannot be None" + raise ClanError(msg) # Make the path relative to home for better readability try: diff --git a/pkgs/clan-cli/clan_lib/machines/actions.py b/pkgs/clan-cli/clan_lib/machines/actions.py index bbc6e7219..56dfe3099 100644 --- a/pkgs/clan-cli/clan_lib/machines/actions.py +++ b/pkgs/clan-cli/clan_lib/machines/actions.py @@ -94,7 +94,9 @@ def get_machine(flake: Flake, name: str) -> InventoryMachine: @API.register def set_machine(machine: Machine, update: InventoryMachine) -> None: """Update the machine information in the inventory.""" - assert machine.name == update.get("name", machine.name), "Machine name mismatch" + if machine.name != update.get("name", machine.name): + msg = "Machine name mismatch" + raise ClanError(msg) inventory_store = InventoryStore(flake=machine.flake) inventory = inventory_store.read() diff --git a/pkgs/clan-cli/clan_lib/services/modules.py b/pkgs/clan-cli/clan_lib/services/modules.py index ed1b5a2b6..f69917bb6 100644 --- a/pkgs/clan-cli/clan_lib/services/modules.py +++ b/pkgs/clan-cli/clan_lib/services/modules.py @@ -184,11 +184,15 @@ def get_service_module( avilable_modules = list_service_modules(flake) module_set = avilable_modules.get("modules", {}).get(input_name) - assert module_set is not None # Since check_service_module_ref already checks this + if module_set is None: + msg = f"Module set for input '{input_name}' not found" + raise ClanError(msg) module = module_set.get(module_name) - assert module is not None # Since check_service_module_ref already checks this + if module is None: + msg = f"Module '{module_name}' not found in input '{input_name}'" + raise ClanError(msg) return module @@ -217,7 +221,9 @@ def check_service_module_ref( raise ClanError(msg) module_name = module_ref.get("name") - assert module_name + if not module_name: + msg = "Module name is required in module_ref" + raise ClanError(msg) module = module_set.get(module_name) if module is None: msg = f"module with name '{module_name}' not found" diff --git a/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py b/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py index 5e7235276..d5cf3a9b5 100644 --- a/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py +++ b/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py @@ -82,7 +82,9 @@ class SudoAskpassProxy: prompt = line[len("PASSWORD_REQUESTED:") :].strip() password = self.handle_password_request(prompt) print(password, file=ssh_process.stdin) - assert ssh_process.stdin is not None, "SSH process stdin is None" + if ssh_process.stdin is None: + msg = "SSH process stdin is None" + raise ClanError(msg) ssh_process.stdin.flush() else: print(line) @@ -107,7 +109,9 @@ class SudoAskpassProxy: raise ClanError(msg) from e # Monitor SSH output for password requests - assert self.ssh_process.stdout is not None, "SSH process stdout is None" + if self.ssh_process.stdout is None: + msg = "SSH process stdout is None" + raise ClanError(msg) for line in self.ssh_process.stdout: line = line.strip() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py b/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py index cee2337f8..d5ab02efe 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py @@ -16,6 +16,7 @@ from clan_cli import vms from clan_cli.vms.inspect import inspect_vm from clan_cli.vms.qemu import QMPWrapper from clan_lib.dirs import vm_state_dir +from clan_lib.errors import ClanError from clan_lib.machines.machines import Machine from clan_vm_manager.clan_uri import ClanURI @@ -158,14 +159,18 @@ class VMObject(GObject.Object): name=self.data.flake.flake_attr, flake=uri.flake, ) - assert self.machine is not None + if self.machine is None: + msg = "Machine object is not available" + raise ClanError(msg) state_dir = vm_state_dir( flake_url=self.machine.flake.identifier, vm_name=self.machine.name, ) self.qmp_wrap = QMPWrapper(state_dir) - assert self.machine is not None + if self.machine is None: + msg = "Machine object is not available" + raise ClanError(msg) yield self.machine self.machine = None @@ -332,7 +337,9 @@ class VMObject(GObject.Object): # Try to shutdown the VM gracefully using QMP try: - assert self.qmp_wrap is not None + if self.qmp_wrap is None: + msg = "QMP wrapper is not available" + raise ClanError(msg) with self.qmp_wrap.qmp_ctx() as qmp: qmp.command("system_powerdown") except Exception as ex: diff --git a/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py index 637ac9c30..785dfd46e 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py @@ -4,6 +4,7 @@ from collections.abc import Callable from typing import Any, ClassVar, cast import gi +from clan_lib.errors import ClanError from clan_lib.machines.machines import Machine from clan_vm_manager.clan_uri import ClanURI @@ -109,7 +110,9 @@ class JoinList: def _on_join_finished(self, source: JoinValue) -> None: log.info(f"Join finished: {source.url}") self.discard(source) - assert source.entry is not None + if source.entry is None: + msg = "Join entry is not available" + raise ClanError(msg) ClanStore.use().push_history_entry(source.entry) def discard(self, value: JoinValue) -> None: diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py index a2868ca7d..e5ef3511d 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py @@ -64,7 +64,9 @@ class ClanList(Gtk.Box): super().__init__(orientation=Gtk.Orientation.VERTICAL) app = Gio.Application.get_default() - assert app is not None + if app is None: + msg = "Application is not available" + raise ClanError(msg) app.connect("join_request", self.on_join_request) self.log_label: Gtk.Label = Gtk.Label() @@ -306,7 +308,9 @@ class ClanList(Gtk.Box): # Can't do this here because clan store is empty at this point if vm is not None: sub = row.get_subtitle() - assert sub is not None + if sub is None: + msg = "Subtitle is not available" + raise ClanError(msg) ToastOverlay.use().add_toast_unique( WarningToast( diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py b/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py index 4c6b7c6d3..e298a9c7b 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/logs.py @@ -1,6 +1,7 @@ import logging import gi +from clan_lib.errors import ClanError gi.require_version("Adw", "1") from gi.repository import Adw, Gio, Gtk @@ -19,7 +20,9 @@ class Logs(Gtk.Box): super().__init__(orientation=Gtk.Orientation.VERTICAL) app = Gio.Application.get_default() - assert app is not None + if app is None: + msg = "Application is not available" + raise ClanError(msg) self.banner = Adw.Banner.new("") self.banner.set_use_markup(True) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py b/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py index f7fd2948e..41ba5c821 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py @@ -2,6 +2,7 @@ import logging import threading import gi +from clan_lib.errors import ClanError from clan_vm_manager.components.interfaces import ClanConfig from clan_vm_manager.history import list_history @@ -37,7 +38,9 @@ class MainWindow(Adw.ApplicationWindow): view.add_top_bar(header) app = Gio.Application.get_default() - assert app is not None + if app is None: + msg = "Application is not available" + raise ClanError(msg) self.tray_icon: TrayIcon = TrayIcon(app) # Initialize all ClanStore diff --git a/pkgs/classgen/main.py b/pkgs/classgen/main.py index 2e8c26152..828299f43 100644 --- a/pkgs/classgen/main.py +++ b/pkgs/classgen/main.py @@ -69,10 +69,14 @@ def map_json_type( if json_type == "number": return ["float"] if json_type == "array": - assert nested_types, f"Array type not found for {parent}" + if not nested_types: + msg = f"Array type not found for {parent}" + raise Error(msg) return [f"""list[{" | ".join(sort_types(nested_types))}]"""] if json_type == "object": - assert nested_types, f"dict type not found for {parent}" + if not nested_types: + msg = f"dict type not found for {parent}" + raise Error(msg) return [f"""dict[str, {" | ".join(sort_types(nested_types))}]"""] if json_type == "null": return ["None"] @@ -324,7 +328,9 @@ def generate_dataclass( parent=field_name, ) - assert field_types, f"Python type not found for {prop} {prop_info}" + if not field_types: + msg = f"Python type not found for {prop} {prop_info}" + raise Error(msg) field_meta = None if field_name != prop: diff --git a/pkgs/zerotier-members/zerotier-members.py b/pkgs/zerotier-members/zerotier-members.py index 78393288f..3743db7c4 100755 --- a/pkgs/zerotier-members/zerotier-members.py +++ b/pkgs/zerotier-members/zerotier-members.py @@ -14,9 +14,9 @@ class ClanError(Exception): def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address: - assert len(network_id) == 16, ( - f"network_id must be 16 characters long, got {network_id}" - ) + if len(network_id) != 16: + msg = f"network_id must be 16 characters long, got {network_id}" + raise ClanError(msg) nwid = int(network_id, 16) node_id = int(identity, 16) addr_parts = bytearray(