Merge pull request 'apply TRY lint' (#2035) from joerg-ci into main
This commit is contained in:
@@ -10,6 +10,10 @@ from tempfile import TemporaryDirectory
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def prepare_machine_root(machinename: str, root: Path) -> None:
|
||||
root.mkdir(parents=True, exist_ok=True)
|
||||
root.joinpath("etc").mkdir(parents=True, exist_ok=True)
|
||||
@@ -34,7 +38,7 @@ def retry(fn: Callable, timeout: int = 900) -> None:
|
||||
|
||||
if not fn(True):
|
||||
msg = f"action timed out after {timeout} seconds"
|
||||
raise Exception(msg)
|
||||
raise Error(msg)
|
||||
|
||||
|
||||
class Machine:
|
||||
@@ -100,7 +104,7 @@ class Machine:
|
||||
f'retrieving systemctl info for unit "{unit}"'
|
||||
f" failed with exit code {proc.returncode}"
|
||||
)
|
||||
raise Exception(msg)
|
||||
raise Error(msg)
|
||||
|
||||
line_pattern = re.compile(r"^([^=]+)=(.*)$")
|
||||
|
||||
@@ -208,7 +212,7 @@ class Machine:
|
||||
state = info["ActiveState"]
|
||||
if state == "failed":
|
||||
msg = f'unit "{unit}" reached state "{state}"'
|
||||
raise Exception(msg)
|
||||
raise Error(msg)
|
||||
|
||||
if state == "inactive":
|
||||
proc = self.systemctl("list-jobs --full 2>&1")
|
||||
@@ -216,7 +220,7 @@ class Machine:
|
||||
info = self.get_unit_info(unit)
|
||||
if info["ActiveState"] == state:
|
||||
msg = f'unit "{unit}" is inactive and there are no pending jobs'
|
||||
raise Exception(msg)
|
||||
raise Error(msg)
|
||||
|
||||
return state == "active"
|
||||
|
||||
@@ -267,7 +271,7 @@ class Driver:
|
||||
name_match = re.match(r".*-nixos-system-(.+)-(.+)", container.name)
|
||||
if not name_match:
|
||||
msg = f"Unable to extract hostname from {container.name}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
name = name_match.group(1)
|
||||
self.machines.append(
|
||||
Machine(
|
||||
|
||||
@@ -29,6 +29,7 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.api.modules import Frontmatter, extract_frontmatter, get_roles
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
# Get environment variables
|
||||
CLAN_CORE_PATH = Path(os.environ["CLAN_CORE_PATH"])
|
||||
@@ -153,11 +154,11 @@ options_head = "\n## Module Options\n"
|
||||
def produce_clan_core_docs() -> None:
|
||||
if not CLAN_CORE_DOCS:
|
||||
msg = f"Environment variables are not set correctly: $CLAN_CORE_DOCS={CLAN_CORE_DOCS}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
if not OUT:
|
||||
msg = f"Environment variables are not set correctly: $out={OUT}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
# A mapping of output file to content
|
||||
core_outputs: dict[str, str] = {}
|
||||
@@ -228,15 +229,15 @@ clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/
|
||||
def produce_clan_modules_docs() -> None:
|
||||
if not CLAN_MODULES:
|
||||
msg = f"Environment variables are not set correctly: $out={CLAN_MODULES}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
if not CLAN_CORE_PATH:
|
||||
msg = f"Environment variables are not set correctly: $CLAN_CORE_PATH={CLAN_CORE_PATH}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
if not OUT:
|
||||
msg = f"Environment variables are not set correctly: $out={OUT}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
with Path(CLAN_MODULES).open() as f:
|
||||
links: dict[str, str] = json.load(f)
|
||||
|
||||
@@ -28,9 +28,10 @@ def try_bind_port(port: int) -> bool:
|
||||
try:
|
||||
tcp.bind(("127.0.0.1", port))
|
||||
udp.bind(("127.0.0.1", port))
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def try_connect_port(port: int) -> bool:
|
||||
@@ -216,7 +217,7 @@ def main() -> None:
|
||||
case "network":
|
||||
if args.network_id is None:
|
||||
msg = "network_id parameter is required"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
controller = create_network_controller()
|
||||
identity = controller.identity
|
||||
network_id = controller.networkid
|
||||
@@ -226,7 +227,7 @@ def main() -> None:
|
||||
network_id = args.network_id
|
||||
case _:
|
||||
msg = f"unknown mode {args.mode}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
ip = compute_zerotier_ip(network_id, identity)
|
||||
|
||||
args.identity_secret.write_text(identity.private)
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import (
|
||||
cast,
|
||||
)
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from gi.repository import GLib, GObject
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -54,8 +55,8 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
result = GLib.SOURCE_REMOVE
|
||||
try:
|
||||
result = self.async_run(**data)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
except Exception:
|
||||
log.exception("Error in async_run")
|
||||
# TODO: send error to js
|
||||
return result
|
||||
|
||||
@@ -78,8 +79,8 @@ class MethodExecutor(threading.Thread):
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.result = self.function(*self.args, **self.kwargs)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
except Exception:
|
||||
log.exception("Error in MethodExecutor")
|
||||
finally:
|
||||
self.finished = True
|
||||
|
||||
@@ -94,7 +95,7 @@ class GObjApi:
|
||||
|
||||
if fn_name in self._obj_registry:
|
||||
msg = f"Function '{fn_name}' already registered"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
self._obj_registry[fn_name] = obj
|
||||
|
||||
def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None:
|
||||
@@ -121,7 +122,7 @@ class GObjApi:
|
||||
log.error(f"Expected signature: {exp_signature}")
|
||||
log.error(f"Actual signature: {got_signature}")
|
||||
msg = f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
def get_obj(self, fn_name: str) -> type[ImplFunc]:
|
||||
result = self._obj_registry.get(fn_name, None)
|
||||
@@ -131,7 +132,7 @@ class GObjApi:
|
||||
plain_fn = self._methods.get(fn_name, None)
|
||||
if plain_fn is None:
|
||||
msg = f"Method '{fn_name}' not found in Api"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
class GenericFnRuntime(ImplFunc[..., Any]):
|
||||
def __init__(self) -> None:
|
||||
|
||||
@@ -402,15 +402,12 @@ def main() -> None:
|
||||
try:
|
||||
args.func(args)
|
||||
except ClanError as e:
|
||||
if args.debug:
|
||||
log.exception(e)
|
||||
sys.exit(1)
|
||||
if isinstance(e, ClanCmdError):
|
||||
if e.cmd.msg:
|
||||
log.error(e.cmd.msg)
|
||||
log.exception(e.cmd.msg)
|
||||
sys.exit(1)
|
||||
|
||||
log.error(e)
|
||||
log.exception("An error occurred")
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt:
|
||||
log.warning("Interrupted by user")
|
||||
|
||||
@@ -110,10 +110,10 @@ API.register(open_file)
|
||||
def register(self, fn: Callable[..., T]) -> Callable[..., T]:
|
||||
if fn.__name__ in self._registry:
|
||||
msg = f"Function {fn.__name__} already registered"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
if fn.__name__ in self._orig_signature:
|
||||
msg = f"Function {fn.__name__} already registered"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
# make copy of original function
|
||||
self._orig_signature[fn.__name__] = signature(fn)
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.inventory import (
|
||||
AdminConfig,
|
||||
ServiceAdmin,
|
||||
@@ -39,7 +40,7 @@ def set_admin_service(
|
||||
|
||||
if not allowed_keys:
|
||||
msg = "At least one key must be provided to ensure access"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
instance = ServiceAdmin(
|
||||
meta=ServiceMeta(name=instance_name),
|
||||
|
||||
@@ -176,7 +176,7 @@ def set_service_instance(
|
||||
|
||||
if module_name not in service_keys:
|
||||
msg = f"{module_name} is not a valid Service attribute. Expected one of {', '.join(service_keys)}."
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
inventory = load_inventory_json(base_path)
|
||||
target_type = get_args(get_type_hints(Service)[module_name])[1]
|
||||
|
||||
@@ -203,8 +203,8 @@ def generate_facts(
|
||||
was_regenerated |= _generate_facts_for_machine(
|
||||
machine, service, regenerate, tmpdir, prompt
|
||||
)
|
||||
except (OSError, ClanError) as exc:
|
||||
log.error(f"Failed to generate facts for {machine.name}: {exc}")
|
||||
except (OSError, ClanError):
|
||||
log.exception(f"Failed to generate facts for {machine.name}")
|
||||
errors += 1
|
||||
if errors > 0:
|
||||
msg = (
|
||||
|
||||
@@ -125,10 +125,11 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||
res = proc.stdout.strip()
|
||||
data = json.loads(res)
|
||||
inventory = from_dict(Inventory, data)
|
||||
return inventory
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Error decoding inventory from flake: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return inventory
|
||||
|
||||
|
||||
def load_inventory_json(
|
||||
|
||||
@@ -200,7 +200,7 @@ def generate_machine_hardware_info(
|
||||
try:
|
||||
show_machine_hardware_platform(clan_dir, machine_name)
|
||||
except ClanCmdError as e:
|
||||
log.error(e)
|
||||
log.exception("Failed to evaluate hardware-configuration.nix")
|
||||
# Restore the backup file
|
||||
print(f"Restoring backup file {backup_file}")
|
||||
if backup_file:
|
||||
|
||||
@@ -72,10 +72,11 @@ def list_nixos_machines(flake_url: str | Path) -> list[str]:
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
data = json.loads(res)
|
||||
return data
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Error decoding machines from flake: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return data
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -123,10 +124,10 @@ def check_machine_online(
|
||||
proc = run_no_stdout(cmd)
|
||||
if proc.returncode != 0:
|
||||
return "Offline"
|
||||
|
||||
return "Online"
|
||||
except ClanCmdError:
|
||||
return "Offline"
|
||||
else:
|
||||
return "Online"
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any
|
||||
|
||||
from clan_cli.cmd import run, run_no_stdout
|
||||
from clan_cli.dirs import nixpkgs_flake, nixpkgs_source
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
def nix_command(flags: list[str]) -> list[str]:
|
||||
@@ -145,7 +146,7 @@ def run_cmd(programs: list[str], cmd: list[str]) -> list[str]:
|
||||
for program in programs:
|
||||
if not Programs.is_allowed(program):
|
||||
msg = f"Program not allowed: {program}"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
return cmd
|
||||
missing_packages = [
|
||||
|
||||
@@ -108,9 +108,9 @@ def profile(func: Callable) -> Callable:
|
||||
profiler.enable()
|
||||
res = func(*args, **kwargs)
|
||||
profiler.disable()
|
||||
except Exception as ex:
|
||||
except Exception:
|
||||
profiler.disable()
|
||||
raise ex
|
||||
raise
|
||||
return res
|
||||
|
||||
if os.getenv("PERF", "0") == "1":
|
||||
|
||||
@@ -4,6 +4,8 @@ import socket
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
# qga is almost like qmp, but not quite, because:
|
||||
# - server doesn't send initial message
|
||||
@@ -16,9 +18,10 @@ class QgaSession:
|
||||
for _ in range(100):
|
||||
try:
|
||||
self.sock.connect(str(socket_file))
|
||||
return
|
||||
except ConnectionRefusedError:
|
||||
sleep(0.1)
|
||||
else:
|
||||
return
|
||||
self.sock.connect(str(socket_file))
|
||||
|
||||
def get_response(self) -> dict:
|
||||
@@ -59,7 +62,7 @@ class QgaSession:
|
||||
result = self.get_response()
|
||||
if "error" in result and result["error"]["desc"].startswith("PID"):
|
||||
msg = "PID could not be found"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
if result["return"]["exited"]:
|
||||
break
|
||||
sleep(0.1)
|
||||
@@ -77,5 +80,5 @@ class QgaSession:
|
||||
)
|
||||
if check and exitcode != 0:
|
||||
msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
return exitcode, stdout, stderr
|
||||
|
||||
@@ -16,6 +16,8 @@ import socket
|
||||
import types
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
|
||||
class QMPError(Exception):
|
||||
"""
|
||||
@@ -159,7 +161,6 @@ class QEMUMonitorProtocol:
|
||||
) -> None:
|
||||
# Implement context manager exit function.
|
||||
self.close()
|
||||
return False
|
||||
|
||||
def connect(self, negotiate: bool = True) -> dict[str, Any] | None:
|
||||
"""
|
||||
@@ -211,7 +212,7 @@ class QEMUMonitorProtocol:
|
||||
except OSError as err:
|
||||
if err.errno == errno.EPIPE:
|
||||
return None
|
||||
raise err
|
||||
raise
|
||||
resp = self.__json_read()
|
||||
self.logger.debug("<<< %s", resp)
|
||||
return resp
|
||||
@@ -242,7 +243,7 @@ class QEMUMonitorProtocol:
|
||||
"""
|
||||
ret = self.cmd(cmd, kwds)
|
||||
if "error" in ret:
|
||||
raise Exception(ret["error"]["desc"])
|
||||
raise ClanError(ret["error"]["desc"])
|
||||
return ret["return"]
|
||||
|
||||
def pull_event(self, wait: bool | float = False) -> dict[str, Any] | None:
|
||||
|
||||
@@ -56,10 +56,11 @@ def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
if out_file:
|
||||
out_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_file.write_text(res)
|
||||
return private_key, pubkey
|
||||
except subprocess.CalledProcessError as e:
|
||||
msg = "Failed to generate private sops key"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return private_key, pubkey
|
||||
|
||||
|
||||
def get_user_name(flake_dir: Path, user: str) -> str:
|
||||
|
||||
@@ -751,7 +751,7 @@ class HostGroup:
|
||||
"""
|
||||
threads = []
|
||||
results: list[HostResult[T]] = [
|
||||
HostResult(h, Exception(f"No result set for thread {i}"))
|
||||
HostResult(h, ClanError(f"No result set for thread {i}"))
|
||||
for (i, h) in enumerate(self.hosts)
|
||||
]
|
||||
for i, host in enumerate(self.hosts):
|
||||
@@ -800,7 +800,7 @@ def parse_deployment_address(
|
||||
result = urllib.parse.urlsplit("//" + hostname)
|
||||
if not result.hostname:
|
||||
msg = f"Invalid hostname: {hostname}"
|
||||
raise Exception(msg)
|
||||
raise ClanError(msg)
|
||||
hostname = result.hostname
|
||||
port = result.port
|
||||
meta = meta.copy()
|
||||
|
||||
@@ -82,9 +82,10 @@ def is_reachable(host: str) -> bool:
|
||||
try:
|
||||
sock.connect((host, 22))
|
||||
sock.close()
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
|
||||
|
||||
@@ -259,7 +259,7 @@ def generate_vars(
|
||||
machine, generator_name, regenerate
|
||||
)
|
||||
except Exception as exc:
|
||||
log.error(f"Failed to generate facts for {machine.name}: {exc}")
|
||||
log.exception(f"Failed to generate facts for {machine.name}")
|
||||
errors += [exc]
|
||||
if len(errors) > 0:
|
||||
msg = f"Failed to generate facts for {len(errors)} hosts. Check the logs above"
|
||||
|
||||
@@ -58,10 +58,11 @@ def build_vm(
|
||||
try:
|
||||
vm_data = json.loads(Path(nixos_config_file).read_text())
|
||||
vm_data["secrets_dir"] = str(secrets_dir)
|
||||
return vm_data
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to parse vm config: {e}"
|
||||
raise ClanError(msg) from e
|
||||
else:
|
||||
return vm_data
|
||||
|
||||
|
||||
def get_secrets(
|
||||
|
||||
@@ -15,9 +15,10 @@ def test_vsock_port(port: int) -> bool:
|
||||
s = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
|
||||
s.connect((VMADDR_CID_HYPERVISOR, port))
|
||||
s.close()
|
||||
return True
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
||||
2
pkgs/clan-cli/tests/fixture_error.py
Normal file
2
pkgs/clan-cli/tests/fixture_error.py
Normal file
@@ -0,0 +1,2 @@
|
||||
class FixtureError(Exception):
|
||||
pass
|
||||
@@ -10,6 +10,7 @@ from typing import NamedTuple
|
||||
|
||||
import pytest
|
||||
from clan_cli.dirs import nixpkgs_source
|
||||
from fixture_error import FixtureError
|
||||
from root import CLAN_CORE
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -226,7 +227,7 @@ def test_flake(
|
||||
if git_proc.returncode != 0:
|
||||
log.error(git_proc.stderr.decode())
|
||||
msg = "git diff on ./sops is not empty. This should not happen as all changes should be committed"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -235,7 +236,7 @@ def test_flake_with_core(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
"test_flake_with_core",
|
||||
@@ -252,11 +253,11 @@ def test_local_democlan(
|
||||
msg = (
|
||||
"DEMOCLAN_ROOT not set. This test requires the democlan flake to be present"
|
||||
)
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
democlan_p = Path(democlan).resolve()
|
||||
if not democlan_p.is_dir():
|
||||
msg = f"DEMOCLAN_ROOT ({democlan_p}) is not a directory. This test requires the democlan directory to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
|
||||
return FlakeForTest(democlan_p)
|
||||
|
||||
@@ -267,7 +268,7 @@ def test_flake_with_core_and_pass(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
"test_flake_with_core_and_pass",
|
||||
@@ -281,7 +282,7 @@ def test_flake_minimal(
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
msg = "clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
yield from create_flake(
|
||||
temporary_home,
|
||||
CLAN_CORE / "templates" / "minimal",
|
||||
|
||||
@@ -2,6 +2,10 @@ import subprocess
|
||||
import tempfile
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def is_valid_age_key(secret_key: str) -> bool:
|
||||
# Run the age-keygen command with the -y flag to check the key format
|
||||
result = subprocess.run(
|
||||
@@ -11,7 +15,7 @@ def is_valid_age_key(secret_key: str) -> bool:
|
||||
if result.returncode == 0:
|
||||
return True
|
||||
msg = f"Invalid age key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
|
||||
def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
|
||||
@@ -27,7 +31,7 @@ def is_valid_ssh_key(secret_key: str, ssh_pub: str) -> bool:
|
||||
if result.returncode == 0:
|
||||
if result.stdout != ssh_pub:
|
||||
msg = f"Expected '{ssh_pub}' got '{result.stdout}' for ssh key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
return True
|
||||
msg = f"Invalid ssh key: {secret_key}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
@@ -10,6 +10,7 @@ from tempfile import TemporaryDirectory
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixture_error import FixtureError
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from command import Command
|
||||
@@ -134,5 +135,5 @@ def sshd(
|
||||
rc = proc.poll()
|
||||
if rc is not None:
|
||||
msg = f"sshd processes was terminated with {rc}"
|
||||
raise Exception(msg)
|
||||
raise FixtureError(msg)
|
||||
time.sleep(0.1)
|
||||
|
||||
@@ -5,6 +5,7 @@ from typing import Any, Generic, TypeVar
|
||||
import gi
|
||||
|
||||
gi.require_version("Gio", "2.0")
|
||||
from clan_cli.errors import ClanError
|
||||
from gi.repository import Gio, GObject
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -77,7 +78,7 @@ class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
|
||||
key = self.key_gen(item)
|
||||
if key in self._items:
|
||||
msg = "Key already exists in the dictionary"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
if position < 0 or position > len(self._items):
|
||||
msg = "Index out of range"
|
||||
raise IndexError(msg)
|
||||
|
||||
@@ -60,10 +60,11 @@ class EmptySplash(Gtk.Box):
|
||||
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
|
||||
try:
|
||||
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
|
||||
return pixbuf
|
||||
except Exception as e:
|
||||
log.error(f"Failed to load image: {e}")
|
||||
except Exception:
|
||||
log.exception("Failed to load image")
|
||||
return None
|
||||
else:
|
||||
return pixbuf
|
||||
|
||||
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
|
||||
"""
|
||||
|
||||
@@ -280,8 +280,8 @@ class VMObject(GObject.Object):
|
||||
if not self._log_file:
|
||||
try:
|
||||
self._log_file = Path(proc.out_file).open() # noqa: SIM115
|
||||
except Exception as ex:
|
||||
log.exception(ex)
|
||||
except Exception:
|
||||
log.exception(f"Failed to open log file {proc.out_file}")
|
||||
self._log_file = None
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any, TypeVar
|
||||
|
||||
import gi
|
||||
from clan_cli.clan_uri import ClanURI
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
from clan_vm_manager.components.gkvstore import GKVStore
|
||||
from clan_vm_manager.components.interfaces import ClanConfig
|
||||
@@ -259,7 +260,7 @@ class ClanList(Gtk.Box):
|
||||
vm = ClanStore.use().set_logging_vm(target)
|
||||
if vm is None:
|
||||
msg = f"VM {target} not found"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
views = ViewStack.use().view
|
||||
# Reset the logs view
|
||||
@@ -267,7 +268,7 @@ class ClanList(Gtk.Box):
|
||||
|
||||
if logs is None:
|
||||
msg = "Logs view not found"
|
||||
raise ValueError(msg)
|
||||
raise ClanError(msg)
|
||||
|
||||
name = vm.machine.name if vm.machine else "Unknown"
|
||||
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
# ruff: noqa: RUF001
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# Function to map JSON schemas and types to Python types
|
||||
def map_json_type(
|
||||
json_type: Any, nested_types: set[str] | None = None, parent: Any = None
|
||||
@@ -35,7 +40,7 @@ def map_json_type(
|
||||
if json_type == "null":
|
||||
return {"None"}
|
||||
msg = f"Python type not found for {json_type}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
|
||||
known_classes = set()
|
||||
@@ -98,7 +103,7 @@ def field_def_from_default_type(
|
||||
Or report this problem to the clan team. So the class generator can be improved.
|
||||
#################################################
|
||||
"""
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
return None
|
||||
|
||||
@@ -148,7 +153,7 @@ def field_def_from_default_value(
|
||||
)
|
||||
# Other default values unhandled yet.
|
||||
msg = f"Unhandled default value for field '{field_name}' - default value: {default_value}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
|
||||
def get_field_def(
|
||||
@@ -200,7 +205,7 @@ def generate_dataclass(schema: dict[str, Any], class_name: str = root_class) ->
|
||||
|
||||
if (prop_type is None) and (not union_variants):
|
||||
msg = f"Type not found for property {prop} {prop_info}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
|
||||
if union_variants:
|
||||
field_types = map_json_type(union_variants)
|
||||
@@ -337,7 +342,11 @@ def main() -> None:
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
args.func(args)
|
||||
try:
|
||||
args.func(args)
|
||||
except Error as e:
|
||||
print(e)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
class Error(Exception):
|
||||
pass
|
||||
@@ -48,11 +48,12 @@ def send_join_request_api(host: str, port: int) -> bool:
|
||||
body = response.split("\n")[-1]
|
||||
print(body)
|
||||
moonlight.terminate()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
moonlight.terminate()
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
||||
@@ -78,9 +79,10 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
||||
lines = response.split("\n")
|
||||
body = "\n".join(lines[2:])[2:]
|
||||
print(body)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}")
|
||||
else:
|
||||
return True
|
||||
# TODO: fix
|
||||
try:
|
||||
print(f"response: {response}")
|
||||
|
||||
@@ -23,21 +23,23 @@ class MoonlightPairing:
|
||||
)
|
||||
thread.start()
|
||||
print("Thread started")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(
|
||||
"Error occurred while starting the process: ", str(e), file=sys.stderr
|
||||
)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def check(self, host: str) -> bool:
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["moonlight", "list", "localhost", host], check=True
|
||||
)
|
||||
return result.returncode == 0
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
else:
|
||||
return result.returncode == 0
|
||||
|
||||
def terminate(self) -> None:
|
||||
if self.process:
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from moonlight_sunshine_accept.errors import Error
|
||||
|
||||
|
||||
def parse_moonlight_uri(uri: str) -> tuple[str, int | None]:
|
||||
print(uri)
|
||||
@@ -11,10 +13,10 @@ def parse_moonlight_uri(uri: str) -> tuple[str, int | None]:
|
||||
parsed = urlparse(uri)
|
||||
if parsed.scheme != "moonlight":
|
||||
msg = f"Invalid moonlight URI: {uri}"
|
||||
raise ValueError(msg)
|
||||
raise Error(msg)
|
||||
hostname = parsed.hostname
|
||||
if hostname is None:
|
||||
msg = f"Invalid moonlight URI: {uri}"
|
||||
raise ValueError
|
||||
raise Error(msg)
|
||||
port = parsed.port
|
||||
return (hostname, port)
|
||||
|
||||
@@ -39,6 +39,7 @@ lint.select = [
|
||||
"SLOT",
|
||||
"T10",
|
||||
"TID",
|
||||
"TRY",
|
||||
"U",
|
||||
"YTT",
|
||||
]
|
||||
@@ -55,4 +56,5 @@ lint.ignore = [
|
||||
"SIM102",
|
||||
"SIM108",
|
||||
"SIM112",
|
||||
"ISC001",
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user