PERF401: fix

This commit is contained in:
Jörg Thalheim
2025-08-20 19:56:18 +02:00
committed by a-kenji
parent d8bc5269ee
commit 830da48943
9 changed files with 75 additions and 79 deletions

View File

@@ -77,26 +77,28 @@ class SecretStore(SecretStoreBase):
check=False, check=False,
).stdout.strip(), ).stdout.strip(),
) )
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"): hashes.extend(
if symlink.is_symlink(): subprocess.run(
hashes.append( nix_shell(
subprocess.run( ["git"],
nix_shell( [
["git"], "git",
[ "-C",
"git", password_store,
"-C", "log",
password_store, "-1",
"log", "--format=%H",
"-1", str(symlink),
"--format=%H", ],
str(symlink), ),
], stdout=subprocess.PIPE,
), check=False,
stdout=subprocess.PIPE, ).stdout.strip()
check=False, for symlink in Path(password_store).glob(
).stdout.strip(), f"machines/{self.machine.name}/**/*",
) )
if symlink.is_symlink()
)
# we sort the hashes to make sure that the order is always the same # we sort the hashes to make sure that the order is always the same
hashes.sort() hashes.sort()

View File

@@ -23,13 +23,9 @@ sops_groups_folder = gen_sops_subfolder("groups")
def list_objects(path: Path, is_valid: Callable[[str], bool]) -> list[str]: def list_objects(path: Path, is_valid: Callable[[str], bool]) -> list[str]:
objs: list[str] = []
if not path.exists(): if not path.exists():
return objs return []
for f in path.iterdir(): return [f.name for f in path.iterdir() if is_valid(f.name)]
if is_valid(f.name):
objs.append(f.name)
return objs
def remove_object(path: Path, name: str) -> list[Path]: def remove_object(path: Path, name: str) -> list[Path]:

View File

@@ -64,17 +64,17 @@ def list_groups(flake_dir: Path) -> list[Group]:
if not group_folder.is_dir(): if not group_folder.is_dir():
continue continue
machines_path = machines_folder(flake_dir, group.name) machines_path = machines_folder(flake_dir, group.name)
machines = [] machines = (
if machines_path.is_dir(): [f.name for f in machines_path.iterdir() if validate_hostname(f.name)]
for f in machines_path.iterdir(): if machines_path.is_dir()
if validate_hostname(f.name): else []
machines.append(f.name) )
users_path = users_folder(flake_dir, group.name) users_path = users_folder(flake_dir, group.name)
users = [] users = (
if users_path.is_dir(): [f.name for f in users_path.iterdir() if VALID_USER_NAME.match(f.name)]
for f in users_path.iterdir(): if users_path.is_dir()
if VALID_USER_NAME.match(f.name): else []
users.append(f.name) )
groups.append(Group(flake_dir, group.name, machines, users)) groups.append(Group(flake_dir, group.name, machines, users))
return groups return groups
@@ -270,11 +270,11 @@ def get_groups(flake_dir: Path, what: str, name: str) -> list[str]:
if not groups_dir.exists(): if not groups_dir.exists():
return [] return []
groups = [] return [
for group in groups_dir.iterdir(): group.name
if group.is_dir() and (group / what / name).is_symlink(): for group in groups_dir.iterdir()
groups.append(group.name) if group.is_dir() and (group / what / name).is_symlink()
return groups ]
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:

View File

@@ -41,7 +41,7 @@ log = logging.getLogger(__name__)
def list_generators_secrets(generators_path: Path) -> list[Path]: def list_generators_secrets(generators_path: Path) -> list[Path]:
paths = [] paths: list[Path] = []
for generator_path in generators_path.iterdir(): for generator_path in generators_path.iterdir():
if not generator_path.is_dir(): if not generator_path.is_dir():
continue continue
@@ -49,11 +49,13 @@ def list_generators_secrets(generators_path: Path) -> list[Path]:
def validate(generator_path: Path, name: str) -> bool: def validate(generator_path: Path, name: str) -> bool:
return has_secret(generator_path / name) return has_secret(generator_path / name)
for obj in list_objects( paths.extend(
generator_path, generator_path / obj
functools.partial(validate, generator_path), for obj in list_objects(
): generator_path,
paths.append(generator_path / obj) functools.partial(validate, generator_path),
)
)
return paths return paths

View File

@@ -63,7 +63,7 @@ def find_dataclasses_in_directory(
and isinstance(deco.func, ast.Name) and isinstance(deco.func, ast.Name)
and deco.func.id == "dataclass" and deco.func.id == "dataclass"
): ):
dataclass_files.append((file_path, node.name)) dataclass_files.append((file_path, node.name)) # noqa: PERF401
except (SyntaxError, UnicodeDecodeError) as e: except (SyntaxError, UnicodeDecodeError) as e:
print(f"Error parsing {file_path}: {e}") print(f"Error parsing {file_path}: {e}")

View File

@@ -164,11 +164,12 @@ class SecretStore(StoreBase):
from clan_cli.vars.generator import Generator from clan_cli.vars.generator import Generator
manifest = []
generators = Generator.get_machine_generators(machine, self.flake) generators = Generator.get_machine_generators(machine, self.flake)
for generator in generators: manifest = [
for file in generator.files: f"{generator.name}/{file.name}".encode()
manifest.append(f"{generator.name}/{file.name}".encode()) for generator in generators
for file in generator.files
]
manifest.append(git_hash) manifest.append(git_hash)
return b"\n".join(manifest) return b"\n".join(manifest)

