ruff: replace asserts outside of tests with Exceptions

This commit is contained in:
Jörg Thalheim
2025-08-20 15:45:49 +02:00
parent 5be9b8383b
commit dc5485d9f1
23 changed files with 257 additions and 91 deletions

View File

@@ -218,8 +218,12 @@ class Machine:
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, env=env)
def get_systemd_process(self) -> int:
assert self.process is not None, "Machine not started"
assert self.process.stdout is not None, "Machine has no stdout"
if self.process is None:
msg = "Machine not started"
raise RuntimeError(msg)
if self.process.stdout is None:
msg = "Machine has no stdout"
raise RuntimeError(msg)
for line in self.process.stdout:
print(line, end="")
@@ -236,9 +240,9 @@ class Machine:
.read_text()
.split()
)
assert len(childs) == 1, (
f"Expected exactly one child process for systemd-nspawn, got {childs}"
)
if len(childs) != 1:
msg = f"Expected exactly one child process for systemd-nspawn, got {childs}"
raise RuntimeError(msg)
try:
return int(childs[0])
except ValueError as e:
@@ -258,7 +262,9 @@ class Machine:
def tuple_from_line(line: str) -> tuple[str, str]:
match = line_pattern.match(line)
assert match is not None
if match is None:
msg = f"Failed to parse line: {line}"
raise RuntimeError(msg)
return match[1], match[2]
return dict(
@@ -575,7 +581,9 @@ class Driver:
# We lauch a sleep here, so we can pgrep the process cmdline for
# the uuid
sleep = shutil.which("sleep")
assert sleep is not None, "sleep command not found"
if sleep is None:
msg = "sleep command not found"
raise RuntimeError(msg)
machine.execute(
f"systemd-run /bin/sh -c '{sleep} 999999999 && echo {nspawn_uuid}'",
)

View File

@@ -55,9 +55,9 @@ class Identity:
def node_id(self) -> str:
nid = self.public.split(":")[0]
assert len(nid) == 10, (
f"node_id must be 10 characters long, got {len(nid)}: {nid}"
)
if len(nid) != 10:
msg = f"node_id must be 10 characters long, got {len(nid)}: {nid}"
raise ClanError(msg)
return nid
@@ -172,9 +172,9 @@ def create_identity() -> Identity:
def compute_zerotier_ip(network_id: str, identity: Identity) -> ipaddress.IPv6Address:
assert len(network_id) == 16, (
f"network_id must be 16 characters long, got '{network_id}'"
)
if len(network_id) != 16:
msg = f"network_id must be 16 characters long, got '{network_id}'"
raise ClanError(msg)
nwid = int(network_id, 16)
node_id = int(identity.node_id(), 16)
addr_parts = bytearray(

View File

@@ -99,7 +99,9 @@ class Webview:
"""Get the bridge, creating it if necessary."""
if self._bridge is None:
self.create_bridge()
assert self._bridge is not None, "Bridge should be created"
if self._bridge is None:
msg = "Bridge should be created"
raise RuntimeError(msg)
return self._bridge
def api_wrapper(

View File

@@ -3,6 +3,8 @@ import logging
from collections.abc import Sequence
from typing import Any
from clan_lib.errors import ClanError
log = logging.getLogger(__name__)
@@ -19,6 +21,8 @@ class AppendOptionAction(argparse.Action):
) -> None:
lst = getattr(namespace, self.dest)
lst.append("--option")
assert isinstance(values, list), "values must be a list"
if not values or not hasattr(values, "__getitem__"):
msg = "values must be indexable"
raise ClanError(msg)
lst.append(values[0])
lst.append(values[1])

View File

@@ -3,6 +3,7 @@ import logging
from collections.abc import Sequence
from typing import Any
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_lib.templates.disk import set_machine_disk_schema
@@ -27,7 +28,9 @@ class AppendSetAction(argparse.Action):
option_string: str | None = None,
) -> None:
lst = getattr(namespace, self.dest)
assert isinstance(values, list), "values must be a list"
if not values or not hasattr(values, "__getitem__"):
msg = "values must be indexable"
raise ClanError(msg)
lst.append((values[0], values[1]))

View File

@@ -4,6 +4,7 @@ from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING
from clan_lib.errors import ClanError
from clan_lib.nix import nix_test_store
from .check import check_vars
@@ -64,8 +65,12 @@ class Generator:
@cached_property
def exists(self) -> bool:
assert self.machine is not None
assert self._flake is not None
if self.machine is None:
msg = "Machine cannot be None"
raise ClanError(msg)
if self._flake is None:
msg = "Flake cannot be None"
raise ClanError(msg)
return check_vars(self.machine, self._flake, generator_name=self.name)
@classmethod
@@ -174,8 +179,12 @@ class Generator:
return None
def final_script(self) -> Path:
assert self.machine is not None
assert self._flake is not None
if self.machine is None:
msg = "Machine cannot be None"
raise ClanError(msg)
if self._flake is None:
msg = "Flake cannot be None"
raise ClanError(msg)
from clan_lib.machines.machines import Machine
machine = Machine(name=self.machine, flake=self._flake)
@@ -189,8 +198,12 @@ class Generator:
return output
def validation(self) -> str | None:
assert self.machine is not None
assert self._flake is not None
if self.machine is None:
msg = "Machine cannot be None"
raise ClanError(msg)
if self._flake is None:
msg = "Flake cannot be None"
raise ClanError(msg)
from clan_lib.machines.machines import Machine
machine = Machine(name=self.machine, flake=self._flake)

View File

@@ -2,6 +2,8 @@ from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_cli.vars.generator import Generator
@@ -31,8 +33,12 @@ class Var:
@property
def value(self) -> bytes:
assert self._store is not None
assert self._generator is not None
if self._store is None:
msg = "Store cannot be None"
raise ClanError(msg)
if self._generator is None:
msg = "Generator cannot be None"
raise ClanError(msg)
if not self._store.exists(self._generator, self.name):
msg = f"Var {self.id} has not been generated yet"
raise ValueError(msg)
@@ -47,14 +53,22 @@ class Var:
return "<binary blob>"
def set(self, value: bytes) -> list[Path]:
assert self._store is not None
assert self._generator is not None
if self._store is None:
msg = "Store cannot be None"
raise ClanError(msg)
if self._generator is None:
msg = "Generator cannot be None"
raise ClanError(msg)
return self._store.set(self._generator, self, value)
@property
def exists(self) -> bool:
assert self._store is not None
assert self._generator is not None
if self._store is None:
msg = "Store cannot be None"
raise ClanError(msg)
if self._generator is None:
msg = "Generator cannot be None"
raise ClanError(msg)
return self._store.exists(self._generator, self.name)
def __str__(self) -> str:

View File

@@ -5,6 +5,7 @@ from dataclasses import dataclass
from clan_lib.api import API
from clan_lib.async_run import get_async_ctx, is_async_cancelled
from clan_lib.errors import ClanError
log = logging.getLogger(__name__)
@@ -21,7 +22,9 @@ BAKEND_THREADS: dict[str, WebThread] | None = None
@API.register
def delete_task(task_id: str) -> None:
"""Cancel a task by its op_key."""
assert BAKEND_THREADS is not None, "Backend threads not initialized"
if BAKEND_THREADS is None:
msg = "Backend threads not initialized"
raise ClanError(msg)
future = BAKEND_THREADS.get(task_id)
log.debug(f"Thread ID: {threading.get_ident()}")
@@ -53,5 +56,7 @@ def run_task_blocking(somearg: str) -> str:
@API.register
def list_tasks() -> list[str]:
"""List all tasks."""
assert BAKEND_THREADS is not None, "Backend threads not initialized"
if BAKEND_THREADS is None:
msg = "Backend threads not initialized"
raise ClanError(msg)
return list(BAKEND_THREADS.keys())

View File

@@ -21,6 +21,7 @@ from typing import (
)
from clan_lib.api.serde import dataclass_to_dict
from clan_lib.errors import ClanError
class JSchemaTypeError(Exception):
@@ -123,9 +124,9 @@ def type_to_dict(
for f in fields:
if f.name.startswith("_"):
continue
assert not isinstance(f.type, str), (
f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
)
if isinstance(f.type, str):
msg = f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
raise ClanError(msg)
properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type,
f"{scope} {t.__name__}.{f.name}", # type: ignore

View File

@@ -159,7 +159,9 @@ def handle_io(
if process.stdin in writelist:
if input_bytes:
try:
assert process.stdin is not None
if process.stdin is None:
msg = "Process stdin is unexpectedly None"
raise ClanError(msg)
written = os.write(process.stdin.fileno(), input_bytes)
except BrokenPipeError:
wlist.remove(process.stdin)

View File

@@ -157,7 +157,9 @@ def machines_dir(flake: "Flake") -> Path:
return flake.path / "machines"
store_path = flake.store_path
assert store_path is not None, "Invalid flake object. Doesn't have a store path"
if store_path is None:
msg = "Invalid flake object. Doesn't have a store path"
raise ClanError(msg)
return Path(store_path) / "machines"

View File

@@ -138,7 +138,9 @@ class Selector:
def as_dict(self) -> dict[str, Any]:
if self.type == SelectorType.SET:
assert isinstance(self.value, list)
if not isinstance(self.value, list):
msg = f"Expected list for SET selector, got {type(self.value)}"
raise ClanError(msg)
return {
"type": self.type.value,
"value": [asdict(selector) for selector in self.value],
@@ -146,10 +148,14 @@ class Selector:
if self.type == SelectorType.ALL:
return {"type": self.type.value}
if self.type == SelectorType.STR:
assert isinstance(self.value, str)
if not isinstance(self.value, str):
msg = f"Expected str for STR selector, got {type(self.value)}"
raise ClanError(msg)
return {"type": self.type.value, "value": self.value}
if self.type == SelectorType.MAYBE:
assert isinstance(self.value, str)
if not isinstance(self.value, str):
msg = f"Expected str for MAYBE selector, got {type(self.value)}"
raise ClanError(msg)
return {"type": self.type.value, "value": self.value}
msg = f"Invalid selector type: {self.type}"
raise ValueError(msg)
@@ -385,8 +391,12 @@ class FlakeCacheEntry:
# if we have a string selector, that means we are usually on a dict or a list, since we cannot walk down scalar values
# so we passthrough the value to the next level
if selector.type == SelectorType.STR:
assert isinstance(selector.value, str)
assert isinstance(self.value, dict)
if not isinstance(selector.value, str):
msg = f"Expected str for STR selector value, got {type(selector.value)}"
raise ClanError(msg)
if not isinstance(self.value, dict):
msg = f"Expected dict for cache value, got {type(self.value)}"
raise ClanError(msg)
if selector.value not in self.value:
self.value[selector.value] = FlakeCacheEntry()
self.value[selector.value].insert(value, selectors[1:])
@@ -395,9 +405,17 @@ class FlakeCacheEntry:
# otherwise we just insert the value into the current dict
# we can skip creating the non existing entry if we already fetched all keys
elif selector.type == SelectorType.MAYBE:
assert isinstance(self.value, dict)
assert isinstance(value, dict)
assert isinstance(selector.value, str)
if not isinstance(self.value, dict):
msg = f"Expected dict for cache value in MAYBE, got {type(self.value)}"
raise ClanError(msg)
if not isinstance(value, dict):
msg = f"Expected dict for value in MAYBE, got {type(value)}"
raise ClanError(msg)
if not isinstance(selector.value, str):
msg = (
f"Expected str for MAYBE selector value, got {type(selector.value)}"
)
raise ClanError(msg)
if selector.value in value:
if selector.value not in self.value:
self.value[selector.value] = FlakeCacheEntry()
@@ -409,7 +427,9 @@ class FlakeCacheEntry:
# insert a dict is pretty straight forward
elif isinstance(value, dict):
assert isinstance(self.value, dict)
if not isinstance(self.value, dict):
msg = f"Expected dict for cache value in dict insert, got {type(self.value)}"
raise ClanError(msg)
for key, value_ in value.items():
if key not in self.value:
self.value[key] = FlakeCacheEntry()
@@ -421,21 +441,31 @@ class FlakeCacheEntry:
fetched_indices: list[str] = []
# if we are in a set, we take all the selectors
if selector.type == SelectorType.SET:
assert isinstance(selector.value, list)
if not isinstance(selector.value, list):
msg = f"Expected list for SET selector value, got {type(selector.value)}"
raise ClanError(msg)
for subselector in selector.value:
fetched_indices.append(subselector.value)
# if it's just a str, that is the index
elif selector.type == SelectorType.STR:
assert isinstance(selector.value, str)
if not isinstance(selector.value, str):
msg = f"Expected str for STR selector value, got {type(selector.value)}"
raise ClanError(msg)
fetched_indices = [selector.value]
# otherwise we just take all the indices, which is the length of the list
elif selector.type == SelectorType.ALL:
fetched_indices = list(map(str, range(len(value))))
# insert is the same is insert a dict
assert isinstance(self.value, dict)
if not isinstance(self.value, dict):
msg = f"Expected dict for cache value in list insert, got {type(self.value)}"
raise ClanError(msg)
for i, requested_index in enumerate(fetched_indices):
assert isinstance(requested_index, str)
if not isinstance(requested_index, str):
msg = (
f"Expected str for requested index, got {type(requested_index)}"
)
raise ClanError(msg)
if requested_index not in self.value:
self.value[requested_index] = FlakeCacheEntry()
self.value[requested_index].insert(value[i], selectors[1:])
@@ -444,13 +474,17 @@ class FlakeCacheEntry:
# if they are, we store them as a dict with the outPath key
# this is to mirror nix behavior, where the outPath of an attrset is used if no further key is specified
elif isinstance(value, str) and is_pure_store_path(value):
assert selectors == []
if selectors != []:
msg = "Expected empty selectors for pure store path"
raise ClanError(msg)
self.value = {"outPath": FlakeCacheEntry(value)}
# if we have a normal scalar, we check if it conflicts with a maybe already store value
# since an empty attrset is the default value, we cannot check that, so we just set it to the value
elif isinstance(value, float | int | str) or value is None:
assert selectors == []
if selectors != []:
msg = "Expected empty selectors for scalar value"
raise ClanError(msg)
if self.value == {}:
self.value = value
# Only check for outPath wrapping conflicts for strings (store paths)
@@ -491,7 +525,9 @@ class FlakeCacheEntry:
# we just fetch all subkeys, so we need to check of we inserted all keys at this level before
if selector.type == SelectorType.ALL:
assert isinstance(self.value, dict)
if not isinstance(self.value, dict):
msg = f"Expected dict for ALL selector caching, got {type(self.value)}"
raise ClanError(msg)
if self.fetched_all:
result = all(
self.value[sel].is_cached(selectors[1:]) for sel in self.value
@@ -520,7 +556,9 @@ class FlakeCacheEntry:
if (selector.type in (SelectorType.STR, SelectorType.MAYBE)) and isinstance(
self.value, dict
):
assert isinstance(selector.value, str)
if not isinstance(selector.value, str):
msg = f"Expected str for STR/MAYBE selector value in caching, got {type(selector.value)}"
raise ClanError(msg)
val = selector.value
if val not in self.value:
# if we fetched all keys and we are not in there, refetching won't help, so we can assume we are cached
@@ -548,12 +586,16 @@ class FlakeCacheEntry:
# if we fetch a specific key, we return the recurse into that value in the dict
if selector.type == SelectorType.STR and isinstance(self.value, dict):
assert isinstance(selector.value, str)
if not isinstance(selector.value, str):
msg = f"Expected str for STR selector value in select, got {type(selector.value)}"
raise ClanError(msg)
return self.value[selector.value].select(selectors[1:])
# if we are a MAYBE selector, we check if the key exists in the dict
if selector.type == SelectorType.MAYBE:
assert isinstance(selector.value, str)
if not isinstance(selector.value, str):
msg = f"Expected str for MAYBE selector value in select, got {type(selector.value)}"
raise ClanError(msg)
if isinstance(self.value, dict):
if selector.value in self.value:
if self.value[selector.value].exists:
@@ -570,7 +612,11 @@ class FlakeCacheEntry:
# Handle SET selector on non-dict values
if selector.type == SelectorType.SET and not isinstance(self.value, dict):
assert isinstance(selector.value, list)
if not isinstance(selector.value, list):
msg = (
f"Expected list for SET selector value, got {type(selector.value)}"
)
raise ClanError(msg)
# Empty set or all sub-selectors are MAYBE
if len(selector.value) == 0:
# Empty set, return empty dict
@@ -595,7 +641,9 @@ class FlakeCacheEntry:
# if we want to select a set of keys, we take the keys from the selector
if selector.type == SelectorType.SET:
assert isinstance(selector.value, list)
if not isinstance(selector.value, list):
msg = f"Expected list for SET selector value in select, got {type(selector.value)}"
raise ClanError(msg)
for subselector in selector.value:
# make sure the keys actually exist if we have a maybe selector
if subselector.type == SetSelectorType.MAYBE:
@@ -634,12 +682,16 @@ class FlakeCacheEntry:
str_selector = "*"
elif selector.type == SelectorType.SET:
subselectors: list[str] = []
assert isinstance(selector.value, list)
if not isinstance(selector.value, list):
msg = f"Expected list for SET selector value in error handling, got {type(selector.value)}"
raise ClanError(msg)
for subselector in selector.value:
subselectors.append(subselector.value)
str_selector = "{" + ",".join(subselectors) + "}"
else:
assert isinstance(selector.value, str)
if not isinstance(selector.value, str):
msg = f"Expected str for selector value in error handling, got {type(selector.value)}"
raise ClanError(msg)
str_selector = selector.value
raise KeyError(str_selector)
@@ -769,7 +821,9 @@ class Flake:
def is_local(self) -> bool:
if self._is_local is None:
self.invalidate_cache()
assert isinstance(self._is_local, bool)
if not isinstance(self._is_local, bool):
msg = f"Expected bool for is_local, got {type(self._is_local)}"
raise ClanError(msg)
return self._is_local
def get_input_names(self) -> list[str]:
@@ -781,7 +835,9 @@ class Flake:
def path(self) -> Path:
if self._path is None:
self.invalidate_cache()
assert isinstance(self._path, Path)
if not isinstance(self._path, Path):
msg = f"Expected Path for path, got {type(self._path)}"
raise ClanError(msg)
return self._path
def load_cache(self) -> None:
@@ -847,7 +903,9 @@ class Flake:
self.prefetch()
self._cache = FlakeCache()
assert self.hash is not None
if self.hash is None:
msg = "Hash cannot be None"
raise ClanError(msg)
hashed_hash = sha1(self.hash.encode()).hexdigest()
self.flake_cache_path = (
Path(user_cache_dir()) / "clan" / "flakes-v2" / hashed_hash
@@ -867,7 +925,9 @@ class Flake:
self._path = Path(self.flake_metadata["original"]["path"])
else:
self._is_local = False
assert self.store_path is not None
if self.store_path is None:
msg = "Store path cannot be None"
raise ClanError(msg)
self._path = Path(self.store_path)
def get_from_nix(
@@ -901,7 +961,9 @@ class Flake:
if self._cache is None:
self.invalidate_cache()
assert self._cache is not None
if self._cache is None:
msg = "Cache cannot be None after invalidation"
raise ClanError(msg)
nix_options = self.nix_options[:] if self.nix_options is not None else []
@@ -915,9 +977,9 @@ class Flake:
if not select_hash.startswith("sha256-"):
select_flake = Flake(str(select_source()), nix_options=nix_options)
select_flake.invalidate_cache()
assert select_flake.hash is not None, (
"this should be impossible as invalidate_cache() should always set `hash`"
)
if select_flake.hash is None:
msg = "this should be impossible as invalidate_cache() should always set `hash`"
raise ClanError(msg)
select_hash = select_flake.hash
# fmt: off
@@ -1011,8 +1073,12 @@ class Flake:
"""
if self._cache is None:
self.invalidate_cache()
assert self._cache is not None
assert self.flake_cache_path is not None
if self._cache is None:
msg = "Cache cannot be None after invalidation"
raise ClanError(msg)
if self.flake_cache_path is None:
msg = "Flake cache path cannot be None"
raise ClanError(msg)
not_fetched_selectors = []
for selector in selectors:
if not self._cache.is_cached(selector):
@@ -1034,8 +1100,12 @@ class Flake:
"""
if self._cache is None:
self.invalidate_cache()
assert self._cache is not None
assert self.flake_cache_path is not None
if self._cache is None:
msg = "Cache cannot be None after invalidation"
raise ClanError(msg)
if self.flake_cache_path is None:
msg = "Flake cache path cannot be None"
raise ClanError(msg)
if not self._cache.is_cached(selector):
log.debug(f"(cached) $ clan select {shlex.quote(selector)}")

View File

@@ -5,6 +5,8 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any, TypeVar, cast
from clan_lib.errors import ClanError
T = TypeVar("T")
@@ -76,7 +78,9 @@ def import_with_source[T](
# Get the file path
file_path_str = module.__file__
assert file_path_str is not None, f"Module {module_name} file path cannot be None"
if file_path_str is None:
msg = f"Module {module_name} file path cannot be None"
raise ClanError(msg)
# Make the path relative to home for better readability
try:

View File

@@ -94,7 +94,9 @@ def get_machine(flake: Flake, name: str) -> InventoryMachine:
@API.register
def set_machine(machine: Machine, update: InventoryMachine) -> None:
"""Update the machine information in the inventory."""
assert machine.name == update.get("name", machine.name), "Machine name mismatch"
if machine.name != update.get("name", machine.name):
msg = "Machine name mismatch"
raise ClanError(msg)
inventory_store = InventoryStore(flake=machine.flake)
inventory = inventory_store.read()

View File

@@ -184,11 +184,15 @@ def get_service_module(
avilable_modules = list_service_modules(flake)
module_set = avilable_modules.get("modules", {}).get(input_name)
assert module_set is not None # Since check_service_module_ref already checks this
if module_set is None:
msg = f"Module set for input '{input_name}' not found"
raise ClanError(msg)
module = module_set.get(module_name)
assert module is not None # Since check_service_module_ref already checks this
if module is None:
msg = f"Module '{module_name}' not found in input '{input_name}'"
raise ClanError(msg)
return module
@@ -217,7 +221,9 @@ def check_service_module_ref(
raise ClanError(msg)
module_name = module_ref.get("name")
assert module_name
if not module_name:
msg = "Module name is required in module_ref"
raise ClanError(msg)
module = module_set.get(module_name)
if module is None:
msg = f"module with name '{module_name}' not found"

View File

@@ -82,7 +82,9 @@ class SudoAskpassProxy:
prompt = line[len("PASSWORD_REQUESTED:") :].strip()
password = self.handle_password_request(prompt)
print(password, file=ssh_process.stdin)
assert ssh_process.stdin is not None, "SSH process stdin is None"
if ssh_process.stdin is None:
msg = "SSH process stdin is None"
raise ClanError(msg)
ssh_process.stdin.flush()
else:
print(line)
@@ -107,7 +109,9 @@ class SudoAskpassProxy:
raise ClanError(msg) from e
# Monitor SSH output for password requests
assert self.ssh_process.stdout is not None, "SSH process stdout is None"
if self.ssh_process.stdout is None:
msg = "SSH process stdout is None"
raise ClanError(msg)
for line in self.ssh_process.stdout:
line = line.strip()

View File

@@ -16,6 +16,7 @@ from clan_cli import vms
from clan_cli.vms.inspect import inspect_vm
from clan_cli.vms.qemu import QMPWrapper
from clan_lib.dirs import vm_state_dir
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
@@ -158,14 +159,18 @@ class VMObject(GObject.Object):
name=self.data.flake.flake_attr,
flake=uri.flake,
)
assert self.machine is not None
if self.machine is None:
msg = "Machine object is not available"
raise ClanError(msg)
state_dir = vm_state_dir(
flake_url=self.machine.flake.identifier,
vm_name=self.machine.name,
)
self.qmp_wrap = QMPWrapper(state_dir)
assert self.machine is not None
if self.machine is None:
msg = "Machine object is not available"
raise ClanError(msg)
yield self.machine
self.machine = None
@@ -332,7 +337,9 @@ class VMObject(GObject.Object):
# Try to shutdown the VM gracefully using QMP
try:
assert self.qmp_wrap is not None
if self.qmp_wrap is None:
msg = "QMP wrapper is not available"
raise ClanError(msg)
with self.qmp_wrap.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:

View File

@@ -4,6 +4,7 @@ from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
@@ -109,7 +110,9 @@ class JoinList:
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
if source.entry is None:
msg = "Join entry is not available"
raise ClanError(msg)
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:

View File

@@ -64,7 +64,9 @@ class ClanList(Gtk.Box):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
if app is None:
msg = "Application is not available"
raise ClanError(msg)
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
@@ -306,7 +308,9 @@ class ClanList(Gtk.Box):
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
if sub is None:
msg = "Subtitle is not available"
raise ClanError(msg)
ToastOverlay.use().add_toast_unique(
WarningToast(

View File

@@ -1,6 +1,7 @@
import logging
import gi
from clan_lib.errors import ClanError
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
@@ -19,7 +20,9 @@ class Logs(Gtk.Box):
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
if app is None:
msg = "Application is not available"
raise ClanError(msg)
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)

View File

@@ -2,6 +2,7 @@ import logging
import threading
import gi
from clan_lib.errors import ClanError
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.history import list_history
@@ -37,7 +38,9 @@ class MainWindow(Adw.ApplicationWindow):
view.add_top_bar(header)
app = Gio.Application.get_default()
assert app is not None
if app is None:
msg = "Application is not available"
raise ClanError(msg)
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore

View File

@@ -69,10 +69,14 @@ def map_json_type(
if json_type == "number":
return ["float"]
if json_type == "array":
assert nested_types, f"Array type not found for {parent}"
if not nested_types:
msg = f"Array type not found for {parent}"
raise Error(msg)
return [f"""list[{" | ".join(sort_types(nested_types))}]"""]
if json_type == "object":
assert nested_types, f"dict type not found for {parent}"
if not nested_types:
msg = f"dict type not found for {parent}"
raise Error(msg)
return [f"""dict[str, {" | ".join(sort_types(nested_types))}]"""]
if json_type == "null":
return ["None"]
@@ -324,7 +328,9 @@ def generate_dataclass(
parent=field_name,
)
assert field_types, f"Python type not found for {prop} {prop_info}"
if not field_types:
msg = f"Python type not found for {prop} {prop_info}"
raise Error(msg)
field_meta = None
if field_name != prop:

View File

@@ -14,9 +14,9 @@ class ClanError(Exception):
def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address:
assert len(network_id) == 16, (
f"network_id must be 16 characters long, got {network_id}"
)
if len(network_id) != 16:
msg = f"network_id must be 16 characters long, got {network_id}"
raise ClanError(msg)
nwid = int(network_id, 16)
node_id = int(identity, 16)
addr_parts = bytearray(