Merge pull request 'ruff-2-security-fixes' (#4931) from ruff-2-security-fixes into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4931
This commit is contained in:
Mic92
2025-08-25 12:30:20 +00:00
30 changed files with 135 additions and 75 deletions

View File

@@ -50,7 +50,7 @@ def init_test_environment() -> None:
passwd_content = """root:x:0:0:Root:/root:/bin/sh passwd_content = """root:x:0:0:Root:/root:/bin/sh
nixbld:x:1000:100:Nix build user:/tmp:/bin/sh nixbld:x:1000:100:Nix build user:/tmp:/bin/sh
nobody:x:65534:65534:Nobody:/:/bin/sh nobody:x:65534:65534:Nobody:/:/bin/sh
""" """ # noqa: S105 - This is not a password, it's a Unix passwd file format for testing
with NamedTemporaryFile(mode="w", delete=False, prefix="test-passwd-") as f: with NamedTemporaryFile(mode="w", delete=False, prefix="test-passwd-") as f:
f.write(passwd_content) f.write(passwd_content)
@@ -639,7 +639,7 @@ class Driver:
def test_script(self) -> None: def test_script(self) -> None:
"""Run the test script""" """Run the test script"""
exec(self.testscript, self.test_symbols(), None) exec(self.testscript, self.test_symbols(), None) # noqa: S102
def run_tests(self) -> None: def run_tests(self) -> None:
"""Run the test script (for non-interactive test runs)""" """Run the test script (for non-interactive test runs)"""

View File

@@ -84,8 +84,9 @@ class ZerotierController:
headers["Content-Type"] = "application/json" headers["Content-Type"] = "application/json"
headers["X-ZT1-AUTH"] = self.authtoken headers["X-ZT1-AUTH"] = self.authtoken
url = f"http://127.0.0.1:{self.port}{path}" url = f"http://127.0.0.1:{self.port}{path}"
req = urllib.request.Request(url, headers=headers, method=method, data=body) # Safe: only connecting to localhost zerotier API
resp = urllib.request.urlopen(req) req = urllib.request.Request(url, headers=headers, method=method, data=body) # noqa: S310
resp = urllib.request.urlopen(req) # noqa: S310
return json.load(resp) return json.load(resp)
def status(self) -> dict[str, Any]: def status(self) -> dict[str, Any]:

View File

@@ -151,14 +151,14 @@ class TestHttpApiServer:
try: try:
# Test root endpoint # Test root endpoint
response = urlopen("http://127.0.0.1:8081/") response = urlopen("http://127.0.0.1:8081/") # noqa: S310
data: dict = json.loads(response.read().decode()) data: dict = json.loads(response.read().decode())
assert data["body"]["status"] == "success" assert data["body"]["status"] == "success"
assert data["body"]["data"]["message"] == "Clan API Server" assert data["body"]["data"]["message"] == "Clan API Server"
assert data["body"]["data"]["version"] == "1.0.0" assert data["body"]["data"]["version"] == "1.0.0"
# Test methods endpoint # Test methods endpoint
response = urlopen("http://127.0.0.1:8081/api/methods") response = urlopen("http://127.0.0.1:8081/api/methods") # noqa: S310
data = json.loads(response.read().decode()) data = json.loads(response.read().decode())
assert data["body"]["status"] == "success" assert data["body"]["status"] == "success"
assert "test_method" in data["body"]["data"]["methods"] assert "test_method" in data["body"]["data"]["methods"]
@@ -171,7 +171,7 @@ class TestHttpApiServer:
data=json.dumps(request_data).encode(), data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
response = urlopen(req) response = urlopen(req) # noqa: S310
data = json.loads(response.read().decode()) data = json.loads(response.read().decode())
# Response should be BackendResponse format # Response should be BackendResponse format
@@ -194,7 +194,7 @@ class TestHttpApiServer:
try: try:
# Test 404 error # Test 404 error
res = urlopen("http://127.0.0.1:8081/nonexistent") res = urlopen("http://127.0.0.1:8081/nonexistent") # noqa: S310
assert res.status == 200 assert res.status == 200
body = json.loads(res.read().decode())["body"] body = json.loads(res.read().decode())["body"]
assert body["status"] == "error" assert body["status"] == "error"
@@ -207,7 +207,7 @@ class TestHttpApiServer:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
res = urlopen(req) res = urlopen(req) # noqa: S310
assert res.status == 200 assert res.status == 200
body = json.loads(res.read().decode())["body"] body = json.loads(res.read().decode())["body"]
assert body["status"] == "error" assert body["status"] == "error"
@@ -219,7 +219,7 @@ class TestHttpApiServer:
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
res = urlopen(req) res = urlopen(req) # noqa: S310
assert res.status == 200 assert res.status == 200
body = json.loads(res.read().decode())["body"] body = json.loads(res.read().decode())["body"]
assert body["status"] == "error" assert body["status"] == "error"
@@ -240,7 +240,7 @@ class TestHttpApiServer:
return "OPTIONS" return "OPTIONS"
req: Request = OptionsRequest("http://127.0.0.1:8081/api/call/test_method") req: Request = OptionsRequest("http://127.0.0.1:8081/api/call/test_method")
response = urlopen(req) response = urlopen(req) # noqa: S310
# Check CORS headers # Check CORS headers
headers = response.info() headers = response.info()
@@ -290,7 +290,7 @@ class TestIntegration:
data=json.dumps(request_data).encode(), data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
response = urlopen(req) response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode()) data: dict = json.loads(response.read().decode())
# Verify response in BackendResponse format # Verify response in BackendResponse format
@@ -341,7 +341,7 @@ class TestIntegration:
data=json.dumps(request_data).encode(), data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
response = urlopen(req) response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode()) data: dict = json.loads(response.read().decode())
# thread.join() # thread.join()
@@ -365,7 +365,7 @@ class TestIntegration:
data=json.dumps(request_data).encode(), data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
response = urlopen(req) response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode()) data: dict = json.loads(response.read().decode())
assert "body" in data assert "body" in data