View File

@@ -14,7 +14,6 @@ class Backup:
def list_provider(machine: Machine, host: Remote, provider: str) -> list[Backup]: def list_provider(machine: Machine, host: Remote, provider: str) -> list[Backup]:
results = []
backup_metadata = machine.select("config.clan.core.backups") backup_metadata = machine.select("config.clan.core.backups")
list_command = backup_metadata["providers"][provider]["list"] list_command = backup_metadata["providers"][provider]["list"]
proc = host.run( proc = host.run(
@@ -35,8 +34,11 @@ def list_provider(machine: Machine, host: Remote, provider: str) -> list[Backup]
msg = f"Failed to parse json output from provider {provider}:\n{proc.stdout}" msg = f"Failed to parse json output from provider {provider}:\n{proc.stdout}"
raise ClanError(msg) from e raise ClanError(msg) from e
for archive in parsed_json: results: list[Backup] = []
results.append(Backup(name=archive["name"], job_name=archive.get("job_name"))) results.extend(
Backup(name=archive["name"], job_name=archive.get("job_name"))
for archive in parsed_json
)
return results return results

View File

@@ -444,8 +444,9 @@ class FlakeCacheEntry:
if not isinstance(selector.value, list): if not isinstance(selector.value, list):
msg = f"Expected list for SET selector value, got {type(selector.value)}" msg = f"Expected list for SET selector value, got {type(selector.value)}"
raise ClanError(msg) raise ClanError(msg)
for subselector in selector.value: fetched_indices.extend(
fetched_indices.append(subselector.value) subselector.value for subselector in selector.value
)
# if it's just a str, that is the index # if it's just a str, that is the index
elif selector.type == SelectorType.STR: elif selector.type == SelectorType.STR:
if not isinstance(selector.value, str): if not isinstance(selector.value, str):
@@ -635,9 +636,9 @@ class FlakeCacheEntry:
keys_to_select: list[str] = [] keys_to_select: list[str] = []
# if we want to select all keys, we take all existing sub elements # if we want to select all keys, we take all existing sub elements
if selector.type == SelectorType.ALL: if selector.type == SelectorType.ALL:
for key in self.value: keys_to_select.extend(
if self.value[key].exists: key for key in self.value if self.value[key].exists
keys_to_select.append(key) )
# if we want to select a set of keys, we take the keys from the selector # if we want to select a set of keys, we take the keys from the selector
if selector.type == SelectorType.SET: if selector.type == SelectorType.SET:
@@ -657,9 +658,9 @@ class FlakeCacheEntry:
# if we are a list, return a list # if we are a list, return a list
if self.is_list: if self.is_list:
result_list: list[Any] = [] result_list: list[Any] = [
for index in keys_to_select: self.value[index].select(selectors[1:]) for index in keys_to_select
result_list.append(self.value[index].select(selectors[1:])) ]
return result_list return result_list
# otherwise return a dict # otherwise return a dict
@@ -681,12 +682,10 @@ class FlakeCacheEntry:
if selector.type == SelectorType.ALL: if selector.type == SelectorType.ALL:
str_selector = "*" str_selector = "*"
elif selector.type == SelectorType.SET: elif selector.type == SelectorType.SET:
subselectors: list[str] = []
if not isinstance(selector.value, list): if not isinstance(selector.value, list):
msg = f"Expected list for SET selector value in error handling, got {type(selector.value)}" msg = f"Expected list for SET selector value in error handling, got {type(selector.value)}"
raise ClanError(msg) raise ClanError(msg)
for subselector in selector.value: subselectors = [subselector.value for subselector in selector.value]
subselectors.append(subselector.value)
str_selector = "{" + ",".join(subselectors) + "}" str_selector = "{" + ",".join(subselectors) + "}"
else: else:
if not isinstance(selector.value, str): if not isinstance(selector.value, str):
@@ -967,9 +966,9 @@ class Flake:
nix_options = self.nix_options[:] if self.nix_options is not None else [] nix_options = self.nix_options[:] if self.nix_options is not None else []
str_selectors: list[str] = [] str_selectors = [
for selector in selectors: selectors_as_json(parse_selector(selector)) for selector in selectors
str_selectors.append(selectors_as_json(parse_selector(selector))) ]
config = nix_config() config = nix_config()
@@ -1079,10 +1078,9 @@ class Flake:
if self.flake_cache_path is None: if self.flake_cache_path is None:
msg = "Flake cache path cannot be None" msg = "Flake cache path cannot be None"
raise ClanError(msg) raise ClanError(msg)
not_fetched_selectors = [] not_fetched_selectors = [
for selector in selectors: selector for selector in selectors if not self._cache.is_cached(selector)
if not self._cache.is_cached(selector): ]
not_fetched_selectors.append(selector)
if not_fetched_selectors: if not_fetched_selectors:
self.get_from_nix(not_fetched_selectors) self.get_from_nix(not_fetched_selectors)

View File

@@ -133,12 +133,7 @@ def list_difference(all_items: list, filter_items: list) -> list:
""" """
# Unmerge the lists # Unmerge the lists
res = [] return [value for value in all_items if value not in filter_items]
for value in all_items:
if value not in filter_items:
res.append(value)
return res
def find_duplicates(string_list: list[str]) -> list[str]: def find_duplicates(string_list: list[str]) -> list[str]: