Merge pull request 'ruff-6-warnings' (#4937) from ruff-6-warnings into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4937
This commit is contained in:
Mic92
2025-08-25 14:06:20 +00:00
14 changed files with 87 additions and 114 deletions

View File

@@ -1,21 +1,14 @@
import os
import shutil
from functools import cache
from pathlib import Path
from clan_lib.cmd import Log, RunOpts, run
from clan_lib.nix import nix_shell
_works: bool | None = None
@cache
def bubblewrap_works() -> bool:
global _works
if _works is None:
_works = _bubblewrap_works()
return _works
def _bubblewrap_works() -> bool:
real_bash_path = Path("bash")
if os.environ.get("IN_NIX_SANDBOX"):
bash_executable_path = Path(str(shutil.which("bash")))

View File

@@ -90,7 +90,7 @@ def indent_command(command_list: list[str]) -> str:
return final_command
DEBUG_COMMANDS = os.environ.get("CLAN_DEBUG_COMMANDS", False)
DEBUG_COMMANDS = os.environ.get("CLAN_DEBUG_COMMANDS") == "1"
@dataclass

View File

@@ -869,7 +869,7 @@ class Flake:
self.identifier,
]
trace_prefetch = os.environ.get("CLAN_DEBUG_NIX_PREFETCH", False) == "1"
trace_prefetch = os.environ.get("CLAN_DEBUG_NIX_PREFETCH") == "1"
if not trace_prefetch:
log.debug(f"Prefetching flake {self.identifier}")
try:
@@ -1018,7 +1018,7 @@ class Flake:
];
}}
"""
trace = os.environ.get("CLAN_DEBUG_NIX_SELECTORS", False) == "1"
trace = os.environ.get("CLAN_DEBUG_NIX_SELECTORS") == "1"
try:
build_output = Path(
run(

View File

@@ -138,15 +138,6 @@ class LogFile:
return path / self.func_name / f"{self.date_second}_{self.op_key}.log"
def __eq__(self, other: object) -> bool:
"""Check equality with another LogFile instance.
Args:
other: The object to compare with.
Returns:
True if all significant fields are equal, False otherwise.
"""
if not isinstance(other, LogFile):
return NotImplemented
# Compare all significant fields for equality
@@ -158,18 +149,18 @@ class LogFile:
and self._base_dir == other._base_dir
)
def __hash__(self) -> int:
return hash(
(
self._datetime_obj,
self.group,
self.func_name,
self.op_key,
self._base_dir,
),
)
def __lt__(self, other: object) -> bool:
"""Compare LogFile instances for sorting.
Sorting order: datetime (newest first), then group, func_name, op_key (all ascending).
Args:
other: The object to compare with.
Returns:
True if this instance should be sorted before the other.
"""
if not isinstance(other, LogFile):
return NotImplemented
# Primary sort: datetime (newest first). self is "less than" other if self is newer.
@@ -228,31 +219,14 @@ class LogDayDir:
return self._base_dir / self.date_day
def __eq__(self, other: object) -> bool:
"""Check equality with another LogDayDir instance.
Args:
other: The object to compare with.
Returns:
True if date_day and base_dir are equal, False otherwise.
"""
if not isinstance(other, LogDayDir):
return NotImplemented
return self.date_day == other.date_day and self._base_dir == other._base_dir
def __hash__(self) -> int:
return hash((self.date_day, self._base_dir))
def __lt__(self, other: object) -> bool:
"""Compare LogDayDir instances for sorting.
Sorting order: date (newest first).
Args:
other: The object to compare with.
Returns:
True if this instance should be sorted before the other.
"""
if not isinstance(other, LogDayDir):
return NotImplemented
# Primary sort: date (newest first)

View File

@@ -247,10 +247,10 @@ def get_network_overview(networks: dict[str, Network]) -> dict:
if module.is_running():
result[network_name]["status"] = True
else:
with module.connection(network) as network:
for peer_name in network.peers:
with module.connection(network) as conn:
for peer_name in conn.peers:
try:
result[network_name]["peers"][peer_name] = network.ping(
result[network_name]["peers"][peer_name] = conn.ping(
peer_name,
)
except ClanError:

View File

@@ -498,12 +498,14 @@ def retrieve_typed_field_names(obj: type, prefix: str = "") -> set[str]:
# Unwrap Required/NotRequired
if origin in {NotRequired, Required}:
field_type = args[0]
origin = get_origin(field_type)
args = get_args(field_type)
unwrapped_type = args[0]
origin = get_origin(unwrapped_type)
args = get_args(unwrapped_type)
else:
unwrapped_type = field_type
if is_typeddict_class(field_type):
fields |= retrieve_typed_field_names(field_type, prefix=full_key)
if is_typeddict_class(unwrapped_type):
fields |= retrieve_typed_field_names(unwrapped_type, prefix=full_key)
else:
fields.add(full_key)

View File

@@ -79,9 +79,9 @@ class SudoAskpassProxy:
raise ClanError(msg)
try:
for line in ssh_process.stdout:
line = line.strip()
if line.startswith("PASSWORD_REQUESTED:"):
prompt = line[len("PASSWORD_REQUESTED:") :].strip()
stripped_line = line.strip()
if stripped_line.startswith("PASSWORD_REQUESTED:"):
prompt = stripped_line[len("PASSWORD_REQUESTED:") :].strip()
password = self.handle_password_request(prompt)
if ssh_process.stdin is None:
msg = "SSH process stdin is None"
@@ -89,7 +89,7 @@ class SudoAskpassProxy:
print(password, file=ssh_process.stdin)
ssh_process.stdin.flush()
else:
print(line)
print(stripped_line)
except Exception as e:
logger.error(f"Error processing passwords requests output: {e}")
@@ -116,9 +116,9 @@ class SudoAskpassProxy:
raise ClanError(msg)
for line in self.ssh_process.stdout:
line = line.strip()
if line.startswith("ASKPASS_SCRIPT:"):
askpass_script = line[len("ASKPASS_SCRIPT:") :].strip()
stripped_line = line.strip()
if stripped_line.startswith("ASKPASS_SCRIPT:"):
askpass_script = stripped_line[len("ASKPASS_SCRIPT:") :].strip()
break
else:
msg = f"Failed to create askpass script on {self.host.target}. Did not receive ASKPASS_SCRIPT line."