diff --git a/lib/test/container-test-driver/test_driver/__init__.py b/lib/test/container-test-driver/test_driver/__init__.py index b17af3a8c..cc5a8d713 100644 --- a/lib/test/container-test-driver/test_driver/__init__.py +++ b/lib/test/container-test-driver/test_driver/__init__.py @@ -50,7 +50,7 @@ def init_test_environment() -> None: passwd_content = """root:x:0:0:Root:/root:/bin/sh nixbld:x:1000:100:Nix build user:/tmp:/bin/sh nobody:x:65534:65534:Nobody:/:/bin/sh -""" +""" # noqa: S105 - This is not a password, it's a Unix passwd file format for testing with NamedTemporaryFile(mode="w", delete=False, prefix="test-passwd-") as f: f.write(passwd_content) @@ -639,7 +639,7 @@ class Driver: def test_script(self) -> None: """Run the test script""" - exec(self.testscript, self.test_symbols(), None) + exec(self.testscript, self.test_symbols(), None) # noqa: S102 def run_tests(self) -> None: """Run the test script (for non-interactive test runs)""" diff --git a/nixosModules/clanCore/zerotier/generate.py b/nixosModules/clanCore/zerotier/generate.py index e4386b46d..07ece35ee 100644 --- a/nixosModules/clanCore/zerotier/generate.py +++ b/nixosModules/clanCore/zerotier/generate.py @@ -84,8 +84,9 @@ class ZerotierController: headers["Content-Type"] = "application/json" headers["X-ZT1-AUTH"] = self.authtoken url = f"http://127.0.0.1:{self.port}{path}" - req = urllib.request.Request(url, headers=headers, method=method, data=body) - resp = urllib.request.urlopen(req) + # Safe: only connecting to localhost zerotier API + req = urllib.request.Request(url, headers=headers, method=method, data=body) # noqa: S310 + resp = urllib.request.urlopen(req) # noqa: S310 return json.load(resp) def status(self) -> dict[str, Any]: diff --git a/pkgs/clan-app/clan_app/deps/http/test_http_api.py b/pkgs/clan-app/clan_app/deps/http/test_http_api.py index 1895ba880..b68a6c2ee 100644 --- a/pkgs/clan-app/clan_app/deps/http/test_http_api.py +++ b/pkgs/clan-app/clan_app/deps/http/test_http_api.py @@ -151,14 +151,14 @@ class TestHttpApiServer: try: # Test root endpoint - response = urlopen("http://127.0.0.1:8081/") + response = urlopen("http://127.0.0.1:8081/") # noqa: S310 data: dict = json.loads(response.read().decode()) assert data["body"]["status"] == "success" assert data["body"]["data"]["message"] == "Clan API Server" assert data["body"]["data"]["version"] == "1.0.0" # Test methods endpoint - response = urlopen("http://127.0.0.1:8081/api/methods") + response = urlopen("http://127.0.0.1:8081/api/methods") # noqa: S310 data = json.loads(response.read().decode()) assert data["body"]["status"] == "success" assert "test_method" in data["body"]["data"]["methods"] @@ -171,7 +171,7 @@ class TestHttpApiServer: data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) - response = urlopen(req) + response = urlopen(req) # noqa: S310 data = json.loads(response.read().decode()) # Response should be BackendResponse format @@ -194,7 +194,7 @@ class TestHttpApiServer: try: # Test 404 error - res = urlopen("http://127.0.0.1:8081/nonexistent") + res = urlopen("http://127.0.0.1:8081/nonexistent") # noqa: S310 assert res.status == 200 body = json.loads(res.read().decode())["body"] assert body["status"] == "error" @@ -207,7 +207,7 @@ class TestHttpApiServer: headers={"Content-Type": "application/json"}, ) - res = urlopen(req) + res = urlopen(req) # noqa: S310 assert res.status == 200 body = json.loads(res.read().decode())["body"] assert body["status"] == "error" @@ -219,7 +219,7 @@ class TestHttpApiServer: headers={"Content-Type": "application/json"}, ) - res = urlopen(req) + res = urlopen(req) # noqa: S310 assert res.status == 200 body = json.loads(res.read().decode())["body"] assert body["status"] == "error" @@ -240,7 +240,7 @@ class TestHttpApiServer: return "OPTIONS" req: Request = OptionsRequest("http://127.0.0.1:8081/api/call/test_method") - response = urlopen(req) + response = urlopen(req) # noqa: S310 # Check CORS headers headers = response.info() @@ -290,7 +290,7 @@ class TestIntegration: data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) - response = urlopen(req) + response = urlopen(req) # noqa: S310 data: dict = json.loads(response.read().decode()) # Verify response in BackendResponse format @@ -341,7 +341,7 @@ class TestIntegration: data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) - response = urlopen(req) + response = urlopen(req) # noqa: S310 data: dict = json.loads(response.read().decode()) # thread.join() @@ -365,7 +365,7 @@ class TestIntegration: data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) - response = urlopen(req) + response = urlopen(req) # noqa: S310 data: dict = json.loads(response.read().decode()) assert "body" in data diff --git a/pkgs/clan-cli/clan_cli/completions.py b/pkgs/clan-cli/clan_cli/completions.py index 2f0781874..bd89eb524 100644 --- a/pkgs/clan-cli/clan_cli/completions.py +++ b/pkgs/clan-cli/clan_cli/completions.py @@ -359,7 +359,7 @@ def complete_vars_for_machine( var_id = f"{generator_name}/{var_name}" vars_list.append(var_id) - except Exception: + except (OSError, PermissionError): pass vars_dict = dict.fromkeys(vars_list, "var") diff --git a/pkgs/clan-cli/clan_cli/machines/update.py b/pkgs/clan-cli/clan_cli/machines/update.py index 53308cf76..eb274e3e6 100644 --- a/pkgs/clan-cli/clan_cli/machines/update.py +++ b/pkgs/clan-cli/clan_cli/machines/update.py @@ -70,7 +70,7 @@ def requires_explicit_update(m: Machine) -> bool: try: if m.select("config.clan.deployment.requireExplicitUpdate"): return False - except Exception: + except (ClanError, AttributeError): pass try: diff --git a/pkgs/clan-cli/clan_cli/secrets/groups.py b/pkgs/clan-cli/clan_cli/secrets/groups.py index e245bbb51..6f04a0aba 100644 --- a/pkgs/clan-cli/clan_cli/secrets/groups.py +++ b/pkgs/clan-cli/clan_cli/secrets/groups.py @@ -262,7 +262,9 @@ def add_secret( def get_groups(flake_dir: Path, what: str, name: str) -> list[str]: """Returns the list of group names the given user or machine is part of.""" - assert what in {"users", "machines"} + if what not in {"users", "machines"}: + msg = f"Invalid 'what' parameter: {what}. Must be 'users' or 'machines'" + raise ClanError(msg) groups_dir = sops_groups_folder(flake_dir) if not groups_dir.exists(): diff --git a/pkgs/clan-cli/clan_cli/tests/fixtures_flakes.py b/pkgs/clan-cli/clan_cli/tests/fixtures_flakes.py index 6198ac25d..c34b01b10 100644 --- a/pkgs/clan-cli/clan_cli/tests/fixtures_flakes.py +++ b/pkgs/clan-cli/clan_cli/tests/fixtures_flakes.py @@ -176,7 +176,7 @@ class ClanFlake: self.temporary_home = temporary_home self.path = temporary_home / "flake" if not suppress_tmp_home_warning: - if "/tmp" not in str(os.environ.get("HOME")): + if "/tmp" not in str(os.environ.get("HOME")): # noqa: S108 - Checking if HOME is in temp directory log.warning( f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}", ) @@ -368,7 +368,7 @@ def create_flake( check=True, ) - if "/tmp" not in str(os.environ.get("HOME")): + if "/tmp" not in str(os.environ.get("HOME")): # noqa: S108 - Checking if HOME is in temp directory log.warning( f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}", ) @@ -441,14 +441,44 @@ def writable_clan_core( # Copy all tracked and untracked files (excluding ignored) # Using git ls-files with -z for null-terminated output to handle filenames with spaces - sp.run( - f"(git ls-files -z; git ls-files -z --others --exclude-standard) | " - f"xargs -0 cp --parents -t {temp_flake}/", - shell=True, - cwd=clan_core, - check=True, + + # Get tracked files + tracked_files = ( + sp.run( + ["git", "ls-files", "-z"], + cwd=clan_core, + capture_output=True, + text=True, + check=True, + ) + .stdout.rstrip("\0") + .split("\0") ) + # Get untracked files (excluding ignored) + untracked_files = ( + sp.run( + ["git", "ls-files", "-z", "--others", "--exclude-standard"], + cwd=clan_core, + capture_output=True, + text=True, + check=True, + ) + .stdout.rstrip("\0") + .split("\0") + ) + + # Combine and filter out empty strings + all_files = [f for f in tracked_files + untracked_files if f] + + # Copy files preserving directory structure + if all_files: + sp.run( + ["cp", "--parents", "-t", str(temp_flake), "--", *all_files], + cwd=clan_core, + check=True, + ) + # Copy .git directory to maintain git functionality if (clan_core / ".git").is_dir(): shutil.copytree( diff --git a/pkgs/clan-cli/clan_cli/tests/temporary_dir.py b/pkgs/clan-cli/clan_cli/tests/temporary_dir.py index c6d5ce2f8..47bb433ac 100644 --- a/pkgs/clan-cli/clan_cli/tests/temporary_dir.py +++ b/pkgs/clan-cli/clan_cli/tests/temporary_dir.py @@ -14,7 +14,7 @@ TEMPDIR = None # macOS' default temporary directory is too long for unix sockets # This can break applications such as gpg-agent if platform == "darwin": - TEMPDIR = Path("/tmp") + TEMPDIR = Path("/tmp") # noqa: S108 - Required on macOS due to socket path length limits @pytest.fixture diff --git a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py index ef80a661a..c71db38f8 100644 --- a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py @@ -235,7 +235,7 @@ def test_multiple_user_keys( # let's do some setting and getting of secrets def random_str() -> str: - return "".join(random.choices(string.ascii_letters, k=10)) + return "".join(random.choices(string.ascii_letters, k=10)) # noqa: S311 - Test data generation, not cryptographic for user_key in user_keys: # set a secret using each of the user's private keys diff --git a/pkgs/clan-cli/clan_cli/tests/test_ssh_local.py b/pkgs/clan-cli/clan_cli/tests/test_ssh_local.py index ba535696c..271f6d131 100644 --- a/pkgs/clan-cli/clan_cli/tests/test_ssh_local.py +++ b/pkgs/clan-cli/clan_cli/tests/test_ssh_local.py @@ -10,7 +10,7 @@ def test_run_environment(runtime: AsyncRuntime) -> None: None, host.run_local, ["echo $env_var"], - RunOpts(shell=True, log=Log.STDERR), + RunOpts(shell=True, log=Log.STDERR), # noqa: S604 - Testing shell env var expansion extra_env={"env_var": "true"}, ) @@ -20,7 +20,7 @@ def test_run_environment(runtime: AsyncRuntime) -> None: None, host.run_local, ["env"], - RunOpts(shell=True, log=Log.STDERR), + RunOpts(shell=True, log=Log.STDERR), # noqa: S604 - Testing shell env passing extra_env={"env_var": "true"}, ) assert "env_var=true" in p3.wait().result.stdout @@ -43,7 +43,7 @@ def test_timeout(runtime: AsyncRuntime) -> None: def test_run_exception(runtime: AsyncRuntime) -> None: - p1 = runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) + p1 = runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604 - Testing shell error handling assert p1.wait().error is not None diff --git a/pkgs/clan-cli/clan_cli/vars/migration.py b/pkgs/clan-cli/clan_cli/vars/migration.py index 5057d3cb5..7b1b40736 100644 --- a/pkgs/clan-cli/clan_cli/vars/migration.py +++ b/pkgs/clan-cli/clan_cli/vars/migration.py @@ -80,7 +80,9 @@ def migrate_files( files_to_commit = [] for file in generator.files: if _migration_file_exists(machine, generator, file.name): - assert generator.migrate_fact is not None + if generator.migrate_fact is None: + msg = f"Generator {generator.name} has no migrate_fact defined" + raise ClanError(msg) files_to_commit += _migrate_file( machine, generator, diff --git a/pkgs/clan-cli/clan_cli/vms/qemu.py b/pkgs/clan-cli/clan_cli/vms/qemu.py index f60713ab7..fd4818c46 100644 --- a/pkgs/clan-cli/clan_cli/vms/qemu.py +++ b/pkgs/clan-cli/clan_cli/vms/qemu.py @@ -1,5 +1,5 @@ import platform -import random +import secrets from collections.abc import Generator from contextlib import contextmanager from dataclasses import dataclass @@ -27,7 +27,7 @@ def graphics_options(vm: VmConfig) -> GraphicOptions: if vm.waypipe.enable: # FIXME: check for collisions - cid = random.randint(1, 2**32) + cid = secrets.randbelow(2**32 - 1) + 1 # Generate random CID between 1 and 2^32 # fmt: off return GraphicOptions([ *common, diff --git a/pkgs/clan-cli/clan_lib/api/serde.py b/pkgs/clan-cli/clan_lib/api/serde.py index 35b9cae3e..463ddc6e6 100644 --- a/pkgs/clan-cli/clan_lib/api/serde.py +++ b/pkgs/clan-cli/clan_lib/api/serde.py @@ -223,7 +223,9 @@ def construct_value( # If the field is another dataclass # Field_value must be a dictionary if is_dataclass(t) and isinstance(field_value, dict): - assert isinstance(t, type) + if not isinstance(t, type): + msg = f"Expected a type, got {t}" + raise ClanError(msg) return construct_dataclass(t, field_value) # If the field expects a path diff --git a/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py b/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py index 0ea6b2d47..6f4910082 100644 --- a/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py +++ b/pkgs/clan-cli/clan_lib/api/type_to_jsonschema.py @@ -21,7 +21,6 @@ from typing import ( ) from clan_lib.api.serde import dataclass_to_dict -from clan_lib.errors import ClanError class JSchemaTypeError(Exception): @@ -126,7 +125,7 @@ def type_to_dict( continue if isinstance(f.type, str): msg = f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" - raise ClanError(msg) + raise JSchemaTypeError(msg) properties[f.metadata.get("alias", f.name)] = type_to_dict( f.type, f"{scope} {t.__name__}.{f.name}", # type: ignore diff --git a/pkgs/clan-cli/clan_lib/async_run/__init__.py b/pkgs/clan-cli/clan_lib/async_run/__init__.py index 3642abc6c..515c59fc1 100644 --- a/pkgs/clan-cli/clan_lib/async_run/__init__.py +++ b/pkgs/clan-cli/clan_lib/async_run/__init__.py @@ -281,7 +281,9 @@ class AsyncRuntime: for name, task in self.tasks.items(): if task.finished and task.async_opts.check: - assert task.result is not None + if task.result is None: + msg = f"Task {name} finished but has no result" + raise ClanError(msg) error = task.result.error if error is not None: if log.isEnabledFor(logging.DEBUG): diff --git a/pkgs/clan-cli/clan_lib/clan/get_test.py b/pkgs/clan-cli/clan_lib/clan/get_test.py index aa3f5d009..f28736da9 100644 --- a/pkgs/clan-cli/clan_lib/clan/get_test.py +++ b/pkgs/clan-cli/clan_lib/clan/get_test.py @@ -13,7 +13,7 @@ def test_get_clan_details_invalid_flake() -> None: get_clan_details(invalid_flake) with pytest.raises(FlakeInvalidError): - get_clan_details(Flake("/tmp")) + get_clan_details(Flake("/tmp")) # noqa: S108 @pytest.mark.with_core diff --git a/pkgs/clan-cli/clan_lib/flake/flake.py b/pkgs/clan-cli/clan_lib/flake/flake.py index f3b58ad92..6aab9d066 100644 --- a/pkgs/clan-cli/clan_lib/flake/flake.py +++ b/pkgs/clan-cli/clan_lib/flake/flake.py @@ -906,7 +906,7 @@ class Flake: if self.hash is None: msg = "Hash cannot be None" raise ClanError(msg) - hashed_hash = sha1(self.hash.encode()).hexdigest() + hashed_hash = sha1(self.hash.encode()).hexdigest() # noqa: S324 - SHA1 used only for cache directory naming, not security self.flake_cache_path = ( Path(user_cache_dir()) / "clan" / "flakes-v2" / hashed_hash ) diff --git a/pkgs/clan-cli/clan_lib/log_manager/api.py b/pkgs/clan-cli/clan_lib/log_manager/api.py index eb2608f36..22c5eca64 100644 --- a/pkgs/clan-cli/clan_lib/log_manager/api.py +++ b/pkgs/clan-cli/clan_lib/log_manager/api.py @@ -13,10 +13,11 @@ def list_log_days() -> list[str]: A list of date strings in YYYY-MM-DD format representing all available log days. Raises: - AssertionError: If LOG_MANAGER_INSTANCE is not initialized. - + ClanError: If LOG_MANAGER_INSTANCE is not initialized. """ - assert LOG_MANAGER_INSTANCE is not None + if LOG_MANAGER_INSTANCE is None: + msg = "LOG_MANAGER_INSTANCE is not initialized" + raise ClanError(msg) return [day.date_day for day in LOG_MANAGER_INSTANCE.list_log_days()] @@ -35,10 +36,12 @@ def list_log_groups( A list of folder names (decoded) at the specified path level. Raises: - AssertionError: If LOG_MANAGER_INSTANCE is not initialized. + ClanError: If LOG_MANAGER_INSTANCE is not initialized. """ - assert LOG_MANAGER_INSTANCE is not None + if LOG_MANAGER_INSTANCE is None: + msg = "LOG_MANAGER_INSTANCE is not initialized" + raise ClanError(msg) return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day) @@ -59,11 +62,11 @@ def get_log_file( The contents of the log file as a string. Raises: - ClanError: If the log file is not found. - AssertionError: If LOG_MANAGER_INSTANCE is not initialized. - + ClanError: If the log file is not found or LOG_MANAGER_INSTANCE is not initialized. """ - assert LOG_MANAGER_INSTANCE is not None + if LOG_MANAGER_INSTANCE is None: + msg = "LOG_MANAGER_INSTANCE is not initialized" + raise ClanError(msg) log_file = LOG_MANAGER_INSTANCE.get_log_file( op_key=id_key, diff --git a/pkgs/clan-cli/clan_lib/machines/morph.py b/pkgs/clan-cli/clan_lib/machines/morph.py index 009b0afb5..7604c7b3e 100644 --- a/pkgs/clan-cli/clan_lib/machines/morph.py +++ b/pkgs/clan-cli/clan_lib/machines/morph.py @@ -1,8 +1,8 @@ import json import logging import os -import random import re +import secrets from pathlib import Path from tempfile import TemporaryDirectory @@ -34,8 +34,8 @@ def is_local_input(node: dict[str, dict[str, str]]) -> bool: def random_hostname() -> str: adjectives = ["wacky", "happy", "fluffy", "silly", "quirky", "zany", "bouncy"] nouns = ["unicorn", "penguin", "goose", "ninja", "octopus", "hamster", "robot"] - adjective = random.choice(adjectives) - noun = random.choice(nouns) + adjective = secrets.choice(adjectives) + noun = secrets.choice(nouns) return f"{adjective}-{noun}" diff --git a/pkgs/clan-cli/clan_lib/metrics/telegraf.py b/pkgs/clan-cli/clan_lib/metrics/telegraf.py index 4272752e0..2dbfb4315 100644 --- a/pkgs/clan-cli/clan_lib/metrics/telegraf.py +++ b/pkgs/clan-cli/clan_lib/metrics/telegraf.py @@ -53,10 +53,10 @@ def get_metrics( encoded_credentials = b64encode(credentials.encode("utf-8")).decode("utf-8") headers = {"Authorization": f"Basic {encoded_credentials}"} - req = urllib.request.Request(url, headers=headers) + req = urllib.request.Request(url, headers=headers) # noqa: S310 try: - response = urllib.request.urlopen(req) + response = urllib.request.urlopen(req) # noqa: S310 for line in response: line_str = line.decode("utf-8").strip() if line_str: diff --git a/pkgs/clan-cli/clan_lib/ssh/remote.py b/pkgs/clan-cli/clan_lib/ssh/remote.py index d7cec5d68..f89f62617 100644 --- a/pkgs/clan-cli/clan_lib/ssh/remote.py +++ b/pkgs/clan-cli/clan_lib/ssh/remote.py @@ -139,7 +139,7 @@ class Remote: if sys.platform == "darwin" and os.environ.get("TMPDIR", "").startswith( "/var/folders/", ): - directory = "/tmp/" + directory = "/tmp/" # noqa: S108 - Required on macOS due to bugs with default TMPDIR with TemporaryDirectory(prefix="clan-ssh", dir=directory) as temp_dir: remote = Remote( address=self.address, diff --git a/pkgs/clan-cli/clan_lib/ssh/remote_test.py b/pkgs/clan-cli/clan_lib/ssh/remote_test.py index 40d93205b..62ee64753 100644 --- a/pkgs/clan-cli/clan_lib/ssh/remote_test.py +++ b/pkgs/clan-cli/clan_lib/ssh/remote_test.py @@ -157,7 +157,7 @@ def test_run_environment(hosts: list[Remote], runtime: AsyncRuntime) -> None: None, host.run_local, ["echo $env_var"], - RunOpts(shell=True, log=Log.STDERR), + RunOpts(shell=True, log=Log.STDERR), # noqa: S604 extra_env={"env_var": "true"}, ) assert proc.wait().result.stdout == "true\n" @@ -230,16 +230,16 @@ def test_run_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None: None, host.run_local, ["exit 1"], - RunOpts(shell=True, check=False), + RunOpts(shell=True, check=False), # noqa: S604 ) assert proc.wait().result.returncode == 1 try: for host in hosts: - runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) + runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604 runtime.join_all() runtime.check_all() - except Exception: + except Exception: # noqa: S110 pass else: msg = "should have raised Exception" @@ -248,14 +248,14 @@ def test_run_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None: def test_run_function_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None: def some_func(h: Remote) -> CmdOut: - return h.run_local(["exit 1"], RunOpts(shell=True)) + return h.run_local(["exit 1"], RunOpts(shell=True)) # noqa: S604 try: for host in hosts: runtime.async_run(None, some_func, host) runtime.join_all() runtime.check_all() - except Exception: + except Exception: # noqa: S110 pass else: msg = "should have raised Exception" diff --git a/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py b/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py index d5cf3a9b5..df87ac360 100644 --- a/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py +++ b/pkgs/clan-cli/clan_lib/ssh/sudo_askpass_proxy.py @@ -74,17 +74,19 @@ class SudoAskpassProxy: def _process(self, ssh_process: subprocess.Popen) -> None: """Execute the remote command with password proxying""" # Monitor SSH output for password requests - assert ssh_process.stdout is not None, "SSH process stdout is None" + if ssh_process.stdout is None: + msg = "SSH process stdout is None" + raise ClanError(msg) try: for line in ssh_process.stdout: line = line.strip() if line.startswith("PASSWORD_REQUESTED:"): prompt = line[len("PASSWORD_REQUESTED:") :].strip() password = self.handle_password_request(prompt) - print(password, file=ssh_process.stdin) if ssh_process.stdin is None: msg = "SSH process stdin is None" raise ClanError(msg) + print(password, file=ssh_process.stdin) ssh_process.stdin.flush() else: print(line) @@ -137,10 +139,10 @@ class SudoAskpassProxy: pass # Unclear why we have to close this manually, but pytest reports unclosed fd - assert self.ssh_process.stdout is not None - self.ssh_process.stdout.close() - assert self.ssh_process.stdin is not None - self.ssh_process.stdin.close() + if self.ssh_process.stdout is not None: + self.ssh_process.stdout.close() + if self.ssh_process.stdin is not None: + self.ssh_process.stdin.close() self.ssh_process = None if self.thread: self.thread.join() diff --git a/pkgs/clan-cli/clan_lib/ssh/upload.py b/pkgs/clan-cli/clan_lib/ssh/upload.py index f75a44b29..457eb2e33 100644 --- a/pkgs/clan-cli/clan_lib/ssh/upload.py +++ b/pkgs/clan-cli/clan_lib/ssh/upload.py @@ -33,7 +33,7 @@ def upload( # Exceptions: Allow depth 2 if the path starts with /tmp/, /root/, or /etc/. # This allows destinations like /tmp/mydir or /etc/conf.d, but not /tmp or /etc directly. is_allowed_exception = depth >= 2 and ( - str(remote_dest).startswith("/tmp/") + str(remote_dest).startswith("/tmp/") # noqa: S108 - Path validation check or str(remote_dest).startswith("/root/") or str(remote_dest).startswith("/etc/") ) diff --git a/pkgs/clan-cli/clan_lib/templates/handler.py b/pkgs/clan-cli/clan_lib/templates/handler.py index 0347d6ddb..2ceefd814 100644 --- a/pkgs/clan-cli/clan_lib/templates/handler.py +++ b/pkgs/clan-cli/clan_lib/templates/handler.py @@ -59,7 +59,7 @@ def machine_template( try: template = template_flake.select(template_selector) except ClanError as e: - msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" + msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" # noqa: S608 raise ClanError(msg) from e src = template.get("path") @@ -152,7 +152,7 @@ def clan_template( template_flake = builtin_flake printable_template_ref = f"{clan_templates()}#{builtin_selector}" except ClanError: - msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" + msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" # noqa: S608 raise ClanError(msg) from e src = template.get("path") diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index 4a3781420..3f19e8c15 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -11,6 +11,7 @@ gi.require_version("Gtk", "4.0") gi.require_version("Adw", "1") from clan_lib.custom_logger import setup_logging +from clan_lib.errors import ClanError from gi.repository import Adw, Gdk, Gio, Gtk from clan_vm_manager.components.interfaces import ClanConfig @@ -118,7 +119,9 @@ class MainApplication(Adw.Application): css_provider = Gtk.CssProvider() css_provider.load_from_path(str(resource_path)) display = Gdk.Display.get_default() - assert display is not None + if display is None: + msg = "Could not get default display" + raise ClanError(msg) Gtk.StyleContext.add_provider_for_display( display, css_provider, diff --git a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py index e5ef3511d..97b40db4d 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/views/list.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/views/list.py @@ -116,7 +116,9 @@ class ClanList(Gtk.Box): add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s")) add_action.connect("activate", self.on_add) app = Gio.Application.get_default() - assert app is not None + if app is None: + msg = "Could not get default application" + raise ClanError(msg) app.add_action(add_action) # menu_model = Gio.Menu() @@ -214,7 +216,9 @@ class ClanList(Gtk.Box): build_logs_action.set_enabled(False) app = Gio.Application.get_default() - assert app is not None + if app is None: + msg = "Could not get default application" + raise ClanError(msg) app.add_action(open_action) app.add_action(build_logs_action) diff --git a/pkgs/testing/flake-module.nix b/pkgs/testing/flake-module.nix index 848e68ea5..36dc2a23d 100644 --- a/pkgs/testing/flake-module.nix +++ b/pkgs/testing/flake-module.nix @@ -15,7 +15,7 @@ mkdir -p "$CLAN_TEST_STORE/nix/store" mkdir -p "$CLAN_TEST_STORE/nix/var/nix/gcroots" if [[ -n "''${closureInfo-}" ]]; then - ${pkgs.findutils}/bin/xargs -P"$(nproc)" ${pkgs.coreutils}/bin/cp --recursive --reflink=auto --target-directory "$CLAN_TEST_STORE/nix/store" < "$closureInfo/store-paths" + ${pkgs.findutils}/bin/xargs -r -P"$(nproc)" ${pkgs.coreutils}/bin/cp --recursive --no-dereference --reflink=auto --target-directory "$CLAN_TEST_STORE/nix/store" < "$closureInfo/store-paths" ${pkgs.nix}/bin/nix-store --load-db --store "$CLAN_TEST_STORE" < "$closureInfo/registration" fi ''; diff --git a/pkgs/testing/nixos_test_lib/nix_setup.py b/pkgs/testing/nixos_test_lib/nix_setup.py index a93cba3f5..2280ed650 100644 --- a/pkgs/testing/nixos_test_lib/nix_setup.py +++ b/pkgs/testing/nixos_test_lib/nix_setup.py @@ -53,8 +53,10 @@ def setup_nix_in_nix(closure_info: str | None) -> None: subprocess.run( # noqa: S603 [ XARGS_BIN, + "-r", f"-P{num_cpus}", # Use all available CPUs CP_BIN, + "--no-dereference", "--recursive", "--reflink=auto", # Use copy-on-write if available "--target-directory", diff --git a/pkgs/zerotier-members/zerotier-members.py b/pkgs/zerotier-members/zerotier-members.py index 3743db7c4..d1f267c36 100755 --- a/pkgs/zerotier-members/zerotier-members.py +++ b/pkgs/zerotier-members/zerotier-members.py @@ -17,8 +17,16 @@ def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address if len(network_id) != 16: msg = f"network_id must be 16 characters long, got {network_id}" raise ClanError(msg) - nwid = int(network_id, 16) - node_id = int(identity, 16) + try: + nwid = int(network_id, 16) + except ValueError: + msg = f"network_id must be a valid hexadecimal string, got {network_id}" + raise ClanError(msg) from None + try: + node_id = int(identity, 16) + except ValueError: + msg = f"identity must be a valid hexadecimal string, got {identity}" + raise ClanError(msg) from None addr_parts = bytearray( [ 0xFD,