View File

@@ -359,7 +359,7 @@ def complete_vars_for_machine(
var_id = f"{generator_name}/{var_name}" var_id = f"{generator_name}/{var_name}"
vars_list.append(var_id) vars_list.append(var_id)
except Exception: except (OSError, PermissionError):
pass pass
vars_dict = dict.fromkeys(vars_list, "var") vars_dict = dict.fromkeys(vars_list, "var")

View File

@@ -70,7 +70,7 @@ def requires_explicit_update(m: Machine) -> bool:
try: try:
if m.select("config.clan.deployment.requireExplicitUpdate"): if m.select("config.clan.deployment.requireExplicitUpdate"):
return False return False
except Exception: except (ClanError, AttributeError):
pass pass
try: try:

View File

@@ -262,7 +262,9 @@ def add_secret(
def get_groups(flake_dir: Path, what: str, name: str) -> list[str]: def get_groups(flake_dir: Path, what: str, name: str) -> list[str]:
"""Returns the list of group names the given user or machine is part of.""" """Returns the list of group names the given user or machine is part of."""
assert what in {"users", "machines"} if what not in {"users", "machines"}:
msg = f"Invalid 'what' parameter: {what}. Must be 'users' or 'machines'"
raise ClanError(msg)
groups_dir = sops_groups_folder(flake_dir) groups_dir = sops_groups_folder(flake_dir)
if not groups_dir.exists(): if not groups_dir.exists():

View File

@@ -176,7 +176,7 @@ class ClanFlake:
self.temporary_home = temporary_home self.temporary_home = temporary_home
self.path = temporary_home / "flake" self.path = temporary_home / "flake"
if not suppress_tmp_home_warning: if not suppress_tmp_home_warning:
if "/tmp" not in str(os.environ.get("HOME")): if "/tmp" not in str(os.environ.get("HOME")): # noqa: S108 - Checking if HOME is in temp directory
log.warning( log.warning(
f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}", f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}",
) )
@@ -368,7 +368,7 @@ def create_flake(
check=True, check=True,
) )
if "/tmp" not in str(os.environ.get("HOME")): if "/tmp" not in str(os.environ.get("HOME")): # noqa: S108 - Checking if HOME is in temp directory
log.warning( log.warning(
f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}", f"!! $HOME does not point to a temp directory!! HOME={os.environ['HOME']}",
) )
@@ -441,14 +441,44 @@ def writable_clan_core(
# Copy all tracked and untracked files (excluding ignored) # Copy all tracked and untracked files (excluding ignored)
# Using git ls-files with -z for null-terminated output to handle filenames with spaces # Using git ls-files with -z for null-terminated output to handle filenames with spaces
sp.run(
f"(git ls-files -z; git ls-files -z --others --exclude-standard) | " # Get tracked files
f"xargs -0 cp --parents -t {temp_flake}/", tracked_files = (
shell=True, sp.run(
cwd=clan_core, ["git", "ls-files", "-z"],
check=True, cwd=clan_core,
capture_output=True,
text=True,
check=True,
)
.stdout.rstrip("\0")
.split("\0")
) )
# Get untracked files (excluding ignored)
untracked_files = (
sp.run(
["git", "ls-files", "-z", "--others", "--exclude-standard"],
cwd=clan_core,
capture_output=True,
text=True,
check=True,
)
.stdout.rstrip("\0")
.split("\0")
)
# Combine and filter out empty strings
all_files = [f for f in tracked_files + untracked_files if f]
# Copy files preserving directory structure
if all_files:
sp.run(
["cp", "--parents", "-t", str(temp_flake), "--", *all_files],
cwd=clan_core,
check=True,
)
# Copy .git directory to maintain git functionality # Copy .git directory to maintain git functionality
if (clan_core / ".git").is_dir(): if (clan_core / ".git").is_dir():
shutil.copytree( shutil.copytree(

View File

@@ -14,7 +14,7 @@ TEMPDIR = None
# macOS' default temporary directory is too long for unix sockets # macOS' default temporary directory is too long for unix sockets
# This can break applications such as gpg-agent # This can break applications such as gpg-agent
if platform == "darwin": if platform == "darwin":
TEMPDIR = Path("/tmp") TEMPDIR = Path("/tmp") # noqa: S108 - Required on macOS due to socket path length limits
@pytest.fixture @pytest.fixture

View File

@@ -235,7 +235,7 @@ def test_multiple_user_keys(
# let's do some setting and getting of secrets # let's do some setting and getting of secrets
def random_str() -> str: def random_str() -> str:
return "".join(random.choices(string.ascii_letters, k=10)) return "".join(random.choices(string.ascii_letters, k=10)) # noqa: S311 - Test data generation, not cryptographic
for user_key in user_keys: for user_key in user_keys:
# set a secret using each of the user's private keys # set a secret using each of the user's private keys

View File

@@ -10,7 +10,7 @@ def test_run_environment(runtime: AsyncRuntime) -> None:
None, None,
host.run_local, host.run_local,
["echo $env_var"], ["echo $env_var"],
RunOpts(shell=True, log=Log.STDERR), RunOpts(shell=True, log=Log.STDERR), # noqa: S604 - Testing shell env var expansion
extra_env={"env_var": "true"}, extra_env={"env_var": "true"},
) )
@@ -20,7 +20,7 @@ def test_run_environment(runtime: AsyncRuntime) -> None:
None, None,
host.run_local, host.run_local,
["env"], ["env"],
RunOpts(shell=True, log=Log.STDERR), RunOpts(shell=True, log=Log.STDERR), # noqa: S604 - Testing shell env passing
extra_env={"env_var": "true"}, extra_env={"env_var": "true"},
) )
assert "env_var=true" in p3.wait().result.stdout assert "env_var=true" in p3.wait().result.stdout
@@ -43,7 +43,7 @@ def test_timeout(runtime: AsyncRuntime) -> None:
def test_run_exception(runtime: AsyncRuntime) -> None: def test_run_exception(runtime: AsyncRuntime) -> None:
p1 = runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) p1 = runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604 - Testing shell error handling
assert p1.wait().error is not None assert p1.wait().error is not None

