clan-cli: secrets machines remove: update secrets after removing the key
Quick follow up to PR #2781, this commit does the same kind of logic but for machines instead of users and groups. Note that this only affects the `clan secrets machines remove` sub-command, and that `clan machines delete` still leaves unusable secrets & vars behind. This can be addressed in a different change.
This commit is contained in:
33
pkgs/clan-cli/clan_cli/secrets/filters.py
Normal file
33
pkgs/clan-cli/clan_cli/secrets/filters.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
import functools
|
||||||
|
from collections.abc import Callable
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .groups import get_groups
|
||||||
|
|
||||||
|
|
||||||
|
def get_secrets_filter_for_users_or_machines(
|
||||||
|
what: str,
|
||||||
|
flake_dir: Path,
|
||||||
|
name: str,
|
||||||
|
) -> Callable[[Path], bool]:
|
||||||
|
groups_names = get_groups(flake_dir, what, name)
|
||||||
|
|
||||||
|
def filter_secrets(secret: Path) -> bool:
|
||||||
|
if (secret / what / name).is_symlink():
|
||||||
|
return True
|
||||||
|
groups_folder = secret / "groups"
|
||||||
|
return any((groups_folder / name).is_symlink() for name in groups_names)
|
||||||
|
|
||||||
|
return filter_secrets
|
||||||
|
|
||||||
|
|
||||||
|
get_secrets_filter_for_user = functools.partial(
|
||||||
|
get_secrets_filter_for_users_or_machines,
|
||||||
|
"users",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
get_secrets_filter_for_machine = functools.partial(
|
||||||
|
get_secrets_filter_for_users_or_machines,
|
||||||
|
"machines",
|
||||||
|
)
|
||||||
@@ -246,18 +246,18 @@ def add_secret(flake_dir: Path, group: str, name: str) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_groups(
|
def get_groups(flake_dir: Path, what: str, name: str) -> list[str]:
|
||||||
flake_dir: Path,
|
"""Returns the list of group names the given user or machine is part of."""
|
||||||
type_name: str,
|
assert what == "users" or what == "machines"
|
||||||
name: str,
|
|
||||||
) -> list[Path]:
|
|
||||||
groups_dir = sops_groups_folder(flake_dir)
|
groups_dir = sops_groups_folder(flake_dir)
|
||||||
|
if not groups_dir.exists():
|
||||||
|
return []
|
||||||
|
|
||||||
groups = []
|
groups = []
|
||||||
if groups_dir.exists():
|
|
||||||
for group in groups_dir.iterdir():
|
for group in groups_dir.iterdir():
|
||||||
if group.is_dir() and (group / type_name / name).exists():
|
if group.is_dir() and (group / what / name).is_symlink():
|
||||||
groups.append(group)
|
groups.append(group.name)
|
||||||
return groups
|
return groups
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -11,13 +11,13 @@ from clan_cli.git import commit_files
|
|||||||
from clan_cli.machines.types import machine_name_type, validate_hostname
|
from clan_cli.machines.types import machine_name_type, validate_hostname
|
||||||
|
|
||||||
from . import secrets, sops
|
from . import secrets, sops
|
||||||
|
from .filters import get_secrets_filter_for_machine
|
||||||
from .folders import (
|
from .folders import (
|
||||||
list_objects,
|
list_objects,
|
||||||
remove_object,
|
remove_object,
|
||||||
sops_machines_folder,
|
sops_machines_folder,
|
||||||
sops_secrets_folder,
|
sops_secrets_folder,
|
||||||
)
|
)
|
||||||
from .groups import get_groups
|
|
||||||
from .secrets import update_secrets
|
from .secrets import update_secrets
|
||||||
from .sops import read_key, write_key
|
from .sops import read_key, write_key
|
||||||
from .types import public_or_private_age_key_type, secret_name_type
|
from .types import public_or_private_age_key_type, secret_name_type
|
||||||
@@ -28,14 +28,8 @@ def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
|
|||||||
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
|
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
|
||||||
paths = [machine_path]
|
paths = [machine_path]
|
||||||
|
|
||||||
groups = get_groups(flake_dir, "machines", name)
|
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)
|
||||||
|
paths.extend(update_secrets(flake_dir, filter_machine_secrets))
|
||||||
def filter_machine_secrets(secret: Path) -> bool:
|
|
||||||
if (secret / "machines" / name).exists():
|
|
||||||
return True
|
|
||||||
return any(secret.joinpath("groups", group.name).exists() for group in groups)
|
|
||||||
|
|
||||||
paths.extend(update_secrets(flake_dir, filter_secrets=filter_machine_secrets))
|
|
||||||
commit_files(
|
commit_files(
|
||||||
paths,
|
paths,
|
||||||
flake_dir,
|
flake_dir,
|
||||||
@@ -45,6 +39,8 @@ def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
|
|||||||
|
|
||||||
def remove_machine(flake_dir: Path, name: str) -> None:
|
def remove_machine(flake_dir: Path, name: str) -> None:
|
||||||
removed_paths = remove_object(sops_machines_folder(flake_dir), name)
|
removed_paths = remove_object(sops_machines_folder(flake_dir), name)
|
||||||
|
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)
|
||||||
|
removed_paths.extend(update_secrets(flake_dir, filter_machine_secrets))
|
||||||
commit_files(
|
commit_files(
|
||||||
removed_paths,
|
removed_paths,
|
||||||
flake_dir,
|
flake_dir,
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from clan_cli.errors import ClanError
|
|||||||
from clan_cli.git import commit_files
|
from clan_cli.git import commit_files
|
||||||
|
|
||||||
from . import groups, secrets, sops
|
from . import groups, secrets, sops
|
||||||
|
from .filters import get_secrets_filter_for_user
|
||||||
from .folders import (
|
from .folders import (
|
||||||
list_objects,
|
list_objects,
|
||||||
remove_object,
|
remove_object,
|
||||||
@@ -38,19 +39,13 @@ def add_user(
|
|||||||
) -> None:
|
) -> None:
|
||||||
path = sops_users_folder(flake_dir) / name
|
path = sops_users_folder(flake_dir) / name
|
||||||
|
|
||||||
groupnames = [p.name for p in groups.get_groups(flake_dir, "users", name)]
|
|
||||||
|
|
||||||
def filter_user_secrets(secret: Path) -> bool:
|
|
||||||
if secret.joinpath("users", name).exists():
|
|
||||||
return True
|
|
||||||
return any(secret.joinpath("groups", name).exists() for name in groupnames)
|
|
||||||
|
|
||||||
write_key(path, key, key_type, overwrite=force)
|
write_key(path, key, key_type, overwrite=force)
|
||||||
paths = [path]
|
updated_paths = [path]
|
||||||
|
|
||||||
paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets))
|
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
|
||||||
|
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
|
||||||
commit_files(
|
commit_files(
|
||||||
paths,
|
updated_paths,
|
||||||
flake_dir,
|
flake_dir,
|
||||||
f"Add user {name} to secrets",
|
f"Add user {name} to secrets",
|
||||||
)
|
)
|
||||||
@@ -75,7 +70,8 @@ def remove_user(flake_dir: Path, name: str) -> None:
|
|||||||
# Remove the user's key:
|
# Remove the user's key:
|
||||||
updated_paths.extend(remove_object(sops_users_folder(flake_dir), name))
|
updated_paths.extend(remove_object(sops_users_folder(flake_dir), name))
|
||||||
# Remove the user from any secret where it was used:
|
# Remove the user from any secret where it was used:
|
||||||
updated_paths.extend(update_secrets(flake_dir))
|
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
|
||||||
|
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
|
||||||
commit_files(updated_paths, flake_dir, f"Remove user {name}")
|
commit_files(updated_paths, flake_dir, f"Remove user {name}")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -144,18 +144,13 @@ def _test_identities(
|
|||||||
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
|
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
|
||||||
assert "foo" not in output.out
|
assert "foo" not in output.out
|
||||||
|
|
||||||
if what == "machines":
|
user_or_machine_symlink = sops_folder / "secrets" / test_secret_name / what / "foo"
|
||||||
# lopter@(2025-02-02): Let's address #2659 first and then figure out
|
|
||||||
# what we wanna do about secrets when a machine is deleted.
|
|
||||||
return
|
|
||||||
|
|
||||||
user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo"
|
|
||||||
err_msg = (
|
err_msg = (
|
||||||
f"Symlink to {what_singular} foo's key in secret "
|
f"Symlink to {what_singular} foo's key in secret "
|
||||||
f"{test_secret_name} was not cleaned up after "
|
f"{test_secret_name} was not cleaned up after "
|
||||||
f"{what_singular} foo was removed."
|
f"{what_singular} foo was removed."
|
||||||
)
|
)
|
||||||
assert not user_symlink.exists(follow_symlinks=False), err_msg
|
assert not user_or_machine_symlink.exists(follow_symlinks=False), err_msg
|
||||||
|
|
||||||
|
|
||||||
def test_users(
|
def test_users(
|
||||||
|
|||||||
Reference in New Issue
Block a user