From 61ceb44a7119c1d0c28f697ea7aee491e27b5817 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Fri, 27 Sep 2024 11:31:59 -0700 Subject: [PATCH 01/14] Draft: clan-cli: secrets: Add support for PGP keys with sops-nix To use a PGP key instead of an age key you can set `SOPS_PGP_FP`. (You can use `gpg -k --fingerprint --fingerprint` to get your PGP encryption key fingerprint, remove spaces from it). The internal manifest file already supported a type field, and so I built from there. With those changes, I was able to add my PGP key, and update all my secrets with it, instead of the age key originally generated: ``` % clan secrets key show | jq { "key": "ADB6276965590A096004F6D1E114CBAE8FA29165", "type": "pgp" } % clan secrets key update % for s in $(clan secrets list) ; do clan secrets users add-secret kal-pgp-from-2022-12-to-2024-12 "$s"; done % for s in $(clan secrets list) ; do clan secrets users remove-secret --debug kal "$s" ; done ``` --- pkgs/clan-cli/clan_cli/secrets/key.py | 13 ++-- pkgs/clan-cli/clan_cli/secrets/secrets.py | 22 +++--- pkgs/clan-cli/clan_cli/secrets/sops.py | 85 +++++++++++++++-------- pkgs/clan-cli/clan_cli/secrets/types.py | 4 +- pkgs/clan-cli/clan_cli/secrets/users.py | 39 ++++++++--- 5 files changed, 106 insertions(+), 57 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index 4fa9cd378..484fc1fec 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -5,8 +5,9 @@ from pathlib import Path from clan_cli.errors import ClanError from clan_cli.git import commit_files +from . import sops from .secrets import update_secrets -from .sops import default_admin_key_path, generate_private_key, get_public_key +from .sops import default_admin_key_path, generate_private_key log = logging.getLogger(__name__) @@ -45,10 +46,6 @@ def generate_key() -> str: return pub_key -def show_key() -> str: - return get_public_key(default_admin_key_path().read_text()) - - def generate_command(args: argparse.Namespace) -> None: pub_key = generate_key() log.info( @@ -57,7 +54,9 @@ def generate_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None: - print(show_key()) + key, type = sops.maybe_get_public_key() + type_or_null = f'"{type.name.lower()}"' if type else "null" + print(f'{{"key": "{key}", "type": {type_or_null}}}') def update_command(args: argparse.Namespace) -> None: @@ -76,7 +75,7 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: parser_generate = subparser.add_parser("generate", help="generate age key") parser_generate.set_defaults(func=generate_command) - parser_show = subparser.add_parser("show", help="show age public key") + parser_show = subparser.add_parser("show", help="show public key") parser_show.set_defaults(func=show_command) parser_update = subparser.add_parser( diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index cf262d9d5..51f838731 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -1,5 +1,7 @@ import argparse +import functools import getpass +import operator import os import shutil import sys @@ -20,6 +22,7 @@ from clan_cli.completions import ( from clan_cli.errors import ClanError from clan_cli.git import commit_files +from . import sops from .folders import ( list_objects, sops_groups_folder, @@ -42,13 +45,13 @@ def update_secrets( changed_files.extend( update_keys( secret_path, - sorted(collect_keys_for_path(secret_path)), + sorted_keys(collect_keys_for_path(secret_path)), ) ) return changed_files -def collect_keys_for_type(folder: Path) -> set[str]: +def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: if not folder.exists(): return set() keys = set() @@ -68,7 +71,7 @@ def collect_keys_for_type(folder: Path) -> set[str]: return keys -def collect_keys_for_path(path: Path) -> set[str]: +def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: keys = set() keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "users")) @@ -132,8 +135,8 @@ def encrypt_secret( recipient_keys = collect_keys_for_path(secret_path) - if key.pubkey not in recipient_keys: - recipient_keys.add(key.pubkey) + if (key.pubkey, key.type) not in recipient_keys: + recipient_keys.add((key.pubkey, key.type)) files_to_commit.extend( allow_member( users_folder(secret_path), @@ -144,7 +147,7 @@ def encrypt_secret( ) secret_path = secret_path / "secret" - encrypt_file(secret_path, value, sorted(recipient_keys)) + encrypt_file(secret_path, value, sorted_keys(recipient_keys)) files_to_commit.append(secret_path) if git_commit: commit_files( @@ -228,7 +231,7 @@ def allow_member( changed.extend( update_keys( group_folder.parent, - sorted(collect_keys_for_path(group_folder.parent)), + sorted_keys(collect_keys_for_path(group_folder.parent)), ) ) return changed @@ -255,10 +258,13 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]: group_folder.parent.rmdir() return update_keys( - target.parent.parent, sorted(collect_keys_for_path(group_folder.parent)) + target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent)) ) +sorted_keys = functools.partial(sorted, key=operator.itemgetter(0)) + + def has_secret(secret_path: Path) -> bool: return (secret_path / "secret").exists() diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 99bde4481..86110180f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +import enum import io import json import os @@ -19,13 +22,25 @@ from clan_cli.nix import nix_shell from .folders import sops_machines_folder, sops_users_folder +class KeyType(enum.Enum): + AGE = enum.auto() + PGP = enum.auto() + + @classmethod + def validate(cls, value: str | None) -> KeyType | None: # noqa: ANN102 + if value: + return cls.__members__.get(value.upper()) + return None + + @dataclass class SopsKey: pubkey: str username: str + type: KeyType -def get_public_key(privkey: str) -> str: +def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) try: res = subprocess.run( @@ -78,8 +93,8 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: - key = SopsKey(pub_key, username="") +def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> SopsKey | None: + key = SopsKey(pub_key, username="", type=type) folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -87,7 +102,7 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: for user in folder.iterdir(): if not (user / "key.json").exists(): continue - if read_key(user) == pub_key: + if read_key(user) == (pub_key, type): key.username = user.name return key @@ -95,8 +110,8 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: @API.register -def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey: - key = maybe_get_user_or_machine(flake_dir, pub_key) +def ensure_user_or_machine(flake_dir: Path, pub_key: str, key_type: KeyType) -> SopsKey: + key = maybe_get_user_or_machine(flake_dir, pub_key, key_type) if not key: msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" raise ClanError(msg) @@ -111,43 +126,51 @@ def default_admin_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> str | None: - key = os.environ.get("SOPS_AGE_KEY") - if key: - return get_public_key(key) +def maybe_get_admin_public_key() -> tuple[str, KeyType | None]: + age_key = os.environ.get("SOPS_AGE_KEY") + pgp_key = os.environ.get("SOPS_PGP_FP") + if age_key and pgp_key: + msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." + raise ClanError(msg) + if age_key: + return get_public_age_key(age_key), KeyType.AGE + if pgp_key: + return pgp_key, KeyType.PGP + path = default_admin_key_path() if path.exists(): - return get_public_key(path.read_text()) + return get_public_age_key(path.read_text()), KeyType.AGE - return None + return "", None def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - pub_key = maybe_get_admin_public_key() - if pub_key: - return maybe_get_user_or_machine(flake_dir, pub_key) + pub_key, key_type = maybe_get_admin_public_key() + if key_type: + return maybe_get_user_or_machine(flake_dir, pub_key, key_type) return None def ensure_admin_key(flake_dir: Path) -> SopsKey: - pub_key = maybe_get_admin_public_key() - if not pub_key: + pub_key, key_type = maybe_get_admin_public_key() + if not key_type: msg = "No sops key found. Please generate one with 'clan secrets key generate'." raise ClanError(msg) - return ensure_user_or_machine(flake_dir, pub_key) + return ensure_user_or_machine(flake_dir, pub_key, key_type) @contextmanager -def sops_manifest(keys: list[str]) -> Iterator[Path]: +def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: + all_keys = {type.lower(): [] for type in KeyType.__members__.keys()} + for key, type in keys: + all_keys[type.name.lower()].append(key) with NamedTemporaryFile(delete=False, mode="w") as manifest: - json.dump( - {"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2 - ) + json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) manifest.flush() yield Path(manifest.name) -def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: +def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]: with sops_manifest(keys) as manifest: secret_path = secret_path / "secret" time_before = secret_path.stat().st_mtime @@ -171,7 +194,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: def encrypt_file( secret_path: Path, content: IO[str] | str | bytes | None, - pubkeys: list[str], + pubkeys: list[tuple[str, KeyType]], ) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) @@ -237,7 +260,7 @@ def get_meta(secret_path: Path) -> dict: return json.load(f) -def write_key(path: Path, publickey: str, overwrite: bool) -> None: +def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> None: path.mkdir(parents=True, exist_ok=True) try: flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC @@ -248,21 +271,23 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None: msg = f"{path.name} already exists in {path}. Use --force to overwrite." raise ClanError(msg) from e with os.fdopen(fd, "w") as f: - json.dump({"publickey": publickey, "type": "age"}, f, indent=2) + contents = {"publickey": publickey, "type": type.name.lower()} + json.dump(contents, f, indent=2) -def read_key(path: Path) -> str: +def read_key(path: Path) -> tuple[str, KeyType]: with Path(path / "key.json").open() as f: try: key = json.load(f) except json.JSONDecodeError as e: msg = f"Failed to decode {path.name}: {e}" raise ClanError(msg) from e - if key["type"] != "age": - msg = f"{path.name} is not an age key but {key['type']}. This is not supported" + type = KeyType.validate(key.get("type")) + if type is None: + msg = f"Invalid key type in {path.name}: \"{type}\" (expected one of {', '.join(KeyType.__members__.keys())})." raise ClanError(msg) publickey = key.get("publickey") if not publickey: msg = f"{path.name} does not contain a public key" raise ClanError(msg) - return publickey + return publickey, type diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py index 405e39418..0a2121cab 100644 --- a/pkgs/clan-cli/clan_cli/secrets/types.py +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -5,7 +5,7 @@ from pathlib import Path from clan_cli.errors import ClanError -from .sops import get_public_key +from .sops import get_public_age_key VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$") VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$") @@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str: if arg_value.startswith("age1"): return arg_value.strip() if arg_value.startswith("AGE-SECRET-KEY-"): - return get_public_key(arg_value) + return get_public_age_key(arg_value) if not arg_value.startswith("age1"): msg = f"Please provide an age key starting with age1, got: '{arg_value}'" raise ClanError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 46f4834c3..4e629beb8 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -1,11 +1,12 @@ import argparse +import os from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users from clan_cli.errors import ClanError from clan_cli.git import commit_files -from . import secrets +from . import secrets, sops from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder from .secrets import update_secrets from .sops import read_key, write_key @@ -17,13 +18,19 @@ from .types import ( ) -def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None: +def add_user( + flake_dir: Path, + name: str, + key: str, + key_type: sops.KeyType, + force: bool, +) -> None: path = sops_users_folder(flake_dir) / name def filter_user_secrets(secret: Path) -> bool: return secret.joinpath("users", name).exists() - write_key(path, key, force) + write_key(path, key, key_type, overwrite=force) paths = [path] paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets)) commit_files( @@ -42,7 +49,7 @@ def remove_user(flake_dir: Path, name: str) -> None: ) -def get_user(flake_dir: Path, name: str) -> str: +def get_user(flake_dir: Path, name: str) -> tuple[str, sops.KeyType]: return read_key(sops_users_folder(flake_dir) / name) @@ -95,14 +102,18 @@ def add_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - add_user(args.flake.path, args.user, args.key, args.force) + key_type = sops.KeyType.AGE if args.key_age else sops.KeyType.PGP + key = args.key_age or args.key_pgp + add_user(args.flake.path, args.user, key, key_type, args.force) def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - print(get_user(args.flake.path, args.user)) + key, type = get_user(args.flake.path, args.user) + type_or_null = f'"{type.name.lower()}"' if type else "null" + print(f'{{"key": "{key}", "type": {type_or_null}}}') def remove_command(args: argparse.Namespace) -> None: @@ -141,13 +152,21 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: "-f", "--force", help="overwrite existing user", action="store_true" ) add_parser.add_argument("user", help="the name of the user", type=user_name_type) - add_parser.add_argument( - "key", - help="public key or private key of the user." - "Execute 'clan secrets key --help' on how to retrieve a key." + key_type = add_parser.add_mutually_exclusive_group(required=True) + key_type.add_argument( + "--key-age", + help="public or private age key of the user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", type=public_or_private_age_key_type, ) + key_type.add_argument( + "--key-pgp", + help=( + "public PGP encryption key of the user. " + "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." + ), + ) add_parser.set_defaults(func=add_command) get_parser = subparser.add_parser("get", help="get a user public key") From 30d0afe75b4611302ffa2f56ee865d51c4fb946d Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Fri, 27 Sep 2024 13:50:13 -0700 Subject: [PATCH 02/14] Fix: use new sops api in `clan secrets machines` --- pkgs/clan-cli/clan_cli/secrets/machines.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 5be164751..2fc28c47e 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -10,7 +10,7 @@ from clan_cli.errors import ClanError from clan_cli.git import commit_files from clan_cli.machines.types import machine_name_type, validate_hostname -from . import secrets +from . import secrets, sops from .folders import ( list_objects, remove_object, @@ -24,7 +24,7 @@ from .types import public_or_private_age_key_type, secret_name_type def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None: machine_path = sops_machines_folder(flake_dir) / machine - write_key(machine_path, pubkey, force) + write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force) paths = [machine_path] def filter_machine_secrets(secret: Path) -> bool: @@ -48,7 +48,8 @@ def remove_machine(flake_dir: Path, name: str) -> None: def get_machine(flake_dir: Path, name: str) -> str: - return read_key(sops_machines_folder(flake_dir) / name) + key, type = read_key(sops_machines_folder(flake_dir) / name) + return key def has_machine(flake_dir: Path, name: str) -> bool: @@ -168,7 +169,7 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: add_dynamic_completer(add_machine_action, complete_machines) add_parser.add_argument( "key", - help="public key or private key of the user", + help="public or private age key of the machine", type=public_or_private_age_key_type, ) add_parser.set_defaults(func=add_command) From 710b832066c9ba5135a21e2fb1ca38bdcdb75830 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Sun, 29 Sep 2024 19:27:20 -0700 Subject: [PATCH 03/14] Fix: do not assume users use age keys in vars/sops With added support for PGP for users keys, do not assume an age key is going to be present in secrets files. --- pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index 761621674..42d0f7bbb 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -229,7 +229,7 @@ class SecretStore(SecretStoreBase): ) -> bool: secret_path = self.secret_path(generator_name, secret_name, shared) secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in secret["sops"]["age"]] + recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] machines_folder_path = sops_machines_folder(self.machine.flake_dir) machine_pubkey = json.loads( (machines_folder_path / self.machine.name / "key.json").read_text() From 24973370b3071ab06e9e4d2ef909fc270ed56d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 12:00:27 +0200 Subject: [PATCH 04/14] secrets: do not shadow python builtins --- pkgs/clan-cli/clan_cli/secrets/key.py | 4 +-- pkgs/clan-cli/clan_cli/secrets/machines.py | 2 +- pkgs/clan-cli/clan_cli/secrets/secrets.py | 4 +-- pkgs/clan-cli/clan_cli/secrets/sops.py | 30 ++++++++++++---------- pkgs/clan-cli/clan_cli/secrets/users.py | 4 +-- 5 files changed, 24 insertions(+), 20 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index 484fc1fec..db1a1bf57 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -54,8 +54,8 @@ def generate_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None: - key, type = sops.maybe_get_public_key() - type_or_null = f'"{type.name.lower()}"' if type else "null" + key, key_type = sops.maybe_get_admin_public_key() + type_or_null = f'"{key_type.name.lower()}"' if key_type else "null" print(f'{{"key": "{key}", "type": {type_or_null}}}') diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 2fc28c47e..68257fe8f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -48,7 +48,7 @@ def remove_machine(flake_dir: Path, name: str) -> None: def get_machine(flake_dir: Path, name: str) -> str: - key, type = read_key(sops_machines_folder(flake_dir) / name) + key, _ = read_key(sops_machines_folder(flake_dir) / name) return key diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index 51f838731..3374c7668 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -135,8 +135,8 @@ def encrypt_secret( recipient_keys = collect_keys_for_path(secret_path) - if (key.pubkey, key.type) not in recipient_keys: - recipient_keys.add((key.pubkey, key.type)) + if (key.pubkey, key.key_type) not in recipient_keys: + recipient_keys.add((key.pubkey, key.key_type)) files_to_commit.extend( allow_member( users_folder(secret_path), diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 86110180f..28ff19134 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -37,7 +37,7 @@ class KeyType(enum.Enum): class SopsKey: pubkey: str username: str - type: KeyType + key_type: KeyType def get_public_age_key(privkey: str) -> str: @@ -93,8 +93,10 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> SopsKey | None: - key = SopsKey(pub_key, username="", type=type) +def maybe_get_user_or_machine( + flake_dir: Path, pub_key: str, key_type: KeyType +) -> SopsKey | None: + key = SopsKey(pub_key, username="", key_type=key_type) folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -102,7 +104,7 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> S for user in folder.iterdir(): if not (user / "key.json").exists(): continue - if read_key(user) == (pub_key, type): + if read_key(user) == (pub_key, key_type): key.username = user.name return key @@ -161,9 +163,11 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey: @contextmanager def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: - all_keys = {type.lower(): [] for type in KeyType.__members__.keys()} - for key, type in keys: - all_keys[type.name.lower()].append(key) + all_keys: dict[str, list[str]] = { + key_type.lower(): [] for key_type in KeyType.__members__ + } + for key, key_type in keys: + all_keys[key_type.name.lower()].append(key) with NamedTemporaryFile(delete=False, mode="w") as manifest: json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) manifest.flush() @@ -260,7 +264,7 @@ def get_meta(secret_path: Path) -> dict: return json.load(f) -def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> None: +def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None: path.mkdir(parents=True, exist_ok=True) try: flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC @@ -271,7 +275,7 @@ def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> Non msg = f"{path.name} already exists in {path}. Use --force to overwrite." raise ClanError(msg) from e with os.fdopen(fd, "w") as f: - contents = {"publickey": publickey, "type": type.name.lower()} + contents = {"publickey": publickey, "type": key_type.name.lower()} json.dump(contents, f, indent=2) @@ -282,12 +286,12 @@ def read_key(path: Path) -> tuple[str, KeyType]: except json.JSONDecodeError as e: msg = f"Failed to decode {path.name}: {e}" raise ClanError(msg) from e - type = KeyType.validate(key.get("type")) - if type is None: - msg = f"Invalid key type in {path.name}: \"{type}\" (expected one of {', '.join(KeyType.__members__.keys())})." + key_type = KeyType.validate(key.get("type")) + if key_type is None: + msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})." raise ClanError(msg) publickey = key.get("publickey") if not publickey: msg = f"{path.name} does not contain a public key" raise ClanError(msg) - return publickey, type + return publickey, key_type diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 4e629beb8..6f5e0c39b 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -111,8 +111,8 @@ def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - key, type = get_user(args.flake.path, args.user) - type_or_null = f'"{type.name.lower()}"' if type else "null" + key, key_type = get_user(args.flake.path, args.user) + type_or_null = '"{key_type.name.lower()}"' if key_type else "null" print(f'{{"key": "{key}", "type": {type_or_null}}}') From d909078033e937b8f2bbb8e9d533cb8942d3999d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 12:00:37 +0200 Subject: [PATCH 05/14] default key type to age and rename to age-key/pgp-key --- pkgs/clan-cli/clan_cli/secrets/sops.py | 3 ++- pkgs/clan-cli/clan_cli/secrets/users.py | 24 +++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 28ff19134..d342b8f3f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -104,7 +104,8 @@ def maybe_get_user_or_machine( for user in folder.iterdir(): if not (user / "key.json").exists(): continue - if read_key(user) == (pub_key, key_type): + this_pub_key, this_key_type = read_key(user) + if pub_key == this_pub_key and key_type == this_key_type: key.username = user.name return key diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 6f5e0c39b..679162dd6 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -1,5 +1,4 @@ import argparse -import os from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users @@ -102,8 +101,15 @@ def add_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - key_type = sops.KeyType.AGE if args.key_age else sops.KeyType.PGP - key = args.key_age or args.key_pgp + if args.age_key or args.agekey: + key_type = sops.KeyType.AGE + elif args.pgp_key: + key_type = sops.KeyType.PGP + else: + msg = "BUG!: key type not set" + raise ValueError(msg) + key = args.agekey or args.age_key or args.pgp_key + assert key is not None, "key is None" add_user(args.flake.path, args.user, key, key_type, args.force) @@ -154,14 +160,22 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: add_parser.add_argument("user", help="the name of the user", type=user_name_type) key_type = add_parser.add_mutually_exclusive_group(required=True) key_type.add_argument( - "--key-age", + "agekey", + help="public or private age key of the user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " + "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", + type=public_or_private_age_key_type, + nargs="?", + ) + key_type.add_argument( + "--age-key", help="public or private age key of the user. " "Execute 'clan secrets key --help' on how to retrieve a key. " "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", type=public_or_private_age_key_type, ) key_type.add_argument( - "--key-pgp", + "--pgp-key", help=( "public PGP encryption key of the user. " "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." From db065ea06bdf433a67d637d5dc2df1ed3d91af1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 15:34:58 +0200 Subject: [PATCH 06/14] error if we cannot load a dataclass from file --- pkgs/clan-cli/tests/test_api_dataclass_compat.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkgs/clan-cli/tests/test_api_dataclass_compat.py b/pkgs/clan-cli/tests/test_api_dataclass_compat.py index 82072d4fb..24071c1c8 100644 --- a/pkgs/clan-cli/tests/test_api_dataclass_compat.py +++ b/pkgs/clan-cli/tests/test_api_dataclass_compat.py @@ -129,6 +129,9 @@ def test_all_dataclasses() -> None: try: API.reset() dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent)) + if dclass is None: + msg = f"Could not load dataclass {dataclass} from {file}" + raise ClanError(msg) type_to_dict(dclass) except JSchemaTypeError as e: print(f"Error loading dataclass {dataclass} from {file}: {e}") From 541a73692f903f6563b27947e7a62e3f03456068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 15:40:50 +0200 Subject: [PATCH 07/14] fix serialisation of SopsKey type --- pkgs/clan-cli/clan_cli/api/serde.py | 17 ++++++++++++----- pkgs/clan-cli/clan_cli/api/util.py | 23 ++++++++++++++++------- pkgs/clan-cli/clan_cli/secrets/sops.py | 4 +--- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/api/serde.py b/pkgs/clan-cli/clan_cli/api/serde.py index b69f57d3f..61e4f1196 100644 --- a/pkgs/clan-cli/clan_cli/api/serde.py +++ b/pkgs/clan-cli/clan_cli/api/serde.py @@ -32,6 +32,7 @@ Note: This module assumes the presence of other modules and classes such as `Cla import dataclasses import json from dataclasses import dataclass, fields, is_dataclass +from enum import Enum from pathlib import Path from types import UnionType from typing import ( @@ -179,25 +180,31 @@ def construct_value( # Nested types # list # dict - if get_origin(t) is list: + origin = get_origin(t) + if origin is list: if not isinstance(field_value, list): msg = f"Expected list, got {field_value}" raise ClanError(msg, location=f"{loc}") return [construct_value(get_args(t)[0], item) for item in field_value] - if get_origin(t) is dict and isinstance(field_value, dict): + if origin is dict and isinstance(field_value, dict): return { key: construct_value(get_args(t)[1], value) for key, value in field_value.items() } - if get_origin(t) is Literal: + if origin is Literal: valid_values = get_args(t) if field_value not in valid_values: - msg = f"Expected one of {valid_values}, got {field_value}" + msg = f"Expected one of {', '.join(valid_values)}, got {field_value}" raise ClanError(msg, location=f"{loc}") return field_value - if get_origin(t) is Annotated: + if origin is Enum: + if field_value not in origin.__members__: + msg = f"Expected one of {', '.join(origin.__members__)}, got {field_value}" + raise ClanError(msg, location=f"{loc}") + + if origin is Annotated: (base_type,) = get_args(t) return construct_value(base_type, field_value) diff --git a/pkgs/clan-cli/clan_cli/api/util.py b/pkgs/clan-cli/clan_cli/api/util.py index 45c762a22..3ff0ffc8c 100644 --- a/pkgs/clan-cli/clan_cli/api/util.py +++ b/pkgs/clan-cli/clan_cli/api/util.py @@ -2,6 +2,7 @@ import copy import dataclasses import pathlib from dataclasses import MISSING +from enum import EnumType from types import NoneType, UnionType from typing import ( Annotated, @@ -77,13 +78,16 @@ def type_to_dict( if dataclasses.is_dataclass(t): fields = dataclasses.fields(t) - properties = { - f.metadata.get("alias", f.name): type_to_dict( + properties = {} + for f in fields: + if f.name.startswith("_"): + continue + assert not isinstance( + f.type, str + ), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" + properties[f.metadata.get("alias", f.name)] = type_to_dict( f.type, f"{scope} {t.__name__}.{f.name}", type_map ) - for f in fields - if not f.name.startswith("_") - } required = set() for pn, pv in properties.items(): @@ -192,6 +196,11 @@ def type_to_dict( return {"type": "boolean"} if t is object: return {"type": "object"} + if type(t) is EnumType: + return { + "type": "string", + "enum": list(t.__members__), + } if t is Any: msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}" raise JSchemaTypeError(msg) @@ -208,7 +217,7 @@ def type_to_dict( if t is NoneType: return {"type": "null"} - msg = f"{scope} - Error primitive type not supported {t!s}" + msg = f"{scope} - Basic type '{t!s}' is not supported" raise JSchemaTypeError(msg) - msg = f"{scope} - Error type not supported {t!s}" + msg = f"{scope} - Type '{t!s}' is not supported" raise JSchemaTypeError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index d342b8f3f..1007faaaf 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,5 +1,3 @@ -from __future__ import annotations - import enum import io import json @@ -27,7 +25,7 @@ class KeyType(enum.Enum): PGP = enum.auto() @classmethod - def validate(cls, value: str | None) -> KeyType | None: # noqa: ANN102 + def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102 if value: return cls.__members__.get(value.upper()) return None From 4a3030d6edc39433878bc073a790a40671237f9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 17:14:51 +0200 Subject: [PATCH 08/14] secrets: replace Key, key type tuple with SopsKey class --- pkgs/clan-cli/clan_cli/secrets/sops.py | 57 ++++++++++++++------------ 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 1007faaaf..dc85a3926 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -91,10 +91,7 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine( - flake_dir: Path, pub_key: str, key_type: KeyType -) -> SopsKey | None: - key = SopsKey(pub_key, username="", key_type=key_type) +def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -103,7 +100,7 @@ def maybe_get_user_or_machine( if not (user / "key.json").exists(): continue this_pub_key, this_key_type = read_key(user) - if pub_key == this_pub_key and key_type == this_key_type: + if key.pubkey == this_pub_key and key.key_type == this_key_type: key.username = user.name return key @@ -111,12 +108,12 @@ def maybe_get_user_or_machine( @API.register -def ensure_user_or_machine(flake_dir: Path, pub_key: str, key_type: KeyType) -> SopsKey: - key = maybe_get_user_or_machine(flake_dir, pub_key, key_type) - if not key: - msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" - raise ClanError(msg) - return key +def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: + maybe_key = maybe_get_user_or_machine(flake_dir, key) + if maybe_key: + return maybe_key + msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" + raise ClanError(msg) def default_admin_key_path() -> Path: @@ -127,37 +124,43 @@ def default_admin_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> tuple[str, KeyType | None]: +def maybe_get_admin_public_key() -> None | SopsKey: age_key = os.environ.get("SOPS_AGE_KEY") pgp_key = os.environ.get("SOPS_PGP_FP") if age_key and pgp_key: msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." raise ClanError(msg) if age_key: - return get_public_age_key(age_key), KeyType.AGE + return SopsKey( + pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" + ) if pgp_key: - return pgp_key, KeyType.PGP + return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") path = default_admin_key_path() if path.exists(): - return get_public_age_key(path.read_text()), KeyType.AGE + return SopsKey( + pubkey=get_public_age_key(path.read_text()), + key_type=KeyType.AGE, + username="", + ) - return "", None - - -def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - pub_key, key_type = maybe_get_admin_public_key() - if key_type: - return maybe_get_user_or_machine(flake_dir, pub_key, key_type) return None +def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: + key = maybe_get_admin_public_key() + if not key: + return None + return maybe_get_user_or_machine(flake_dir, key) + + def ensure_admin_key(flake_dir: Path) -> SopsKey: - pub_key, key_type = maybe_get_admin_public_key() - if not key_type: - msg = "No sops key found. Please generate one with 'clan secrets key generate'." - raise ClanError(msg) - return ensure_user_or_machine(flake_dir, pub_key, key_type) + key = maybe_get_admin_public_key() + if key: + return ensure_user_or_machine(flake_dir, key) + msg = "No sops key found. Please generate one with 'clan secrets key generate'." + raise ClanError(msg) @contextmanager From be5f10e241e4217c679ebafa0c6daf801e2d1478 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 12:30:51 +0200 Subject: [PATCH 09/14] secrets/show: pretty print json --- pkgs/clan-cli/clan_cli/secrets/key.py | 12 +++++++++--- pkgs/clan-cli/tests/test_secrets_cli.py | 3 ++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index db1a1bf57..f471615d3 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -1,5 +1,7 @@ import argparse +import json import logging +import sys from pathlib import Path from clan_cli.errors import ClanError @@ -54,9 +56,13 @@ def generate_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None: - key, key_type = sops.maybe_get_admin_public_key() - type_or_null = f'"{key_type.name.lower()}"' if key_type else "null" - print(f'{{"key": "{key}", "type": {type_or_null}}}') + key = sops.maybe_get_admin_public_key() + if not key: + msg = "No public key found" + raise ClanError(msg) + json.dump( + {"key": key.pubkey, "type": str(key.key_type)}, sys.stdout, indent=2, sort_keys=True + ) def update_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index bb36af2f9..91b6ffd42 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -1,3 +1,4 @@ +import json import logging import os from collections.abc import Iterator @@ -253,7 +254,7 @@ def test_secrets( cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) with capture_output as output: cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) - key = output.out + key = json.loads(output.out)["key"] assert key.startswith("age1") cli.run( ["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key] From 6694c2b60dae7ddbf22142e90b35ef69e6a5e8cd Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 1 Oct 2024 19:41:51 -0700 Subject: [PATCH 10/14] Fix key dump in `clan secrets key show` ``` In [4]: str(Type.AGE) Out[4]: Type.AGE In [5]: Type.AGE.name.lower() Out[5]: age ``` --- pkgs/clan-cli/clan_cli/secrets/key.py | 4 +--- pkgs/clan-cli/clan_cli/secrets/sops.py | 7 +++++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index f471615d3..f41091d0e 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -60,9 +60,7 @@ def show_command(args: argparse.Namespace) -> None: if not key: msg = "No public key found" raise ClanError(msg) - json.dump( - {"key": key.pubkey, "type": str(key.key_type)}, sys.stdout, indent=2, sort_keys=True - ) + json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) def update_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index dc85a3926..ad956ef3a 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -37,6 +37,13 @@ class SopsKey: username: str key_type: KeyType + def as_dict(self) -> dict[str, str]: + return { + "publickey": self.pubkey, + "username": self.username, + "type": self.key_type.name.lower(), + } + def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) From 6848b3b6b3b095cd6cde53ab95a927d73ada70e0 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 1 Oct 2024 19:44:56 -0700 Subject: [PATCH 11/14] fix: `clan secrets user get` dump the user identity correctly --- pkgs/clan-cli/clan_cli/secrets/users.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 679162dd6..0a5c8809b 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -1,4 +1,6 @@ import argparse +import json +import sys from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users @@ -48,8 +50,9 @@ def remove_user(flake_dir: Path, name: str) -> None: ) -def get_user(flake_dir: Path, name: str) -> tuple[str, sops.KeyType]: - return read_key(sops_users_folder(flake_dir) / name) +def get_user(flake_dir: Path, name: str) -> sops.SopsKey: + key, key_type = read_key(sops_users_folder(flake_dir) / name) + return sops.SopsKey(key, name, key_type) def list_users(flake_dir: Path) -> list[str]: @@ -117,9 +120,8 @@ def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - key, key_type = get_user(args.flake.path, args.user) - type_or_null = '"{key_type.name.lower()}"' if key_type else "null" - print(f'{{"key": "{key}", "type": {type_or_null}}}') + key = get_user(args.flake.path, args.user) + json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) def remove_command(args: argparse.Namespace) -> None: From 7999465d89b545274bb392862f483b71c9aabc9d Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 1 Oct 2024 19:52:00 -0700 Subject: [PATCH 12/14] Make clan_cli.secrets.sops.SopsKey immutable and remove its __eq__ method Immutability seems sensible for this type. There is some ambiguity on how to compare keys, in particular when `user.name == ""`, but the rest matches. --- pkgs/clan-cli/clan_cli/secrets/sops.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index ad956ef3a..b3a90483c 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -31,7 +31,7 @@ class KeyType(enum.Enum): return None -@dataclass +@dataclass(frozen=True, eq=False) class SopsKey: pubkey: str username: str @@ -108,8 +108,7 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: continue this_pub_key, this_key_type = read_key(user) if key.pubkey == this_pub_key and key.key_type == this_key_type: - key.username = user.name - return key + return SopsKey(key.pubkey, user.name, key.key_type) return None From 103ad87bc934ed0624ca0088639fab5efcc6a71c Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 1 Oct 2024 19:53:22 -0700 Subject: [PATCH 13/14] Improvements for `clan secrets key generate`. I am not sure to understand what `extract_public_key` was for. It seems like `age-keygen -y` will just work fine for a file like `extract_public_key` is looking for. Unless someone intentionally made a file with a comment like that without the private key in it. Messages are moved to stdout rather being logged. It feels like the output is meaningful in the first step users are going to take. Also makes testing easier, as log messages are captured differently than stdout. The call to add an user is changed to be easier to copy paste and work whether PGP or age is in use. A description for the command is added instead of help which does not seem to be displayed. --- pkgs/clan-cli/clan_cli/secrets/key.py | 61 ++++++++++--------------- pkgs/clan-cli/clan_cli/secrets/users.py | 1 + 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index f41091d0e..ea5cbb3e8 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -2,57 +2,40 @@ import argparse import json import logging import sys -from pathlib import Path from clan_cli.errors import ClanError from clan_cli.git import commit_files from . import sops from .secrets import update_secrets -from .sops import default_admin_key_path, generate_private_key +from .sops import ( + default_admin_key_path, + generate_private_key, + maybe_get_admin_public_key, +) log = logging.getLogger(__name__) -def extract_public_key(filepath: Path) -> str: - """ - Extracts the public key from a given text file. - """ - try: - with filepath.open() as file: - for line in file: - # Check if the line contains the public key - if line.startswith("# public key:"): - # Extract and return the public key part after the prefix - return line.strip().split(": ")[1] - except FileNotFoundError as e: - msg = f"The file at {filepath} was not found." - raise ClanError(msg) from e - except OSError as e: - msg = f"An error occurred while extracting the public key: {e}" - raise ClanError(msg) from e +def generate_key() -> sops.SopsKey: + key = maybe_get_admin_public_key() + if key is not None: + print(f"{key.key_type.name} key {key.pubkey} is already set") + return key - msg = f"Could not find the public key in the file at {filepath}." - raise ClanError(msg) - - -def generate_key() -> str: path = default_admin_key_path() - if path.exists(): - log.info(f"Key already exists at {path}") - return extract_public_key(path) - priv_key, pub_key = generate_private_key(out_file=path) - log.info( - f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets." + _, pub_key = generate_private_key(out_file=path) + print( + f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets." ) - return pub_key + return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE) def generate_command(args: argparse.Namespace) -> None: - pub_key = generate_key() - log.info( - f"Also add your age public key to the repository with: \nclan secrets users add {pub_key}" - ) + key = generate_key() + print("Also add your age public key to the repository with:") + key_type = key.key_type.name.lower() + print(f"clan secrets users add --{key_type}-key ") def show_command(args: argparse.Namespace) -> None: @@ -76,7 +59,13 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: required=True, ) - parser_generate = subparser.add_parser("generate", help="generate age key") + parser_generate = subparser.add_parser( + "generate", + description=( + "Generate an age key for the Clan, " + "to use PGP set `SOPS_PGP_FP` in your environment." + ), + ) parser_generate.set_defaults(func=generate_command) parser_show = subparser.add_parser("show", help="show public key") diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 0a5c8809b..5348d1405 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -180,6 +180,7 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: "--pgp-key", help=( "public PGP encryption key of the user. " + # Use --fingerprint --fingerprint to get fingerprints for subkeys: "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." ), ) From dbe8927a7747d64b4730c9fbfeeb0abf29a7b9d8 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 1 Oct 2024 20:08:00 -0700 Subject: [PATCH 14/14] Update tests for `clan secrets` --- pkgs/clan-cli/tests/test_secrets_cli.py | 162 ++++++++++++++++++++++-- 1 file changed, 150 insertions(+), 12 deletions(-) diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index 91b6ffd42..85003a541 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -1,8 +1,12 @@ +import functools import json import logging import os +import re +import subprocess from collections.abc import Iterator from contextlib import contextmanager +from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -228,7 +232,7 @@ def test_groups( @contextmanager -def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: +def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: old_key = os.environ["SOPS_AGE_KEY_FILE"] monkeypatch.delenv("SOPS_AGE_KEY_FILE") monkeypatch.setenv("SOPS_AGE_KEY", key) @@ -239,29 +243,95 @@ def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key) +@contextmanager +def use_gpg_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: + old_key_file = os.environ.get("SOPS_AGE_KEY_FILE") + old_key = os.environ.get("SOPS_AGE_KEY") + monkeypatch.delenv("SOPS_AGE_KEY_FILE", raising=False) + monkeypatch.delenv("SOPS_AGE_KEY", raising=False) + monkeypatch.setenv("SOPS_PGP_FP", key) + try: + yield + finally: + monkeypatch.delenv("SOPS_PGP_FP") + if old_key_file is not None: + monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key_file) + if old_key is not None: + monkeypatch.setenv("SOPS_AGE_KEY", old_key) + + +@pytest.fixture +def gpg_key( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> str: + gpg_home = tmp_path / "gnupghome" + gpg_home.mkdir(mode=0o700) + + gpg_environ = os.environ.copy() + gpg_environ["GNUPGHOME"] = str(gpg_home) + run = functools.partial( + subprocess.run, + encoding="utf-8", + check=True, + env=gpg_environ, + ) + key_parameters = "\n".join( + ( + "%no-protection", + "%transient-key", + "Key-Type: rsa", + "Key-Usage: cert encrypt", + "Name-Real: Foo Bar", + "Name-Comment: Test user", + "Name-Email: test@clan.lol", + "%commit", + ) + ) + run(["gpg", "--batch", "--quiet", "--generate-key"], input=key_parameters) + details = run(["gpg", "--list-keys", "--with-colons"], capture_output=True) + fingerprint = None + for line in details.stdout.strip().split(os.linesep): + if not line.startswith("fpr"): + continue + fingerprint = line.split(":")[9] + break + assert fingerprint is not None, "Could not generate test GPG key" + log.info(f"Created GPG key under {gpg_home}") + + monkeypatch.setenv("GNUPGHOME", str(gpg_home)) + return fingerprint + + def test_secrets( test_flake: FlakeForTest, capture_output: CaptureOutput, monkeypatch: pytest.MonkeyPatch, + gpg_key: str, age_keys: list["KeyPair"], ) -> None: with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" - monkeypatch.setenv("SOPS_NIX_SECRET", "foo") + # Generate a new key for the clan monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key")) - cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) + with capture_output as output: + cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) + assert "age private key" in output.out + # Read the key that was generated with capture_output as output: cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) - key = json.loads(output.out)["key"] + key = json.loads(output.out)["publickey"] assert key.startswith("age1") + # Add testuser with the key that was generated for the clan cli.run( ["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key] ) with pytest.raises(ClanError): # does not exist yet cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"]) + monkeypatch.setenv("SOPS_NIX_SECRET", "foo") cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"]) with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"]) @@ -290,6 +360,8 @@ def test_secrets( cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"]) assert output.out == "key\n" + # using the `age_keys` KeyPair, add a machine and rotate its key + cli.run( [ "secrets", @@ -316,7 +388,7 @@ def test_secrets( cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)]) assert output.out == "machine1\n" - with use_key(age_keys[1].privkey, monkeypatch): + with use_age_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -336,7 +408,7 @@ def test_secrets( ) # should also rotate the encrypted secret - with use_key(age_keys[0].privkey, monkeypatch): + with use_age_key(age_keys[0].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -375,7 +447,7 @@ def test_secrets( "key", ] ) - with capture_output as output, use_key(age_keys[1].privkey, monkeypatch): + with capture_output as output, use_age_key(age_keys[1].privkey, monkeypatch): cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" cli.run( @@ -448,12 +520,12 @@ def test_secrets( ] ) - with use_key(age_keys[1].privkey, monkeypatch): + with use_age_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" - # extend group will update secrets + # Add an user with a GPG key cli.run( [ "secrets", @@ -461,10 +533,13 @@ def test_secrets( "add", "--flake", str(test_flake.path), + "--pgp-key", + gpg_key, "user2", - age_keys[2].pubkey, ] ) + + # Extend group will update secrets cli.run( [ "secrets", @@ -477,7 +552,7 @@ def test_secrets( ] ) - with use_key(age_keys[2].privkey, monkeypatch): # user2 + with use_gpg_key(gpg_key, monkeypatch): # user2 with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -495,7 +570,7 @@ def test_secrets( ) with ( pytest.raises(ClanError), - use_key(age_keys[2].privkey, monkeypatch), + use_gpg_key(gpg_key, monkeypatch), capture_output as output, ): # user2 is not in the group anymore @@ -520,3 +595,66 @@ def test_secrets( with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" + + +def test_secrets_key_generate_gpg( + test_flake: FlakeForTest, + capture_output: CaptureOutput, + monkeypatch: pytest.MonkeyPatch, + gpg_key: str, +) -> None: + with use_gpg_key(gpg_key, monkeypatch): + # Make sure clan secrets key generate recognizes + # the PGP key and does nothing: + with capture_output as output: + cli.run( + [ + "secrets", + "key", + "generate", + "--flake", + str(test_flake.path), + ] + ) + assert "age private key" not in output.out + assert re.match(r"PGP key.+is already set", output.out) is not None + + with capture_output as output: + cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) + key = json.loads(output.out) + assert key["type"] == "pgp" + assert key["publickey"] == gpg_key + + # Add testuser with the key that was (not) generated for the clan: + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "--pgp-key", + gpg_key, + "testuser", + ] + ) + with capture_output as output: + cli.run( + [ + "secrets", + "users", + "get", + "--flake", + str(test_flake.path), + "testuser", + ] + ) + key = json.loads(output.out) + assert key["type"] == "pgp" + assert key["publickey"] == gpg_key + + monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value") + cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"]) + with capture_output as output: + cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"]) + assert output.out == "secret-value"