View File

@@ -80,7 +80,9 @@ def migrate_files(
files_to_commit = [] files_to_commit = []
for file in generator.files: for file in generator.files:
if _migration_file_exists(machine, generator, file.name): if _migration_file_exists(machine, generator, file.name):
assert generator.migrate_fact is not None if generator.migrate_fact is None:
msg = f"Generator {generator.name} has no migrate_fact defined"
raise ClanError(msg)
files_to_commit += _migrate_file( files_to_commit += _migrate_file(
machine, machine,
generator, generator,

View File

@@ -1,5 +1,5 @@
import platform import platform
import random import secrets
from collections.abc import Generator from collections.abc import Generator
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
@@ -27,7 +27,7 @@ def graphics_options(vm: VmConfig) -> GraphicOptions:
if vm.waypipe.enable: if vm.waypipe.enable:
# FIXME: check for collisions # FIXME: check for collisions
cid = random.randint(1, 2**32) cid = secrets.randbelow(2**32 - 1) + 1 # Generate random CID between 1 and 2^32
# fmt: off # fmt: off
return GraphicOptions([ return GraphicOptions([
*common, *common,

View File

@@ -223,7 +223,9 @@ def construct_value(
# If the field is another dataclass # If the field is another dataclass
# Field_value must be a dictionary # Field_value must be a dictionary
if is_dataclass(t) and isinstance(field_value, dict): if is_dataclass(t) and isinstance(field_value, dict):
assert isinstance(t, type) if not isinstance(t, type):
msg = f"Expected a type, got {t}"
raise ClanError(msg)
return construct_dataclass(t, field_value) return construct_dataclass(t, field_value)
# If the field expects a path # If the field expects a path

View File

@@ -21,7 +21,6 @@ from typing import (
) )
from clan_lib.api.serde import dataclass_to_dict from clan_lib.api.serde import dataclass_to_dict
from clan_lib.errors import ClanError
class JSchemaTypeError(Exception): class JSchemaTypeError(Exception):
@@ -126,7 +125,7 @@ def type_to_dict(
continue continue
if isinstance(f.type, str): if isinstance(f.type, str):
msg = f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" msg = f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
raise ClanError(msg) raise JSchemaTypeError(msg)
properties[f.metadata.get("alias", f.name)] = type_to_dict( properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type, f.type,
f"{scope} {t.__name__}.{f.name}", # type: ignore f"{scope} {t.__name__}.{f.name}", # type: ignore

View File

@@ -281,7 +281,9 @@ class AsyncRuntime:
for name, task in self.tasks.items(): for name, task in self.tasks.items():
if task.finished and task.async_opts.check: if task.finished and task.async_opts.check:
assert task.result is not None if task.result is None:
msg = f"Task {name} finished but has no result"
raise ClanError(msg)
error = task.result.error error = task.result.error
if error is not None: if error is not None:
if log.isEnabledFor(logging.DEBUG): if log.isEnabledFor(logging.DEBUG):

View File

@@ -13,7 +13,7 @@ def test_get_clan_details_invalid_flake() -> None:
get_clan_details(invalid_flake) get_clan_details(invalid_flake)
with pytest.raises(FlakeInvalidError): with pytest.raises(FlakeInvalidError):
get_clan_details(Flake("/tmp")) get_clan_details(Flake("/tmp")) # noqa: S108
@pytest.mark.with_core @pytest.mark.with_core

View File

@@ -906,7 +906,7 @@ class Flake:
if self.hash is None: if self.hash is None:
msg = "Hash cannot be None" msg = "Hash cannot be None"
raise ClanError(msg) raise ClanError(msg)
hashed_hash = sha1(self.hash.encode()).hexdigest() hashed_hash = sha1(self.hash.encode()).hexdigest() # noqa: S324 - SHA1 used only for cache directory naming, not security
self.flake_cache_path = ( self.flake_cache_path = (
Path(user_cache_dir()) / "clan" / "flakes-v2" / hashed_hash Path(user_cache_dir()) / "clan" / "flakes-v2" / hashed_hash
) )

View File

@@ -13,10 +13,11 @@ def list_log_days() -> list[str]:
A list of date strings in YYYY-MM-DD format representing all available log days. A list of date strings in YYYY-MM-DD format representing all available log days.
Raises: Raises:
AssertionError: If LOG_MANAGER_INSTANCE is not initialized. ClanError: If LOG_MANAGER_INSTANCE is not initialized.
""" """
assert LOG_MANAGER_INSTANCE is not None if LOG_MANAGER_INSTANCE is None:
msg = "LOG_MANAGER_INSTANCE is not initialized"
raise ClanError(msg)
return [day.date_day for day in LOG_MANAGER_INSTANCE.list_log_days()] return [day.date_day for day in LOG_MANAGER_INSTANCE.list_log_days()]
@@ -35,10 +36,12 @@ def list_log_groups(
A list of folder names (decoded) at the specified path level. A list of folder names (decoded) at the specified path level.
Raises: Raises:
AssertionError: If LOG_MANAGER_INSTANCE is not initialized. ClanError: If LOG_MANAGER_INSTANCE is not initialized.
""" """
assert LOG_MANAGER_INSTANCE is not None if LOG_MANAGER_INSTANCE is None:
msg = "LOG_MANAGER_INSTANCE is not initialized"
raise ClanError(msg)
return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day) return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day)
@@ -59,11 +62,11 @@ def get_log_file(
The contents of the log file as a string. The contents of the log file as a string.
Raises: Raises:
ClanError: If the log file is not found. ClanError: If the log file is not found or LOG_MANAGER_INSTANCE is not initialized.
AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
""" """
assert LOG_MANAGER_INSTANCE is not None if LOG_MANAGER_INSTANCE is None:
msg = "LOG_MANAGER_INSTANCE is not initialized"
raise ClanError(msg)
log_file = LOG_MANAGER_INSTANCE.get_log_file( log_file = LOG_MANAGER_INSTANCE.get_log_file(
op_key=id_key, op_key=id_key,

View File

@@ -1,8 +1,8 @@
import json import json
import logging import logging
import os import os
import random
import re import re
import secrets
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -34,8 +34,8 @@ def is_local_input(node: dict[str, dict[str, str]]) -> bool:
def random_hostname() -> str: def random_hostname() -> str:
adjectives = ["wacky", "happy", "fluffy", "silly", "quirky", "zany", "bouncy"] adjectives = ["wacky", "happy", "fluffy", "silly", "quirky", "zany", "bouncy"]
nouns = ["unicorn", "penguin", "goose", "ninja", "octopus", "hamster", "robot"] nouns = ["unicorn", "penguin", "goose", "ninja", "octopus", "hamster", "robot"]
adjective = random.choice(adjectives) adjective = secrets.choice(adjectives)
noun = random.choice(nouns) noun = secrets.choice(nouns)
return f"{adjective}-{noun}" return f"{adjective}-{noun}"

View File

@@ -53,10 +53,10 @@ def get_metrics(
encoded_credentials = b64encode(credentials.encode("utf-8")).decode("utf-8") encoded_credentials = b64encode(credentials.encode("utf-8")).decode("utf-8")
headers = {"Authorization": f"Basic {encoded_credentials}"} headers = {"Authorization": f"Basic {encoded_credentials}"}
req = urllib.request.Request(url, headers=headers) req = urllib.request.Request(url, headers=headers) # noqa: S310
try: try:
response = urllib.request.urlopen(req) response = urllib.request.urlopen(req) # noqa: S310
for line in response: for line in response:
line_str = line.decode("utf-8").strip() line_str = line.decode("utf-8").strip()
if line_str: if line_str:

View File

@@ -139,7 +139,7 @@ class Remote:
if sys.platform == "darwin" and os.environ.get("TMPDIR", "").startswith( if sys.platform == "darwin" and os.environ.get("TMPDIR", "").startswith(
"/var/folders/", "/var/folders/",
): ):
directory = "/tmp/" directory = "/tmp/" # noqa: S108 - Required on macOS due to bugs with default TMPDIR
with TemporaryDirectory(prefix="clan-ssh", dir=directory) as temp_dir: with TemporaryDirectory(prefix="clan-ssh", dir=directory) as temp_dir:
remote = Remote( remote = Remote(
address=self.address, address=self.address,

View File

@@ -157,7 +157,7 @@ def test_run_environment(hosts: list[Remote], runtime: AsyncRuntime) -> None:
None, None,
host.run_local, host.run_local,
["echo $env_var"], ["echo $env_var"],
RunOpts(shell=True, log=Log.STDERR), RunOpts(shell=True, log=Log.STDERR), # noqa: S604
extra_env={"env_var": "true"}, extra_env={"env_var": "true"},
) )
assert proc.wait().result.stdout == "true\n" assert proc.wait().result.stdout == "true\n"
@@ -230,16 +230,16 @@ def test_run_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None:
None, None,
host.run_local, host.run_local,
["exit 1"], ["exit 1"],
RunOpts(shell=True, check=False), RunOpts(shell=True, check=False), # noqa: S604
) )
assert proc.wait().result.returncode == 1 assert proc.wait().result.returncode == 1
try: try:
for host in hosts: for host in hosts:
runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604
runtime.join_all() runtime.join_all()
runtime.check_all() runtime.check_all()
except Exception: except Exception: # noqa: S110
pass pass
else: else:
msg = "should have raised Exception" msg = "should have raised Exception"
@@ -248,14 +248,14 @@ def test_run_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None:
def test_run_function_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None: def test_run_function_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None:
def some_func(h: Remote) -> CmdOut: def some_func(h: Remote) -> CmdOut:
return h.run_local(["exit 1"], RunOpts(shell=True)) return h.run_local(["exit 1"], RunOpts(shell=True)) # noqa: S604
try: try:
for host in hosts: for host in hosts:
runtime.async_run(None, some_func, host) runtime.async_run(None, some_func, host)
runtime.join_all() runtime.join_all()
runtime.check_all() runtime.check_all()
except Exception: except Exception: # noqa: S110
pass pass
else: else:
msg = "should have raised Exception" msg = "should have raised Exception"

View File

@@ -74,17 +74,19 @@ class SudoAskpassProxy:
def _process(self, ssh_process: subprocess.Popen) -> None: def _process(self, ssh_process: subprocess.Popen) -> None:
"""Execute the remote command with password proxying""" """Execute the remote command with password proxying"""
# Monitor SSH output for password requests # Monitor SSH output for password requests
assert ssh_process.stdout is not None, "SSH process stdout is None" if ssh_process.stdout is None:
msg = "SSH process stdout is None"
raise ClanError(msg)
try: try:
for line in ssh_process.stdout: for line in ssh_process.stdout:
line = line.strip() line = line.strip()
if line.startswith("PASSWORD_REQUESTED:"): if line.startswith("PASSWORD_REQUESTED:"):
prompt = line[len("PASSWORD_REQUESTED:") :].strip() prompt = line[len("PASSWORD_REQUESTED:") :].strip()
password = self.handle_password_request(prompt) password = self.handle_password_request(prompt)
print(password, file=ssh_process.stdin)
if ssh_process.stdin is None: if ssh_process.stdin is None:
msg = "SSH process stdin is None" msg = "SSH process stdin is None"
raise ClanError(msg) raise ClanError(msg)
print(password, file=ssh_process.stdin)
ssh_process.stdin.flush() ssh_process.stdin.flush()
else: else:
print(line) print(line)
@@ -137,10 +139,10 @@ class SudoAskpassProxy:
pass pass
# Unclear why we have to close this manually, but pytest reports unclosed fd # Unclear why we have to close this manually, but pytest reports unclosed fd
assert self.ssh_process.stdout is not None if self.ssh_process.stdout is not None:
self.ssh_process.stdout.close() self.ssh_process.stdout.close()
assert self.ssh_process.stdin is not None if self.ssh_process.stdin is not None:
self.ssh_process.stdin.close() self.ssh_process.stdin.close()
self.ssh_process = None self.ssh_process = None
if self.thread: if self.thread:
self.thread.join() self.thread.join()

View File

@@ -33,7 +33,7 @@ def upload(
# Exceptions: Allow depth 2 if the path starts with /tmp/, /root/, or /etc/. # Exceptions: Allow depth 2 if the path starts with /tmp/, /root/, or /etc/.
# This allows destinations like /tmp/mydir or /etc/conf.d, but not /tmp or /etc directly. # This allows destinations like /tmp/mydir or /etc/conf.d, but not /tmp or /etc directly.
is_allowed_exception = depth >= 2 and ( is_allowed_exception = depth >= 2 and (
str(remote_dest).startswith("/tmp/") str(remote_dest).startswith("/tmp/") # noqa: S108 - Path validation check
or str(remote_dest).startswith("/root/") or str(remote_dest).startswith("/root/")
or str(remote_dest).startswith("/etc/") or str(remote_dest).startswith("/etc/")
) )

View File

@@ -59,7 +59,7 @@ def machine_template(
try: try:
template = template_flake.select(template_selector) template = template_flake.select(template_selector)
except ClanError as e: except ClanError as e:
msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" # noqa: S608
raise ClanError(msg) from e raise ClanError(msg) from e
src = template.get("path") src = template.get("path")
@@ -152,7 +152,7 @@ def clan_template(
template_flake = builtin_flake template_flake = builtin_flake
printable_template_ref = f"{clan_templates()}#{builtin_selector}" printable_template_ref = f"{clan_templates()}#{builtin_selector}"
except ClanError: except ClanError:
msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" msg = f"Failed to select template '{template_ident}' from flake '{flake_ref}' (via attribute path: {printable_template_ref})" # noqa: S608
raise ClanError(msg) from e raise ClanError(msg) from e
src = template.get("path") src = template.get("path")

View File

@@ -11,6 +11,7 @@ gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1") gi.require_version("Adw", "1")
from clan_lib.custom_logger import setup_logging from clan_lib.custom_logger import setup_logging
from clan_lib.errors import ClanError
from gi.repository import Adw, Gdk, Gio, Gtk from gi.repository import Adw, Gdk, Gio, Gtk
from clan_vm_manager.components.interfaces import ClanConfig from clan_vm_manager.components.interfaces import ClanConfig
@@ -118,7 +119,9 @@ class MainApplication(Adw.Application):
css_provider = Gtk.CssProvider() css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resource_path)) css_provider.load_from_path(str(resource_path))
display = Gdk.Display.get_default() display = Gdk.Display.get_default()
assert display is not None if display is None:
msg = "Could not get default display"
raise ClanError(msg)
Gtk.StyleContext.add_provider_for_display( Gtk.StyleContext.add_provider_for_display(
display, display,
css_provider, css_provider,

View File

@@ -116,7 +116,9 @@ class ClanList(Gtk.Box):
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s")) add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add) add_action.connect("activate", self.on_add)
app = Gio.Application.get_default() app = Gio.Application.get_default()
assert app is not None if app is None:
msg = "Could not get default application"
raise ClanError(msg)
app.add_action(add_action) app.add_action(add_action)
# menu_model = Gio.Menu() # menu_model = Gio.Menu()
@@ -214,7 +216,9 @@ class ClanList(Gtk.Box):
build_logs_action.set_enabled(False) build_logs_action.set_enabled(False)
app = Gio.Application.get_default() app = Gio.Application.get_default()
assert app is not None if app is None:
msg = "Could not get default application"
raise ClanError(msg)
app.add_action(open_action) app.add_action(open_action)
app.add_action(build_logs_action) app.add_action(build_logs_action)

View File

@@ -15,7 +15,7 @@
mkdir -p "$CLAN_TEST_STORE/nix/store" mkdir -p "$CLAN_TEST_STORE/nix/store"
mkdir -p "$CLAN_TEST_STORE/nix/var/nix/gcroots" mkdir -p "$CLAN_TEST_STORE/nix/var/nix/gcroots"
if [[ -n "''${closureInfo-}" ]]; then if [[ -n "''${closureInfo-}" ]]; then
${pkgs.findutils}/bin/xargs -P"$(nproc)" ${pkgs.coreutils}/bin/cp --recursive --reflink=auto --target-directory "$CLAN_TEST_STORE/nix/store" < "$closureInfo/store-paths" ${pkgs.findutils}/bin/xargs -r -P"$(nproc)" ${pkgs.coreutils}/bin/cp --recursive --no-dereference --reflink=auto --target-directory "$CLAN_TEST_STORE/nix/store" < "$closureInfo/store-paths"
${pkgs.nix}/bin/nix-store --load-db --store "$CLAN_TEST_STORE" < "$closureInfo/registration" ${pkgs.nix}/bin/nix-store --load-db --store "$CLAN_TEST_STORE" < "$closureInfo/registration"
fi fi
''; '';

View File

@@ -53,8 +53,10 @@ def setup_nix_in_nix(closure_info: str | None) -> None:
subprocess.run( # noqa: S603 subprocess.run( # noqa: S603
[ [
XARGS_BIN, XARGS_BIN,
"-r",
f"-P{num_cpus}", # Use all available CPUs f"-P{num_cpus}", # Use all available CPUs
CP_BIN, CP_BIN,
"--no-dereference",
"--recursive", "--recursive",
"--reflink=auto", # Use copy-on-write if available "--reflink=auto", # Use copy-on-write if available
"--target-directory", "--target-directory",

View File

@@ -17,8 +17,16 @@ def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address
if len(network_id) != 16: if len(network_id) != 16:
msg = f"network_id must be 16 characters long, got {network_id}" msg = f"network_id must be 16 characters long, got {network_id}"
raise ClanError(msg) raise ClanError(msg)
nwid = int(network_id, 16) try:
node_id = int(identity, 16) nwid = int(network_id, 16)
except ValueError:
msg = f"network_id must be a valid hexadecimal string, got {network_id}"
raise ClanError(msg) from None
try:
node_id = int(identity, 16)
except ValueError:
msg = f"identity must be a valid hexadecimal string, got {identity}"
raise ClanError(msg) from None
addr_parts = bytearray( addr_parts = bytearray(
[ [
0xFD, 0xFD,