diff --git a/pkgs/clan-cli/clan_cli/secrets/filters.py b/pkgs/clan-cli/clan_cli/secrets/filters.py new file mode 100644 index 000000000..801069e82 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/filters.py @@ -0,0 +1,33 @@ +import functools +from collections.abc import Callable +from pathlib import Path + +from .groups import get_groups + + +def get_secrets_filter_for_users_or_machines( + what: str, + flake_dir: Path, + name: str, +) -> Callable[[Path], bool]: + groups_names = get_groups(flake_dir, what, name) + + def filter_secrets(secret: Path) -> bool: + if (secret / what / name).is_symlink(): + return True + groups_folder = secret / "groups" + return any((groups_folder / name).is_symlink() for name in groups_names) + + return filter_secrets + + +get_secrets_filter_for_user = functools.partial( + get_secrets_filter_for_users_or_machines, + "users", +) + + +get_secrets_filter_for_machine = functools.partial( + get_secrets_filter_for_users_or_machines, + "machines", +) diff --git a/pkgs/clan-cli/clan_cli/secrets/groups.py b/pkgs/clan-cli/clan_cli/secrets/groups.py index ee7b4cf06..1a63ca6a5 100644 --- a/pkgs/clan-cli/clan_cli/secrets/groups.py +++ b/pkgs/clan-cli/clan_cli/secrets/groups.py @@ -246,18 +246,18 @@ def add_secret(flake_dir: Path, group: str, name: str) -> None: ) -def get_groups( - flake_dir: Path, - type_name: str, - name: str, -) -> list[Path]: +def get_groups(flake_dir: Path, what: str, name: str) -> list[str]: + """Returns the list of group names the given user or machine is part of.""" + assert what == "users" or what == "machines" + groups_dir = sops_groups_folder(flake_dir) + if not groups_dir.exists(): + return [] groups = [] - if groups_dir.exists(): - for group in groups_dir.iterdir(): - if group.is_dir() and (group / type_name / name).exists(): - groups.append(group) + for group in groups_dir.iterdir(): + if group.is_dir() and (group / what / name).is_symlink(): + groups.append(group.name) return groups diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index e77de53cd..a3c72ca49 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -11,13 +11,13 @@ from clan_cli.git import commit_files from clan_cli.machines.types import machine_name_type, validate_hostname from . import secrets, sops +from .filters import get_secrets_filter_for_machine from .folders import ( list_objects, remove_object, sops_machines_folder, sops_secrets_folder, ) -from .groups import get_groups from .secrets import update_secrets from .sops import read_key, write_key from .types import public_or_private_age_key_type, secret_name_type @@ -28,14 +28,8 @@ def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None: write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force) paths = [machine_path] - groups = get_groups(flake_dir, "machines", name) - - def filter_machine_secrets(secret: Path) -> bool: - if (secret / "machines" / name).exists(): - return True - return any(secret.joinpath("groups", group.name).exists() for group in groups) - - paths.extend(update_secrets(flake_dir, filter_secrets=filter_machine_secrets)) + filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name) + paths.extend(update_secrets(flake_dir, filter_machine_secrets)) commit_files( paths, flake_dir, @@ -45,6 +39,8 @@ def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None: def remove_machine(flake_dir: Path, name: str) -> None: removed_paths = remove_object(sops_machines_folder(flake_dir), name) + filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name) + removed_paths.extend(update_secrets(flake_dir, filter_machine_secrets)) commit_files( removed_paths, flake_dir, diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 44a37417d..957c030f1 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -10,6 +10,7 @@ from clan_cli.errors import ClanError from clan_cli.git import commit_files from . import groups, secrets, sops +from .filters import get_secrets_filter_for_user from .folders import ( list_objects, remove_object, @@ -38,19 +39,13 @@ def add_user( ) -> None: path = sops_users_folder(flake_dir) / name - groupnames = [p.name for p in groups.get_groups(flake_dir, "users", name)] - - def filter_user_secrets(secret: Path) -> bool: - if secret.joinpath("users", name).exists(): - return True - return any(secret.joinpath("groups", name).exists() for name in groupnames) - write_key(path, key, key_type, overwrite=force) - paths = [path] + updated_paths = [path] - paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets)) + filter_user_secrets = get_secrets_filter_for_user(flake_dir, name) + updated_paths.extend(update_secrets(flake_dir, filter_user_secrets)) commit_files( - paths, + updated_paths, flake_dir, f"Add user {name} to secrets", ) @@ -75,7 +70,8 @@ def remove_user(flake_dir: Path, name: str) -> None: # Remove the user's key: updated_paths.extend(remove_object(sops_users_folder(flake_dir), name)) # Remove the user from any secret where it was used: - updated_paths.extend(update_secrets(flake_dir)) + filter_user_secrets = get_secrets_filter_for_user(flake_dir, name) + updated_paths.extend(update_secrets(flake_dir, filter_user_secrets)) commit_files(updated_paths, flake_dir, f"Remove user {name}") diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index a28f5153b..322f16f2e 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -144,18 +144,13 @@ def _test_identities( cli.run(["secrets", what, "list", "--flake", str(test_flake.path)]) assert "foo" not in output.out - if what == "machines": - # lopter@(2025-02-02): Let's address #2659 first and then figure out - # what we wanna do about secrets when a machine is deleted. - return - - user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo" + user_or_machine_symlink = sops_folder / "secrets" / test_secret_name / what / "foo" err_msg = ( f"Symlink to {what_singular} foo's key in secret " f"{test_secret_name} was not cleaned up after " f"{what_singular} foo was removed." ) - assert not user_symlink.exists(follow_symlinks=False), err_msg + assert not user_or_machine_symlink.exists(follow_symlinks=False), err_msg def test_users(