diff --git a/docs/site/getting-started/secrets.md b/docs/site/getting-started/secrets.md index bd8c449fe..25c9ce52e 100644 --- a/docs/site/getting-started/secrets.md +++ b/docs/site/getting-started/secrets.md @@ -37,9 +37,9 @@ Also add your age public key to the repository with 'clan secrets users add YOUR !!! note It's safe to add any secrets created by the clan CLI and placed in your repository to version control systems like `git`. -### Add Your Public Key +### Add Your Public Key(s) -```bash +```console clan secrets users add $USER --age-key ``` @@ -54,3 +54,46 @@ sops/ └── key.json ``` If you followed the quickstart tutorial all necessary secrets are initialized at this point. + +!!! note + You can add multiple age keys for a user by providing multiple `--age-key ` flags: + + ```console + clan secrets users add $USER \ + --age-key \ + --age-key \ + ... + ``` + +### Manage Your Public Key(s) + +You can list keys for your user with `clan secrets users get $USER`: + +```console +❯ bin/clan secrets users get alice + +[ + { + "publickey": "age1hrrcspp645qtlj29krjpq66pqg990ejaq0djcms6y6evnmgglv5sq0gewu", + "type": "age", + "username": "alice" + }, + { + "publickey": "age13kh4083t3g4x3ktr52nav6h7sy8ynrnky2x58pyp96c5s5nvqytqgmrt79", + "type": "age", + "username": "alice" + } +] +``` + +To add a new key to your user: + +```console +clan secrets users add-key $USER --age-key +``` + +To remove a key from your user: + +```console +clan secrets users remove-key $USER --age-key +``` \ No newline at end of file diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index a3c72ca49..fe8d20522 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -25,7 +25,7 @@ from .types import public_or_private_age_key_type, secret_name_type def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None: machine_path = sops_machines_folder(flake_dir) / name - write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force) + write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force) paths = [machine_path] filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name) @@ -49,8 +49,8 @@ def remove_machine(flake_dir: Path, name: str) -> None: def get_machine(flake_dir: Path, name: str) -> str: - key, _ = read_key(sops_machines_folder(flake_dir) / name) - return key + key = read_key(sops_machines_folder(flake_dir) / name) + return key.pubkey def has_machine(flake_dir: Path, name: str) -> bool: diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index ab9aa0713..56e54a1c7 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -30,7 +30,7 @@ from .folders import ( from .sops import ( decrypt_file, encrypt_file, - read_key, + read_keys, update_keys, ) from .types import VALID_SECRET_NAME, secret_name_type @@ -104,7 +104,7 @@ def cleanup_dangling_symlinks(folder: Path) -> list[Path]: return removed -def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: +def collect_keys_for_type(folder: Path) -> set[sops.SopsKey]: if not folder.exists(): return set() keys = set() @@ -122,11 +122,11 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: f"Expected {p} to point to {folder} but points to {target.parent}" ) continue - keys.add(read_key(target)) + keys.update(read_keys(target)) return keys -def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: +def collect_keys_for_path(path: Path) -> set[sops.SopsKey]: keys = set() keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "users")) @@ -154,7 +154,16 @@ def encrypt_secret( add_machines = [] if add_users is None: add_users = [] - key = sops.ensure_admin_public_key(flake_dir) + + keys = sops.ensure_admin_public_key(flake_dir) + + if not keys: + # todo double check the correct command to run + msg = "No keys found. Please run 'clan secrets add-key' to add a key." + raise ClanError(msg) + + username = next(iter(keys)).username + recipient_keys = set() # encrypt_secret can be called before the secret has been created @@ -194,13 +203,14 @@ def encrypt_secret( recipient_keys = collect_keys_for_path(secret_path) - if (key.pubkey, key.key_type) not in recipient_keys: - recipient_keys.add((key.pubkey, key.key_type)) + if not keys.intersection(recipient_keys): + recipient_keys.update(keys) + files_to_commit.extend( allow_member( users_folder(secret_path), sops_users_folder(flake_dir), - key.username, + username, do_update_keys, ) ) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 6da0bdb3a..29d29a861 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -10,7 +10,7 @@ from collections.abc import Iterable, Sequence from contextlib import suppress from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import IO, Any from clan_cli.api import API from clan_cli.cmd import Log, RunOpts, run @@ -99,7 +99,7 @@ class KeyType(enum.Enum): raise ClanError(msg) -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass(frozen=True, order=True) class SopsKey: pubkey: str # Two SopsKey are considered equal even @@ -115,11 +115,9 @@ class SopsKey: } @classmethod - def load_dir(cls, folder: Path) -> "SopsKey": + def load_dir(cls, folder: Path) -> set["SopsKey"]: """Load from the file named `keys.json` in the given directory.""" - pubkey, key_type = read_key(folder) - username = "" - return cls(pubkey, username, key_type) + return read_keys(folder) @classmethod def collect_public_keys(cls) -> Sequence["SopsKey"]: @@ -180,7 +178,7 @@ class Operation(enum.StrEnum): def sops_run( call: Operation, secret_path: Path, - public_keys: Iterable[tuple[str, KeyType]], + public_keys: Iterable[SopsKey], run_opts: RunOpts | None = None, ) -> tuple[int, str]: """Call the sops binary for the given operation.""" @@ -201,8 +199,8 @@ def sops_run( keys_by_type: dict[KeyType, list[str]] = {} keys_by_type = {key_type: [] for key_type in KeyType} - for key, key_type in public_keys: - keys_by_type[key_type].append(key) + for key in public_keys: + keys_by_type[key.key_type].append(key.pubkey) it = keys_by_type.items() key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] rules = {"creation_rules": [{"key_groups": key_groups}]} @@ -299,7 +297,7 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: +def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None: folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -307,19 +305,35 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: for user in folder.iterdir(): if not (user / "key.json").exists(): continue - this_pub_key, this_key_type = read_key(user) - if key.pubkey == this_pub_key and key.key_type == this_key_type: - return SopsKey(key.pubkey, user.name, key.key_type) + + keys = read_keys(user) + if key in keys: + return {SopsKey(key.pubkey, user.name, key.key_type)} + + return None + + +def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None: + folder = sops_users_folder(flake_dir) + + if folder.exists(): + for user in folder.iterdir(): + if not (user / "key.json").exists(): + continue + + keys = read_keys(user) + if key in keys: + return {SopsKey(key.pubkey, user.name, key.key_type)} return None @API.register -def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: - maybe_key = maybe_get_user_or_machine(flake_dir, key) - if maybe_key: - return maybe_key - msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" +def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]: + maybe_keys = maybe_get_user_or_machine(flake_dir, key) + if maybe_keys: + return maybe_keys + msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" raise ClanError(msg) @@ -351,7 +365,7 @@ def maybe_get_admin_public_key() -> None | SopsKey: return keyring[0] -def ensure_admin_public_key(flake_dir: Path) -> SopsKey: +def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]: key = maybe_get_admin_public_key() if key: return ensure_user_or_machine(flake_dir, key) @@ -359,7 +373,7 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey: raise ClanError(msg) -def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]: +def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]: secret_path = secret_path / "secret" error_msg = f"Could not update keys for {secret_path}" @@ -376,7 +390,7 @@ def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[ def encrypt_file( secret_path: Path, content: str | IO[bytes] | bytes | None, - pubkeys: list[tuple[str, KeyType]], + pubkeys: list[SopsKey], ) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) @@ -438,7 +452,7 @@ def encrypt_file( def decrypt_file(secret_path: Path) -> str: # decryption uses private keys from the environment or default paths: - no_public_keys_needed: list[tuple[str, KeyType]] = [] + no_public_keys_needed: list[SopsKey] = [] _, stdout = sops_run( Operation.DECRYPT, @@ -475,34 +489,100 @@ def get_meta(secret_path: Path) -> dict: return json.load(f) -def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None: +def write_key(path: Path, key: SopsKey, overwrite: bool) -> None: + return write_keys(path, [key], overwrite) + + +def write_keys(path: Path, keys: Iterable[SopsKey], overwrite: bool) -> None: path.mkdir(parents=True, exist_ok=True) try: flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC if not overwrite: flags |= os.O_EXCL fd = os.open(path / "key.json", flags) - except FileExistsError as e: + except FileExistsError: msg = f"{path.name} already exists in {path}. Use --force to overwrite." - raise ClanError(msg) from e + raise ClanError(msg) from None with os.fdopen(fd, "w") as f: - contents = {"publickey": publickey, "type": key_type.name.lower()} + contents = [ + {"publickey": key.pubkey, "type": key.key_type.name.lower()} + for key in sorted(keys) + ] json.dump(contents, f, indent=2) -def read_key(path: Path) -> tuple[str, KeyType]: - with Path(path / "key.json").open() as f: - try: - key = json.load(f) - except json.JSONDecodeError as e: - msg = f"Failed to decode {path.name}: {e}" - raise ClanError(msg) from e +def append_keys(path: Path, keys: Iterable[SopsKey]) -> None: + path.mkdir(parents=True, exist_ok=True) + + # key file must already exist + try: + current_keys = set(read_keys(path)) + except FileNotFoundError: + msg = f"{path} does not exist." + raise ClanError(msg) from None + + # add the specified keys to the set + # de-duplication is natural + current_keys.update(keys) + + # write the new key set + return write_keys(path, sorted(current_keys), overwrite=True) + + +def remove_keys(path: Path, keys: Iterable[SopsKey]) -> None: + path.mkdir(parents=True, exist_ok=True) + + # key file must already exist + try: + current_keys = set(read_keys(path)) + except FileNotFoundError: + msg = f"{path} does not exist." + raise ClanError(msg) from None + + current_keys.difference_update(keys) + + if not current_keys: + msg = f"No keys would remain in {path}. At least one key is required. Aborting." + raise ClanError(msg) + + # write the new key set + return write_keys(path, sorted(current_keys), overwrite=True) + + +def parse_key(key: Any) -> SopsKey: + if not isinstance(key, dict): + msg = f"Expected a dict but {type(key)!r} was provided" + raise ClanError(msg) key_type = KeyType.validate(key.get("type")) if key_type is None: - msg = f'Invalid key type in {path.name}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).' + msg = f'Invalid key type in {key}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).' raise ClanError(msg) publickey = key.get("publickey") if not publickey: - msg = f"{path.name} does not contain a public key" + msg = f"{key} does not contain a public key" raise ClanError(msg) - return publickey, key_type + return SopsKey(publickey, "", key_type) + + +def read_key(path: Path) -> SopsKey: + keys = read_keys(path) + if len(keys) != 1: + msg = f"Expected exactly one key but {len(keys)} were found" + raise ClanError(msg) + return next(iter(keys)) + + +def read_keys(path: Path) -> set[SopsKey]: + with Path(path / "key.json").open() as f: + try: + keys = json.load(f) + except json.JSONDecodeError as e: + msg = f"Failed to decode {path.name}: {e}" + raise ClanError(msg) from e + + if isinstance(keys, dict): + return {parse_key(keys)} + if isinstance(keys, list): + return set(map(parse_key, keys)) + msg = f"Expected a dict or array but {type(keys)!r} was provided" + raise ClanError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index f7c69d9c6..4c3bdd88f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -2,6 +2,7 @@ import argparse import json import logging import sys +from collections.abc import Iterable from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users @@ -18,7 +19,7 @@ from .folders import ( sops_users_folder, ) from .secrets import update_secrets -from .sops import read_key, write_key +from .sops import append_keys, read_keys, remove_keys, write_keys from .types import ( VALID_USER_NAME, public_or_private_age_key_type, @@ -32,13 +33,12 @@ log = logging.getLogger(__name__) def add_user( flake_dir: Path, name: str, - key: str, - key_type: sops.KeyType, + keys: Iterable[sops.SopsKey], force: bool, ) -> None: path = sops_users_folder(flake_dir) / name - write_key(path, key, key_type, overwrite=force) + write_keys(path, keys, overwrite=force) updated_paths = [path] filter_user_secrets = get_secrets_filter_for_user(flake_dir, name) @@ -74,9 +74,9 @@ def remove_user(flake_dir: Path, name: str) -> None: commit_files(updated_paths, flake_dir, f"Remove user {name}") -def get_user(flake_dir: Path, name: str) -> sops.SopsKey: - key, key_type = read_key(sops_users_folder(flake_dir) / name) - return sops.SopsKey(key, name, key_type) +def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]: + keys = read_keys(sops_users_folder(flake_dir) / name) + return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys} def list_users(flake_dir: Path) -> list[str]: @@ -124,32 +124,82 @@ def list_command(args: argparse.Namespace) -> None: print("\n".join(lst)) +def add_user_key( + flake_dir: Path, + name: str, + keys: Iterable[sops.SopsKey], +) -> None: + path = sops_users_folder(flake_dir) / name + + append_keys(path, keys) + updated_paths = [path] + + filter_user_secrets = get_secrets_filter_for_user(flake_dir, name) + updated_paths.extend(update_secrets(flake_dir, filter_user_secrets)) + commit_files( + updated_paths, + flake_dir, + f"Add key(s) for user {name} to secrets", + ) + + +def remove_user_key( + flake_dir: Path, + name: str, + keys: Iterable[sops.SopsKey], +) -> None: + path = sops_users_folder(flake_dir) / name + + remove_keys(path, keys) + updated_paths = [path] + + filter_user_secrets = get_secrets_filter_for_user(flake_dir, name) + updated_paths.extend(update_secrets(flake_dir, filter_user_secrets)) + commit_files( + updated_paths, + flake_dir, + f"Remove key(s) for user {name} from secrets", + ) + + +def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]: + age_keys = args.age_key or [] + pgp_keys = args.pgp_key or [] + + key_count = len(age_keys) + len(pgp_keys) + if args.agekey: + key_count += 1 + + if key_count == 0: + err_msg = ( + "Please provide at least one key through `--pgp-key`, " + "`--age-key`, or as a positional (age key) argument." + ) + raise ClanError(err_msg) + + age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys] + if args.agekey: + age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE)) + + pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys] + + return age_keys + pgp_keys + + def add_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - keys_args = (args.age_key, args.agekey, args.pgp_key) - keys_count = sum(1 if key else 0 for key in keys_args) - if keys_count != 1: - err_msg = ( - f"Please provide one key (got {keys_count}) through `--pgp-key`, " - f"`--age-key`, or as a positional (age key) argument." - ) - raise ClanError(err_msg) - if args.age_key or args.agekey: - key_type = sops.KeyType.AGE - else: - key_type = sops.KeyType.PGP - key = args.agekey or args.age_key or args.pgp_key - add_user(args.flake.path, args.user, key, key_type, args.force) + + add_user(args.flake.path, args.user, _key_args(args), args.force) def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - key = get_user(args.flake.path, args.user) - json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) + keys = get_user(args.flake.path, args.user) + json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True) def remove_command(args: argparse.Namespace) -> None: @@ -173,6 +223,22 @@ def remove_secret_command(args: argparse.Namespace) -> None: remove_secret(args.flake.path, args.user, args.secret) +def add_key_command(args: argparse.Namespace) -> None: + if args.flake is None: + msg = "Could not find clan flake toplevel directory" + raise ClanError(msg) + + add_user_key(args.flake.path, args.user, _key_args(args)) + + +def remove_key_command(args: argparse.Namespace) -> None: + if args.flake is None: + msg = "Could not find clan flake toplevel directory" + raise ClanError(msg) + + remove_user_key(args.flake.path, args.user, _key_args(args)) + + def register_users_parser(parser: argparse.ArgumentParser) -> None: subparser = parser.add_subparsers( title="command", @@ -188,30 +254,7 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: "-f", "--force", help="overwrite existing user", action="store_true" ) add_parser.add_argument("user", help="the name of the user", type=user_name_type) - key_type = add_parser.add_mutually_exclusive_group(required=True) - key_type.add_argument( - "agekey", - help="public or private age key of the user. " - "Execute 'clan secrets key --help' on how to retrieve a key. " - "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", - type=public_or_private_age_key_type, - nargs="?", - ) - key_type.add_argument( - "--age-key", - help="public or private age key of the user. " - "Execute 'clan secrets key --help' on how to retrieve a key. " - "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", - type=public_or_private_age_key_type, - ) - key_type.add_argument( - "--pgp-key", - help=( - "public PGP encryption key of the user. " - # Use --fingerprint --fingerprint to get fingerprints for subkeys: - "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." - ), - ) + _add_key_flags(add_parser) add_parser.set_defaults(func=add_command) get_parser = subparser.add_parser("get", help="get a user public key") @@ -253,3 +296,52 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: ) add_dynamic_completer(remove_secrets_action, complete_secrets) remove_secret_parser.set_defaults(func=remove_secret_command) + + add_key_parser = subparser.add_parser( + "add-key", help="add one or more keys for a user" + ) + add_key_user_action = add_key_parser.add_argument( + "user", help="the name of the user", type=user_name_type + ) + add_dynamic_completer(add_key_user_action, complete_users) + _add_key_flags(add_key_parser) + add_key_parser.set_defaults(func=add_key_command) + + remove_key_parser = subparser.add_parser( + "remove-key", help="remove one or more keys for a user" + ) + remove_key_user_action = remove_key_parser.add_argument( + "user", help="the name of the user", type=user_name_type + ) + add_dynamic_completer(remove_key_user_action, complete_users) + _add_key_flags(remove_key_parser) + remove_key_parser.set_defaults(func=remove_key_command) + + +def _add_key_flags(parser: argparse.ArgumentParser) -> None: + key_type = parser.add_mutually_exclusive_group(required=True) + key_type.add_argument( + "agekey", + help="public or private age key for a user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " + "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", + type=public_or_private_age_key_type, + nargs="?", + ) + key_type.add_argument( + "--age-key", + help="public or private age key for a user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " + "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", + type=public_or_private_age_key_type, + action="append", + ) + key_type.add_argument( + "--pgp-key", + help=( + "public PGP encryption key for a user. " + # Use --fingerprint --fingerprint to get fingerprints for subkeys: + "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." + ), + action="append", + ) diff --git a/pkgs/clan-cli/clan_cli/tests/age_keys.py b/pkgs/clan-cli/clan_cli/tests/age_keys.py index 2ee179e12..084b6d354 100644 --- a/pkgs/clan-cli/clan_cli/tests/age_keys.py +++ b/pkgs/clan-cli/clan_cli/tests/age_keys.py @@ -1,5 +1,7 @@ +import dataclasses import json import os +from collections.abc import Iterable from pathlib import Path import pytest @@ -7,18 +9,18 @@ from clan_cli.secrets.folders import sops_secrets_folder from clan_cli.tests.helpers import cli +@dataclasses.dataclass(frozen=True) class KeyPair: - def __init__(self, pubkey: str, privkey: str) -> None: - self.pubkey = pubkey - self.privkey = privkey + pubkey: str + privkey: str class SopsSetup: - """Hold a list of three key pairs and create an "admin" user in the clan. + """Hold a list of key pairs and create an "admin" user in the clan. The first key in the list is used as the admin key and the private part of the key is exposed in the - `SOPS_AGE_KEY` environment variable, the two others can + `SOPS_AGE_KEY` environment variable, the others can be used to add machines or other users. """ @@ -52,6 +54,14 @@ KEYS = [ "age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp", "AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF", ), + KeyPair( + "age1n58rxm8y6h9prmwn0qk7nggfsu9f9j4u35dxg7akpkjd5vgsavssfzmq9y", + "AGE-SECRET-KEY-1YU2JVE445KT6S8UN3403NHH6EZU404RMEH9RTME9SPWXWMLJS0LQM5NWM7", + ), + KeyPair( + "age1eyyhln9g3cdwtrwpckugvqgtf5p8ugt0426sw38ra3wkc0t4rfhslq7txv", + "AGE-SECRET-KEY-1567QKA63Y9P62SHF5TCHVCT5GZX2LZ8NS0E9RKA2QHDA662SF5LQ2VJJYX", + ), ] @@ -73,7 +83,7 @@ def sops_setup( def assert_secrets_file_recipients( flake_path: Path, secret_name: str, - expected_age_recipients_keypairs: list["KeyPair"], + expected_age_recipients_keypairs: Iterable["KeyPair"], err_msg: str | None = None, ) -> None: """Checks that the recipients of a secrets file matches expectations. diff --git a/pkgs/clan-cli/clan_cli/tests/test_machines_cli.py b/pkgs/clan-cli/clan_cli/tests/test_machines_cli.py index e6678a26b..db104635d 100644 --- a/pkgs/clan-cli/clan_cli/tests/test_machines_cli.py +++ b/pkgs/clan-cli/clan_cli/tests/test_machines_cli.py @@ -59,7 +59,7 @@ def test_machine_delete( ) -> None: flake = flake_with_sops - admin_key, machine_key, machine2_key = sops_setup.keys + admin_key, machine_key, machine2_key, *xs = sops_setup.keys # create a couple machines with their keys for name, key in (("my-machine", machine_key), ("my-machine2", machine2_key)): diff --git a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py index 86233cb1b..2364d2f9e 100644 --- a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py @@ -159,6 +159,86 @@ def test_users( ) -> None: _test_identities("users", test_flake, capture_output, age_keys, monkeypatch) + # some additional user-specific tests + + admin_key = age_keys[2] + sops_folder = test_flake.path / "sops" + + user_keys = { + "bob": [age_keys[0], age_keys[1]], + "alice": [age_keys[2]], + "charlie": [age_keys[3], age_keys[4]], + } + + for user, keys in user_keys.items(): + key_args = [f"--age-key={key.pubkey}" for key in keys] + + # add the user keys + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + user, + *key_args, + ] + ) + assert (sops_folder / "users" / user / "key.json").exists() + + # check they are returned in get + with capture_output as output: + cli.run(["secrets", "users", "get", "--flake", str(test_flake.path), user]) + + for key in keys: + assert key.pubkey in output.out + + # set a secret + secret_name = f"{user}_secret" + cli.run( + [ + "secrets", + "set", + "--flake", + str(test_flake.path), + "--user", + user, + secret_name, + ] + ) + + # check the secret has each of our user's keys as a recipient + # in addition the admin key should be there + assert_secrets_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[admin_key, *keys], + ) + + if len(keys) == 1: + continue + + # remove one of the keys + cli.run( + [ + "secrets", + "users", + "remove-key", + "--flake", + str(test_flake.path), + user, + keys[0].pubkey, + ] + ) + + # check the secret has been updated + assert_secrets_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[admin_key, *keys[1:]], + ) + def test_machines( test_flake: FlakeForTest, @@ -786,7 +866,10 @@ def test_secrets_key_generate_gpg( "testuser", ] ) - key = json.loads(output.out) + keys = json.loads(output.out) + assert len(keys) == 1 + + key = keys[0] assert key["type"] == "pgp" assert key["publickey"] == gpg_key.fingerprint diff --git a/pkgs/clan-cli/clan_cli/vars/keygen.py b/pkgs/clan-cli/clan_cli/vars/keygen.py index 6bf8290c5..f9a79d2c2 100644 --- a/pkgs/clan-cli/clan_cli/vars/keygen.py +++ b/pkgs/clan-cli/clan_cli/vars/keygen.py @@ -5,7 +5,7 @@ import os from clan_cli.errors import ClanError from clan_cli.flake import Flake from clan_cli.secrets.key import generate_key -from clan_cli.secrets.sops import KeyType, maybe_get_admin_public_key +from clan_cli.secrets.sops import maybe_get_admin_public_key from clan_cli.secrets.users import add_user log = logging.getLogger(__name__) @@ -24,8 +24,7 @@ def keygen(user: str | None, flake: Flake, force: bool) -> None: add_user( flake_dir=flake.path, name=user, - key=pub_key.pubkey, - key_type=KeyType.AGE, + keys=[pub_key], force=force, ) diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index bcdf147c7..02cb9f283 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -92,7 +92,7 @@ class SecretStore(StoreBase): secret_path = self.secret_path(generator, secret_name) recipient = sops.SopsKey.load_dir(key_dir) recipients = sops.get_recipients(secret_path) - return recipient in recipients + return len(recipient.intersection(recipients)) > 0 def secret_path(self, generator: Generator, secret_name: str) -> Path: return self.directory(generator, secret_name) @@ -258,10 +258,7 @@ class SecretStore(StoreBase): ) ) - return { - sops.SopsKey(pubkey=key, username="", key_type=key_type) - for (key, key_type) in keys - } + return keys # } def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]: