diff --git a/pkgs/clan-cli/clan_cli/api/serde.py b/pkgs/clan-cli/clan_cli/api/serde.py index b69f57d3f..61e4f1196 100644 --- a/pkgs/clan-cli/clan_cli/api/serde.py +++ b/pkgs/clan-cli/clan_cli/api/serde.py @@ -32,6 +32,7 @@ Note: This module assumes the presence of other modules and classes such as `Cla import dataclasses import json from dataclasses import dataclass, fields, is_dataclass +from enum import Enum from pathlib import Path from types import UnionType from typing import ( @@ -179,25 +180,31 @@ def construct_value( # Nested types # list # dict - if get_origin(t) is list: + origin = get_origin(t) + if origin is list: if not isinstance(field_value, list): msg = f"Expected list, got {field_value}" raise ClanError(msg, location=f"{loc}") return [construct_value(get_args(t)[0], item) for item in field_value] - if get_origin(t) is dict and isinstance(field_value, dict): + if origin is dict and isinstance(field_value, dict): return { key: construct_value(get_args(t)[1], value) for key, value in field_value.items() } - if get_origin(t) is Literal: + if origin is Literal: valid_values = get_args(t) if field_value not in valid_values: - msg = f"Expected one of {valid_values}, got {field_value}" + msg = f"Expected one of {', '.join(valid_values)}, got {field_value}" raise ClanError(msg, location=f"{loc}") return field_value - if get_origin(t) is Annotated: + if origin is Enum: + if field_value not in origin.__members__: + msg = f"Expected one of {', '.join(origin.__members__)}, got {field_value}" + raise ClanError(msg, location=f"{loc}") + + if origin is Annotated: (base_type,) = get_args(t) return construct_value(base_type, field_value) diff --git a/pkgs/clan-cli/clan_cli/api/util.py b/pkgs/clan-cli/clan_cli/api/util.py index 45c762a22..3ff0ffc8c 100644 --- a/pkgs/clan-cli/clan_cli/api/util.py +++ b/pkgs/clan-cli/clan_cli/api/util.py @@ -2,6 +2,7 @@ import copy import dataclasses import pathlib from dataclasses import MISSING +from enum import EnumType from types import NoneType, UnionType from typing import ( Annotated, @@ -77,13 +78,16 @@ def type_to_dict( if dataclasses.is_dataclass(t): fields = dataclasses.fields(t) - properties = { - f.metadata.get("alias", f.name): type_to_dict( + properties = {} + for f in fields: + if f.name.startswith("_"): + continue + assert not isinstance( + f.type, str + ), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" + properties[f.metadata.get("alias", f.name)] = type_to_dict( f.type, f"{scope} {t.__name__}.{f.name}", type_map ) - for f in fields - if not f.name.startswith("_") - } required = set() for pn, pv in properties.items(): @@ -192,6 +196,11 @@ def type_to_dict( return {"type": "boolean"} if t is object: return {"type": "object"} + if type(t) is EnumType: + return { + "type": "string", + "enum": list(t.__members__), + } if t is Any: msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}" raise JSchemaTypeError(msg) @@ -208,7 +217,7 @@ def type_to_dict( if t is NoneType: return {"type": "null"} - msg = f"{scope} - Error primitive type not supported {t!s}" + msg = f"{scope} - Basic type '{t!s}' is not supported" raise JSchemaTypeError(msg) - msg = f"{scope} - Error type not supported {t!s}" + msg = f"{scope} - Type '{t!s}' is not supported" raise JSchemaTypeError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index 4fa9cd378..ea5cbb3e8 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -1,63 +1,49 @@ import argparse +import json import logging -from pathlib import Path +import sys from clan_cli.errors import ClanError from clan_cli.git import commit_files +from . import sops from .secrets import update_secrets -from .sops import default_admin_key_path, generate_private_key, get_public_key +from .sops import ( + default_admin_key_path, + generate_private_key, + maybe_get_admin_public_key, +) log = logging.getLogger(__name__) -def extract_public_key(filepath: Path) -> str: - """ - Extracts the public key from a given text file. - """ - try: - with filepath.open() as file: - for line in file: - # Check if the line contains the public key - if line.startswith("# public key:"): - # Extract and return the public key part after the prefix - return line.strip().split(": ")[1] - except FileNotFoundError as e: - msg = f"The file at {filepath} was not found." - raise ClanError(msg) from e - except OSError as e: - msg = f"An error occurred while extracting the public key: {e}" - raise ClanError(msg) from e +def generate_key() -> sops.SopsKey: + key = maybe_get_admin_public_key() + if key is not None: + print(f"{key.key_type.name} key {key.pubkey} is already set") + return key - msg = f"Could not find the public key in the file at {filepath}." - raise ClanError(msg) - - -def generate_key() -> str: path = default_admin_key_path() - if path.exists(): - log.info(f"Key already exists at {path}") - return extract_public_key(path) - priv_key, pub_key = generate_private_key(out_file=path) - log.info( - f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets." + _, pub_key = generate_private_key(out_file=path) + print( + f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets." ) - return pub_key - - -def show_key() -> str: - return get_public_key(default_admin_key_path().read_text()) + return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE) def generate_command(args: argparse.Namespace) -> None: - pub_key = generate_key() - log.info( - f"Also add your age public key to the repository with: \nclan secrets users add {pub_key}" - ) + key = generate_key() + print("Also add your age public key to the repository with:") + key_type = key.key_type.name.lower() + print(f"clan secrets users add --{key_type}-key ") def show_command(args: argparse.Namespace) -> None: - print(show_key()) + key = sops.maybe_get_admin_public_key() + if not key: + msg = "No public key found" + raise ClanError(msg) + json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) def update_command(args: argparse.Namespace) -> None: @@ -73,10 +59,16 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: required=True, ) - parser_generate = subparser.add_parser("generate", help="generate age key") + parser_generate = subparser.add_parser( + "generate", + description=( + "Generate an age key for the Clan, " + "to use PGP set `SOPS_PGP_FP` in your environment." + ), + ) parser_generate.set_defaults(func=generate_command) - parser_show = subparser.add_parser("show", help="show age public key") + parser_show = subparser.add_parser("show", help="show public key") parser_show.set_defaults(func=show_command) parser_update = subparser.add_parser( diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 5be164751..68257fe8f 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -10,7 +10,7 @@ from clan_cli.errors import ClanError from clan_cli.git import commit_files from clan_cli.machines.types import machine_name_type, validate_hostname -from . import secrets +from . import secrets, sops from .folders import ( list_objects, remove_object, @@ -24,7 +24,7 @@ from .types import public_or_private_age_key_type, secret_name_type def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None: machine_path = sops_machines_folder(flake_dir) / machine - write_key(machine_path, pubkey, force) + write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force) paths = [machine_path] def filter_machine_secrets(secret: Path) -> bool: @@ -48,7 +48,8 @@ def remove_machine(flake_dir: Path, name: str) -> None: def get_machine(flake_dir: Path, name: str) -> str: - return read_key(sops_machines_folder(flake_dir) / name) + key, _ = read_key(sops_machines_folder(flake_dir) / name) + return key def has_machine(flake_dir: Path, name: str) -> bool: @@ -168,7 +169,7 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: add_dynamic_completer(add_machine_action, complete_machines) add_parser.add_argument( "key", - help="public key or private key of the user", + help="public or private age key of the machine", type=public_or_private_age_key_type, ) add_parser.set_defaults(func=add_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index cf262d9d5..3374c7668 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -1,5 +1,7 @@ import argparse +import functools import getpass +import operator import os import shutil import sys @@ -20,6 +22,7 @@ from clan_cli.completions import ( from clan_cli.errors import ClanError from clan_cli.git import commit_files +from . import sops from .folders import ( list_objects, sops_groups_folder, @@ -42,13 +45,13 @@ def update_secrets( changed_files.extend( update_keys( secret_path, - sorted(collect_keys_for_path(secret_path)), + sorted_keys(collect_keys_for_path(secret_path)), ) ) return changed_files -def collect_keys_for_type(folder: Path) -> set[str]: +def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: if not folder.exists(): return set() keys = set() @@ -68,7 +71,7 @@ def collect_keys_for_type(folder: Path) -> set[str]: return keys -def collect_keys_for_path(path: Path) -> set[str]: +def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: keys = set() keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "users")) @@ -132,8 +135,8 @@ def encrypt_secret( recipient_keys = collect_keys_for_path(secret_path) - if key.pubkey not in recipient_keys: - recipient_keys.add(key.pubkey) + if (key.pubkey, key.key_type) not in recipient_keys: + recipient_keys.add((key.pubkey, key.key_type)) files_to_commit.extend( allow_member( users_folder(secret_path), @@ -144,7 +147,7 @@ def encrypt_secret( ) secret_path = secret_path / "secret" - encrypt_file(secret_path, value, sorted(recipient_keys)) + encrypt_file(secret_path, value, sorted_keys(recipient_keys)) files_to_commit.append(secret_path) if git_commit: commit_files( @@ -228,7 +231,7 @@ def allow_member( changed.extend( update_keys( group_folder.parent, - sorted(collect_keys_for_path(group_folder.parent)), + sorted_keys(collect_keys_for_path(group_folder.parent)), ) ) return changed @@ -255,10 +258,13 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]: group_folder.parent.rmdir() return update_keys( - target.parent.parent, sorted(collect_keys_for_path(group_folder.parent)) + target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent)) ) +sorted_keys = functools.partial(sorted, key=operator.itemgetter(0)) + + def has_secret(secret_path: Path) -> bool: return (secret_path / "secret").exists() diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 99bde4481..b3a90483c 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,3 +1,4 @@ +import enum import io import json import os @@ -19,13 +20,32 @@ from clan_cli.nix import nix_shell from .folders import sops_machines_folder, sops_users_folder -@dataclass +class KeyType(enum.Enum): + AGE = enum.auto() + PGP = enum.auto() + + @classmethod + def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102 + if value: + return cls.__members__.get(value.upper()) + return None + + +@dataclass(frozen=True, eq=False) class SopsKey: pubkey: str username: str + key_type: KeyType + + def as_dict(self) -> dict[str, str]: + return { + "publickey": self.pubkey, + "username": self.username, + "type": self.key_type.name.lower(), + } -def get_public_key(privkey: str) -> str: +def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) try: res = subprocess.run( @@ -78,8 +98,7 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: - key = SopsKey(pub_key, username="") +def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -87,20 +106,20 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: for user in folder.iterdir(): if not (user / "key.json").exists(): continue - if read_key(user) == pub_key: - key.username = user.name - return key + this_pub_key, this_key_type = read_key(user) + if key.pubkey == this_pub_key and key.key_type == this_key_type: + return SopsKey(key.pubkey, user.name, key.key_type) return None @API.register -def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey: - key = maybe_get_user_or_machine(flake_dir, pub_key) - if not key: - msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" - raise ClanError(msg) - return key +def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: + maybe_key = maybe_get_user_or_machine(flake_dir, key) + if maybe_key: + return maybe_key + msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" + raise ClanError(msg) def default_admin_key_path() -> Path: @@ -111,43 +130,59 @@ def default_admin_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> str | None: - key = os.environ.get("SOPS_AGE_KEY") - if key: - return get_public_key(key) +def maybe_get_admin_public_key() -> None | SopsKey: + age_key = os.environ.get("SOPS_AGE_KEY") + pgp_key = os.environ.get("SOPS_PGP_FP") + if age_key and pgp_key: + msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." + raise ClanError(msg) + if age_key: + return SopsKey( + pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" + ) + if pgp_key: + return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") + path = default_admin_key_path() if path.exists(): - return get_public_key(path.read_text()) + return SopsKey( + pubkey=get_public_age_key(path.read_text()), + key_type=KeyType.AGE, + username="", + ) return None def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - pub_key = maybe_get_admin_public_key() - if pub_key: - return maybe_get_user_or_machine(flake_dir, pub_key) - return None + key = maybe_get_admin_public_key() + if not key: + return None + return maybe_get_user_or_machine(flake_dir, key) def ensure_admin_key(flake_dir: Path) -> SopsKey: - pub_key = maybe_get_admin_public_key() - if not pub_key: - msg = "No sops key found. Please generate one with 'clan secrets key generate'." - raise ClanError(msg) - return ensure_user_or_machine(flake_dir, pub_key) + key = maybe_get_admin_public_key() + if key: + return ensure_user_or_machine(flake_dir, key) + msg = "No sops key found. Please generate one with 'clan secrets key generate'." + raise ClanError(msg) @contextmanager -def sops_manifest(keys: list[str]) -> Iterator[Path]: +def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: + all_keys: dict[str, list[str]] = { + key_type.lower(): [] for key_type in KeyType.__members__ + } + for key, key_type in keys: + all_keys[key_type.name.lower()].append(key) with NamedTemporaryFile(delete=False, mode="w") as manifest: - json.dump( - {"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2 - ) + json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) manifest.flush() yield Path(manifest.name) -def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: +def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]: with sops_manifest(keys) as manifest: secret_path = secret_path / "secret" time_before = secret_path.stat().st_mtime @@ -171,7 +206,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: def encrypt_file( secret_path: Path, content: IO[str] | str | bytes | None, - pubkeys: list[str], + pubkeys: list[tuple[str, KeyType]], ) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) @@ -237,7 +272,7 @@ def get_meta(secret_path: Path) -> dict: return json.load(f) -def write_key(path: Path, publickey: str, overwrite: bool) -> None: +def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None: path.mkdir(parents=True, exist_ok=True) try: flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC @@ -248,21 +283,23 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None: msg = f"{path.name} already exists in {path}. Use --force to overwrite." raise ClanError(msg) from e with os.fdopen(fd, "w") as f: - json.dump({"publickey": publickey, "type": "age"}, f, indent=2) + contents = {"publickey": publickey, "type": key_type.name.lower()} + json.dump(contents, f, indent=2) -def read_key(path: Path) -> str: +def read_key(path: Path) -> tuple[str, KeyType]: with Path(path / "key.json").open() as f: try: key = json.load(f) except json.JSONDecodeError as e: msg = f"Failed to decode {path.name}: {e}" raise ClanError(msg) from e - if key["type"] != "age": - msg = f"{path.name} is not an age key but {key['type']}. This is not supported" + key_type = KeyType.validate(key.get("type")) + if key_type is None: + msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})." raise ClanError(msg) publickey = key.get("publickey") if not publickey: msg = f"{path.name} does not contain a public key" raise ClanError(msg) - return publickey + return publickey, key_type diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py index 405e39418..0a2121cab 100644 --- a/pkgs/clan-cli/clan_cli/secrets/types.py +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -5,7 +5,7 @@ from pathlib import Path from clan_cli.errors import ClanError -from .sops import get_public_key +from .sops import get_public_age_key VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$") VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$") @@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str: if arg_value.startswith("age1"): return arg_value.strip() if arg_value.startswith("AGE-SECRET-KEY-"): - return get_public_key(arg_value) + return get_public_age_key(arg_value) if not arg_value.startswith("age1"): msg = f"Please provide an age key starting with age1, got: '{arg_value}'" raise ClanError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 46f4834c3..5348d1405 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -1,11 +1,13 @@ import argparse +import json +import sys from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users from clan_cli.errors import ClanError from clan_cli.git import commit_files -from . import secrets +from . import secrets, sops from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder from .secrets import update_secrets from .sops import read_key, write_key @@ -17,13 +19,19 @@ from .types import ( ) -def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None: +def add_user( + flake_dir: Path, + name: str, + key: str, + key_type: sops.KeyType, + force: bool, +) -> None: path = sops_users_folder(flake_dir) / name def filter_user_secrets(secret: Path) -> bool: return secret.joinpath("users", name).exists() - write_key(path, key, force) + write_key(path, key, key_type, overwrite=force) paths = [path] paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets)) commit_files( @@ -42,8 +50,9 @@ def remove_user(flake_dir: Path, name: str) -> None: ) -def get_user(flake_dir: Path, name: str) -> str: - return read_key(sops_users_folder(flake_dir) / name) +def get_user(flake_dir: Path, name: str) -> sops.SopsKey: + key, key_type = read_key(sops_users_folder(flake_dir) / name) + return sops.SopsKey(key, name, key_type) def list_users(flake_dir: Path) -> list[str]: @@ -95,14 +104,24 @@ def add_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - add_user(args.flake.path, args.user, args.key, args.force) + if args.age_key or args.agekey: + key_type = sops.KeyType.AGE + elif args.pgp_key: + key_type = sops.KeyType.PGP + else: + msg = "BUG!: key type not set" + raise ValueError(msg) + key = args.agekey or args.age_key or args.pgp_key + assert key is not None, "key is None" + add_user(args.flake.path, args.user, key, key_type, args.force) def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - print(get_user(args.flake.path, args.user)) + key = get_user(args.flake.path, args.user) + json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) def remove_command(args: argparse.Namespace) -> None: @@ -141,12 +160,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: "-f", "--force", help="overwrite existing user", action="store_true" ) add_parser.add_argument("user", help="the name of the user", type=user_name_type) - add_parser.add_argument( - "key", - help="public key or private key of the user." - "Execute 'clan secrets key --help' on how to retrieve a key." + key_type = add_parser.add_mutually_exclusive_group(required=True) + key_type.add_argument( + "agekey", + help="public or private age key of the user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", type=public_or_private_age_key_type, + nargs="?", + ) + key_type.add_argument( + "--age-key", + help="public or private age key of the user. " + "Execute 'clan secrets key --help' on how to retrieve a key. " + "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", + type=public_or_private_age_key_type, + ) + key_type.add_argument( + "--pgp-key", + help=( + "public PGP encryption key of the user. " + # Use --fingerprint --fingerprint to get fingerprints for subkeys: + "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." + ), ) add_parser.set_defaults(func=add_command) diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index fe1a888ae..4e7b90602 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -60,7 +60,7 @@ class SecretStore(SecretStoreBase): ) -> bool: secret_path = self.secret_path(generator_name, secret_name, shared) secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in secret["sops"]["age"]] + recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] machines_folder_path = sops_machines_folder(self.machine.flake_dir) machine_pubkey = json.loads( (machines_folder_path / self.machine.name / "key.json").read_text() diff --git a/pkgs/clan-cli/tests/test_api_dataclass_compat.py b/pkgs/clan-cli/tests/test_api_dataclass_compat.py index 82072d4fb..24071c1c8 100644 --- a/pkgs/clan-cli/tests/test_api_dataclass_compat.py +++ b/pkgs/clan-cli/tests/test_api_dataclass_compat.py @@ -129,6 +129,9 @@ def test_all_dataclasses() -> None: try: API.reset() dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent)) + if dclass is None: + msg = f"Could not load dataclass {dataclass} from {file}" + raise ClanError(msg) type_to_dict(dclass) except JSchemaTypeError as e: print(f"Error loading dataclass {dataclass} from {file}: {e}") diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index bb36af2f9..85003a541 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -1,7 +1,12 @@ +import functools +import json import logging import os +import re +import subprocess from collections.abc import Iterator from contextlib import contextmanager +from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -227,7 +232,7 @@ def test_groups( @contextmanager -def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: +def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: old_key = os.environ["SOPS_AGE_KEY_FILE"] monkeypatch.delenv("SOPS_AGE_KEY_FILE") monkeypatch.setenv("SOPS_AGE_KEY", key) @@ -238,29 +243,95 @@ def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key) +@contextmanager +def use_gpg_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: + old_key_file = os.environ.get("SOPS_AGE_KEY_FILE") + old_key = os.environ.get("SOPS_AGE_KEY") + monkeypatch.delenv("SOPS_AGE_KEY_FILE", raising=False) + monkeypatch.delenv("SOPS_AGE_KEY", raising=False) + monkeypatch.setenv("SOPS_PGP_FP", key) + try: + yield + finally: + monkeypatch.delenv("SOPS_PGP_FP") + if old_key_file is not None: + monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key_file) + if old_key is not None: + monkeypatch.setenv("SOPS_AGE_KEY", old_key) + + +@pytest.fixture +def gpg_key( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +) -> str: + gpg_home = tmp_path / "gnupghome" + gpg_home.mkdir(mode=0o700) + + gpg_environ = os.environ.copy() + gpg_environ["GNUPGHOME"] = str(gpg_home) + run = functools.partial( + subprocess.run, + encoding="utf-8", + check=True, + env=gpg_environ, + ) + key_parameters = "\n".join( + ( + "%no-protection", + "%transient-key", + "Key-Type: rsa", + "Key-Usage: cert encrypt", + "Name-Real: Foo Bar", + "Name-Comment: Test user", + "Name-Email: test@clan.lol", + "%commit", + ) + ) + run(["gpg", "--batch", "--quiet", "--generate-key"], input=key_parameters) + details = run(["gpg", "--list-keys", "--with-colons"], capture_output=True) + fingerprint = None + for line in details.stdout.strip().split(os.linesep): + if not line.startswith("fpr"): + continue + fingerprint = line.split(":")[9] + break + assert fingerprint is not None, "Could not generate test GPG key" + log.info(f"Created GPG key under {gpg_home}") + + monkeypatch.setenv("GNUPGHOME", str(gpg_home)) + return fingerprint + + def test_secrets( test_flake: FlakeForTest, capture_output: CaptureOutput, monkeypatch: pytest.MonkeyPatch, + gpg_key: str, age_keys: list["KeyPair"], ) -> None: with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" - monkeypatch.setenv("SOPS_NIX_SECRET", "foo") + # Generate a new key for the clan monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key")) - cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) + with capture_output as output: + cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) + assert "age private key" in output.out + # Read the key that was generated with capture_output as output: cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) - key = output.out + key = json.loads(output.out)["publickey"] assert key.startswith("age1") + # Add testuser with the key that was generated for the clan cli.run( ["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key] ) with pytest.raises(ClanError): # does not exist yet cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"]) + monkeypatch.setenv("SOPS_NIX_SECRET", "foo") cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"]) with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"]) @@ -289,6 +360,8 @@ def test_secrets( cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"]) assert output.out == "key\n" + # using the `age_keys` KeyPair, add a machine and rotate its key + cli.run( [ "secrets", @@ -315,7 +388,7 @@ def test_secrets( cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)]) assert output.out == "machine1\n" - with use_key(age_keys[1].privkey, monkeypatch): + with use_age_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -335,7 +408,7 @@ def test_secrets( ) # should also rotate the encrypted secret - with use_key(age_keys[0].privkey, monkeypatch): + with use_age_key(age_keys[0].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -374,7 +447,7 @@ def test_secrets( "key", ] ) - with capture_output as output, use_key(age_keys[1].privkey, monkeypatch): + with capture_output as output, use_age_key(age_keys[1].privkey, monkeypatch): cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" cli.run( @@ -447,12 +520,12 @@ def test_secrets( ] ) - with use_key(age_keys[1].privkey, monkeypatch): + with use_age_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" - # extend group will update secrets + # Add an user with a GPG key cli.run( [ "secrets", @@ -460,10 +533,13 @@ def test_secrets( "add", "--flake", str(test_flake.path), + "--pgp-key", + gpg_key, "user2", - age_keys[2].pubkey, ] ) + + # Extend group will update secrets cli.run( [ "secrets", @@ -476,7 +552,7 @@ def test_secrets( ] ) - with use_key(age_keys[2].privkey, monkeypatch): # user2 + with use_gpg_key(gpg_key, monkeypatch): # user2 with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -494,7 +570,7 @@ def test_secrets( ) with ( pytest.raises(ClanError), - use_key(age_keys[2].privkey, monkeypatch), + use_gpg_key(gpg_key, monkeypatch), capture_output as output, ): # user2 is not in the group anymore @@ -519,3 +595,66 @@ def test_secrets( with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" + + +def test_secrets_key_generate_gpg( + test_flake: FlakeForTest, + capture_output: CaptureOutput, + monkeypatch: pytest.MonkeyPatch, + gpg_key: str, +) -> None: + with use_gpg_key(gpg_key, monkeypatch): + # Make sure clan secrets key generate recognizes + # the PGP key and does nothing: + with capture_output as output: + cli.run( + [ + "secrets", + "key", + "generate", + "--flake", + str(test_flake.path), + ] + ) + assert "age private key" not in output.out + assert re.match(r"PGP key.+is already set", output.out) is not None + + with capture_output as output: + cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) + key = json.loads(output.out) + assert key["type"] == "pgp" + assert key["publickey"] == gpg_key + + # Add testuser with the key that was (not) generated for the clan: + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "--pgp-key", + gpg_key, + "testuser", + ] + ) + with capture_output as output: + cli.run( + [ + "secrets", + "users", + "get", + "--flake", + str(test_flake.path), + "testuser", + ] + ) + key = json.loads(output.out) + assert key["type"] == "pgp" + assert key["publickey"] == gpg_key + + monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value") + cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"]) + with capture_output as output: + cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"]) + assert output.out == "secret-value"