clan-cli: secrets: properly update secrets when an user is removed
Fixes #2659.
This commit is contained in:
@@ -82,6 +82,11 @@ def update_secrets(
|
|||||||
for path in secret_paths:
|
for path in secret_paths:
|
||||||
if not filter_secrets(path):
|
if not filter_secrets(path):
|
||||||
continue
|
continue
|
||||||
|
# clean-up non-existent users, groups, and machines
|
||||||
|
# from the secret before we update it:
|
||||||
|
changed_files.extend(cleanup_dangling_symlinks(path / "users"))
|
||||||
|
changed_files.extend(cleanup_dangling_symlinks(path / "groups"))
|
||||||
|
changed_files.extend(cleanup_dangling_symlinks(path / "machines"))
|
||||||
changed_files.extend(
|
changed_files.extend(
|
||||||
update_keys(
|
update_keys(
|
||||||
path,
|
path,
|
||||||
@@ -91,6 +96,17 @@ def update_secrets(
|
|||||||
return changed_files
|
return changed_files
|
||||||
|
|
||||||
|
|
||||||
|
def cleanup_dangling_symlinks(folder: Path) -> list[Path]:
|
||||||
|
if not folder.exists():
|
||||||
|
return []
|
||||||
|
removed: list[Path] = []
|
||||||
|
for link in folder.iterdir():
|
||||||
|
if link.is_symlink() and not link.exists():
|
||||||
|
link.unlink()
|
||||||
|
removed.append(folder / link)
|
||||||
|
return removed
|
||||||
|
|
||||||
|
|
||||||
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||||
if not folder.exists():
|
if not folder.exists():
|
||||||
return set()
|
return set()
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -7,14 +9,14 @@ from clan_cli.completions import add_dynamic_completer, complete_secrets, comple
|
|||||||
from clan_cli.errors import ClanError
|
from clan_cli.errors import ClanError
|
||||||
from clan_cli.git import commit_files
|
from clan_cli.git import commit_files
|
||||||
|
|
||||||
from . import secrets, sops
|
from . import groups, secrets, sops
|
||||||
from .folders import (
|
from .folders import (
|
||||||
list_objects,
|
list_objects,
|
||||||
remove_object,
|
remove_object,
|
||||||
|
sops_groups_folder,
|
||||||
sops_secrets_folder,
|
sops_secrets_folder,
|
||||||
sops_users_folder,
|
sops_users_folder,
|
||||||
)
|
)
|
||||||
from .groups import get_groups
|
|
||||||
from .secrets import update_secrets
|
from .secrets import update_secrets
|
||||||
from .sops import read_key, write_key
|
from .sops import read_key, write_key
|
||||||
from .types import (
|
from .types import (
|
||||||
@@ -24,6 +26,8 @@ from .types import (
|
|||||||
user_name_type,
|
user_name_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def add_user(
|
def add_user(
|
||||||
flake_dir: Path,
|
flake_dir: Path,
|
||||||
@@ -34,12 +38,12 @@ def add_user(
|
|||||||
) -> None:
|
) -> None:
|
||||||
path = sops_users_folder(flake_dir) / name
|
path = sops_users_folder(flake_dir) / name
|
||||||
|
|
||||||
groups = get_groups(flake_dir, "users", name)
|
groupnames = [p.name for p in groups.get_groups(flake_dir, "users", name)]
|
||||||
|
|
||||||
def filter_user_secrets(secret: Path) -> bool:
|
def filter_user_secrets(secret: Path) -> bool:
|
||||||
if secret.joinpath("users", name).exists():
|
if secret.joinpath("users", name).exists():
|
||||||
return True
|
return True
|
||||||
return any(secret.joinpath("groups", group.name).exists() for group in groups)
|
return any(secret.joinpath("groups", name).exists() for name in groupnames)
|
||||||
|
|
||||||
write_key(path, key, key_type, overwrite=force)
|
write_key(path, key, key_type, overwrite=force)
|
||||||
paths = [path]
|
paths = [path]
|
||||||
@@ -53,12 +57,24 @@ def add_user(
|
|||||||
|
|
||||||
|
|
||||||
def remove_user(flake_dir: Path, name: str) -> None:
|
def remove_user(flake_dir: Path, name: str) -> None:
|
||||||
removed_paths = remove_object(sops_users_folder(flake_dir), name)
|
updated_paths: list[Path] = []
|
||||||
commit_files(
|
# Remove the user from any group where it belonged:
|
||||||
removed_paths,
|
groups_dir = sops_groups_folder(flake_dir)
|
||||||
flake_dir,
|
if groups_dir.exists():
|
||||||
f"Remove user {name}",
|
for group in os.listdir(groups_dir):
|
||||||
)
|
group_folder = groups_dir / group
|
||||||
|
if not group_folder.is_dir():
|
||||||
|
continue
|
||||||
|
memberships = group_folder / "users"
|
||||||
|
if not (memberships / name).exists():
|
||||||
|
continue
|
||||||
|
log.info(f"Removing user {name} from group {group}")
|
||||||
|
updated_paths.extend(groups.remove_member(flake_dir, memberships, name))
|
||||||
|
# Remove the user's key:
|
||||||
|
updated_paths.extend(remove_object(sops_users_folder(flake_dir), name))
|
||||||
|
# Remove the user from any secret where it was used:
|
||||||
|
updated_paths.extend(update_secrets(flake_dir))
|
||||||
|
commit_files(updated_paths, flake_dir, f"Remove user {name}")
|
||||||
|
|
||||||
|
|
||||||
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
|
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
|
||||||
|
|||||||
Reference in New Issue
Block a user