diff --git a/pkgs/clan-cli/clan_cli/api/serde.py b/pkgs/clan-cli/clan_cli/api/serde.py index 61e4f1196..b69f57d3f 100644 --- a/pkgs/clan-cli/clan_cli/api/serde.py +++ b/pkgs/clan-cli/clan_cli/api/serde.py @@ -32,7 +32,6 @@ Note: This module assumes the presence of other modules and classes such as `Cla import dataclasses import json from dataclasses import dataclass, fields, is_dataclass -from enum import Enum from pathlib import Path from types import UnionType from typing import ( @@ -180,31 +179,25 @@ def construct_value( # Nested types # list # dict - origin = get_origin(t) - if origin is list: + if get_origin(t) is list: if not isinstance(field_value, list): msg = f"Expected list, got {field_value}" raise ClanError(msg, location=f"{loc}") return [construct_value(get_args(t)[0], item) for item in field_value] - if origin is dict and isinstance(field_value, dict): + if get_origin(t) is dict and isinstance(field_value, dict): return { key: construct_value(get_args(t)[1], value) for key, value in field_value.items() } - if origin is Literal: + if get_origin(t) is Literal: valid_values = get_args(t) if field_value not in valid_values: - msg = f"Expected one of {', '.join(valid_values)}, got {field_value}" + msg = f"Expected one of {valid_values}, got {field_value}" raise ClanError(msg, location=f"{loc}") return field_value - if origin is Enum: - if field_value not in origin.__members__: - msg = f"Expected one of {', '.join(origin.__members__)}, got {field_value}" - raise ClanError(msg, location=f"{loc}") - - if origin is Annotated: + if get_origin(t) is Annotated: (base_type,) = get_args(t) return construct_value(base_type, field_value) diff --git a/pkgs/clan-cli/clan_cli/api/util.py b/pkgs/clan-cli/clan_cli/api/util.py index 3ff0ffc8c..45c762a22 100644 --- a/pkgs/clan-cli/clan_cli/api/util.py +++ b/pkgs/clan-cli/clan_cli/api/util.py @@ -2,7 +2,6 @@ import copy import dataclasses import pathlib from dataclasses import MISSING -from enum import EnumType from types import NoneType, UnionType from typing import ( Annotated, @@ -78,16 +77,13 @@ def type_to_dict( if dataclasses.is_dataclass(t): fields = dataclasses.fields(t) - properties = {} - for f in fields: - if f.name.startswith("_"): - continue - assert not isinstance( - f.type, str - ), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" - properties[f.metadata.get("alias", f.name)] = type_to_dict( + properties = { + f.metadata.get("alias", f.name): type_to_dict( f.type, f"{scope} {t.__name__}.{f.name}", type_map ) + for f in fields + if not f.name.startswith("_") + } required = set() for pn, pv in properties.items(): @@ -196,11 +192,6 @@ def type_to_dict( return {"type": "boolean"} if t is object: return {"type": "object"} - if type(t) is EnumType: - return { - "type": "string", - "enum": list(t.__members__), - } if t is Any: msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}" raise JSchemaTypeError(msg) @@ -217,7 +208,7 @@ def type_to_dict( if t is NoneType: return {"type": "null"} - msg = f"{scope} - Basic type '{t!s}' is not supported" + msg = f"{scope} - Error primitive type not supported {t!s}" raise JSchemaTypeError(msg) - msg = f"{scope} - Type '{t!s}' is not supported" + msg = f"{scope} - Error type not supported {t!s}" raise JSchemaTypeError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index ea5cbb3e8..4fa9cd378 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -1,49 +1,63 @@ import argparse -import json import logging -import sys +from pathlib import Path from clan_cli.errors import ClanError from clan_cli.git import commit_files -from . import sops from .secrets import update_secrets -from .sops import ( - default_admin_key_path, - generate_private_key, - maybe_get_admin_public_key, -) +from .sops import default_admin_key_path, generate_private_key, get_public_key log = logging.getLogger(__name__) -def generate_key() -> sops.SopsKey: - key = maybe_get_admin_public_key() - if key is not None: - print(f"{key.key_type.name} key {key.pubkey} is already set") - return key +def extract_public_key(filepath: Path) -> str: + """ + Extracts the public key from a given text file. + """ + try: + with filepath.open() as file: + for line in file: + # Check if the line contains the public key + if line.startswith("# public key:"): + # Extract and return the public key part after the prefix + return line.strip().split(": ")[1] + except FileNotFoundError as e: + msg = f"The file at {filepath} was not found." + raise ClanError(msg) from e + except OSError as e: + msg = f"An error occurred while extracting the public key: {e}" + raise ClanError(msg) from e + msg = f"Could not find the public key in the file at {filepath}." + raise ClanError(msg) + + +def generate_key() -> str: path = default_admin_key_path() - _, pub_key = generate_private_key(out_file=path) - print( - f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets." + if path.exists(): + log.info(f"Key already exists at {path}") + return extract_public_key(path) + priv_key, pub_key = generate_private_key(out_file=path) + log.info( + f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets." ) - return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE) + return pub_key + + +def show_key() -> str: + return get_public_key(default_admin_key_path().read_text()) def generate_command(args: argparse.Namespace) -> None: - key = generate_key() - print("Also add your age public key to the repository with:") - key_type = key.key_type.name.lower() - print(f"clan secrets users add --{key_type}-key ") + pub_key = generate_key() + log.info( + f"Also add your age public key to the repository with: \nclan secrets users add {pub_key}" + ) def show_command(args: argparse.Namespace) -> None: - key = sops.maybe_get_admin_public_key() - if not key: - msg = "No public key found" - raise ClanError(msg) - json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) + print(show_key()) def update_command(args: argparse.Namespace) -> None: @@ -59,16 +73,10 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: required=True, ) - parser_generate = subparser.add_parser( - "generate", - description=( - "Generate an age key for the Clan, " - "to use PGP set `SOPS_PGP_FP` in your environment." - ), - ) + parser_generate = subparser.add_parser("generate", help="generate age key") parser_generate.set_defaults(func=generate_command) - parser_show = subparser.add_parser("show", help="show public key") + parser_show = subparser.add_parser("show", help="show age public key") parser_show.set_defaults(func=show_command) parser_update = subparser.add_parser( diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 68257fe8f..5be164751 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -10,7 +10,7 @@ from clan_cli.errors import ClanError from clan_cli.git import commit_files from clan_cli.machines.types import machine_name_type, validate_hostname -from . import secrets, sops +from . import secrets from .folders import ( list_objects, remove_object, @@ -24,7 +24,7 @@ from .types import public_or_private_age_key_type, secret_name_type def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None: machine_path = sops_machines_folder(flake_dir) / machine - write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force) + write_key(machine_path, pubkey, force) paths = [machine_path] def filter_machine_secrets(secret: Path) -> bool: @@ -48,8 +48,7 @@ def remove_machine(flake_dir: Path, name: str) -> None: def get_machine(flake_dir: Path, name: str) -> str: - key, _ = read_key(sops_machines_folder(flake_dir) / name) - return key + return read_key(sops_machines_folder(flake_dir) / name) def has_machine(flake_dir: Path, name: str) -> bool: @@ -169,7 +168,7 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None: add_dynamic_completer(add_machine_action, complete_machines) add_parser.add_argument( "key", - help="public or private age key of the machine", + help="public key or private key of the user", type=public_or_private_age_key_type, ) add_parser.set_defaults(func=add_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index 3374c7668..cf262d9d5 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -1,7 +1,5 @@ import argparse -import functools import getpass -import operator import os import shutil import sys @@ -22,7 +20,6 @@ from clan_cli.completions import ( from clan_cli.errors import ClanError from clan_cli.git import commit_files -from . import sops from .folders import ( list_objects, sops_groups_folder, @@ -45,13 +42,13 @@ def update_secrets( changed_files.extend( update_keys( secret_path, - sorted_keys(collect_keys_for_path(secret_path)), + sorted(collect_keys_for_path(secret_path)), ) ) return changed_files -def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: +def collect_keys_for_type(folder: Path) -> set[str]: if not folder.exists(): return set() keys = set() @@ -71,7 +68,7 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]: return keys -def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: +def collect_keys_for_path(path: Path) -> set[str]: keys = set() keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "users")) @@ -135,8 +132,8 @@ def encrypt_secret( recipient_keys = collect_keys_for_path(secret_path) - if (key.pubkey, key.key_type) not in recipient_keys: - recipient_keys.add((key.pubkey, key.key_type)) + if key.pubkey not in recipient_keys: + recipient_keys.add(key.pubkey) files_to_commit.extend( allow_member( users_folder(secret_path), @@ -147,7 +144,7 @@ def encrypt_secret( ) secret_path = secret_path / "secret" - encrypt_file(secret_path, value, sorted_keys(recipient_keys)) + encrypt_file(secret_path, value, sorted(recipient_keys)) files_to_commit.append(secret_path) if git_commit: commit_files( @@ -231,7 +228,7 @@ def allow_member( changed.extend( update_keys( group_folder.parent, - sorted_keys(collect_keys_for_path(group_folder.parent)), + sorted(collect_keys_for_path(group_folder.parent)), ) ) return changed @@ -258,13 +255,10 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]: group_folder.parent.rmdir() return update_keys( - target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent)) + target.parent.parent, sorted(collect_keys_for_path(group_folder.parent)) ) -sorted_keys = functools.partial(sorted, key=operator.itemgetter(0)) - - def has_secret(secret_path: Path) -> bool: return (secret_path / "secret").exists() diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index b3a90483c..99bde4481 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,4 +1,3 @@ -import enum import io import json import os @@ -20,32 +19,13 @@ from clan_cli.nix import nix_shell from .folders import sops_machines_folder, sops_users_folder -class KeyType(enum.Enum): - AGE = enum.auto() - PGP = enum.auto() - - @classmethod - def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102 - if value: - return cls.__members__.get(value.upper()) - return None - - -@dataclass(frozen=True, eq=False) +@dataclass class SopsKey: pubkey: str username: str - key_type: KeyType - - def as_dict(self) -> dict[str, str]: - return { - "publickey": self.pubkey, - "username": self.username, - "type": self.key_type.name.lower(), - } -def get_public_age_key(privkey: str) -> str: +def get_public_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) try: res = subprocess.run( @@ -98,7 +78,8 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: +def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: + key = SopsKey(pub_key, username="") folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -106,20 +87,20 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: for user in folder.iterdir(): if not (user / "key.json").exists(): continue - this_pub_key, this_key_type = read_key(user) - if key.pubkey == this_pub_key and key.key_type == this_key_type: - return SopsKey(key.pubkey, user.name, key.key_type) + if read_key(user) == pub_key: + key.username = user.name + return key return None @API.register -def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: - maybe_key = maybe_get_user_or_machine(flake_dir, key) - if maybe_key: - return maybe_key - msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" - raise ClanError(msg) +def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey: + key = maybe_get_user_or_machine(flake_dir, pub_key) + if not key: + msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" + raise ClanError(msg) + return key def default_admin_key_path() -> Path: @@ -130,59 +111,43 @@ def default_admin_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> None | SopsKey: - age_key = os.environ.get("SOPS_AGE_KEY") - pgp_key = os.environ.get("SOPS_PGP_FP") - if age_key and pgp_key: - msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." - raise ClanError(msg) - if age_key: - return SopsKey( - pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" - ) - if pgp_key: - return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") - +def maybe_get_admin_public_key() -> str | None: + key = os.environ.get("SOPS_AGE_KEY") + if key: + return get_public_key(key) path = default_admin_key_path() if path.exists(): - return SopsKey( - pubkey=get_public_age_key(path.read_text()), - key_type=KeyType.AGE, - username="", - ) + return get_public_key(path.read_text()) return None def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - key = maybe_get_admin_public_key() - if not key: - return None - return maybe_get_user_or_machine(flake_dir, key) + pub_key = maybe_get_admin_public_key() + if pub_key: + return maybe_get_user_or_machine(flake_dir, pub_key) + return None def ensure_admin_key(flake_dir: Path) -> SopsKey: - key = maybe_get_admin_public_key() - if key: - return ensure_user_or_machine(flake_dir, key) - msg = "No sops key found. Please generate one with 'clan secrets key generate'." - raise ClanError(msg) + pub_key = maybe_get_admin_public_key() + if not pub_key: + msg = "No sops key found. Please generate one with 'clan secrets key generate'." + raise ClanError(msg) + return ensure_user_or_machine(flake_dir, pub_key) @contextmanager -def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: - all_keys: dict[str, list[str]] = { - key_type.lower(): [] for key_type in KeyType.__members__ - } - for key, key_type in keys: - all_keys[key_type.name.lower()].append(key) +def sops_manifest(keys: list[str]) -> Iterator[Path]: with NamedTemporaryFile(delete=False, mode="w") as manifest: - json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) + json.dump( + {"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2 + ) manifest.flush() yield Path(manifest.name) -def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]: +def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: with sops_manifest(keys) as manifest: secret_path = secret_path / "secret" time_before = secret_path.stat().st_mtime @@ -206,7 +171,7 @@ def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path def encrypt_file( secret_path: Path, content: IO[str] | str | bytes | None, - pubkeys: list[tuple[str, KeyType]], + pubkeys: list[str], ) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) @@ -272,7 +237,7 @@ def get_meta(secret_path: Path) -> dict: return json.load(f) -def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None: +def write_key(path: Path, publickey: str, overwrite: bool) -> None: path.mkdir(parents=True, exist_ok=True) try: flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC @@ -283,23 +248,21 @@ def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> msg = f"{path.name} already exists in {path}. Use --force to overwrite." raise ClanError(msg) from e with os.fdopen(fd, "w") as f: - contents = {"publickey": publickey, "type": key_type.name.lower()} - json.dump(contents, f, indent=2) + json.dump({"publickey": publickey, "type": "age"}, f, indent=2) -def read_key(path: Path) -> tuple[str, KeyType]: +def read_key(path: Path) -> str: with Path(path / "key.json").open() as f: try: key = json.load(f) except json.JSONDecodeError as e: msg = f"Failed to decode {path.name}: {e}" raise ClanError(msg) from e - key_type = KeyType.validate(key.get("type")) - if key_type is None: - msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})." + if key["type"] != "age": + msg = f"{path.name} is not an age key but {key['type']}. This is not supported" raise ClanError(msg) publickey = key.get("publickey") if not publickey: msg = f"{path.name} does not contain a public key" raise ClanError(msg) - return publickey, key_type + return publickey diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py index 0a2121cab..405e39418 100644 --- a/pkgs/clan-cli/clan_cli/secrets/types.py +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -5,7 +5,7 @@ from pathlib import Path from clan_cli.errors import ClanError -from .sops import get_public_age_key +from .sops import get_public_key VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$") VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$") @@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str: if arg_value.startswith("age1"): return arg_value.strip() if arg_value.startswith("AGE-SECRET-KEY-"): - return get_public_age_key(arg_value) + return get_public_key(arg_value) if not arg_value.startswith("age1"): msg = f"Please provide an age key starting with age1, got: '{arg_value}'" raise ClanError(msg) diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index 5348d1405..46f4834c3 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -1,13 +1,11 @@ import argparse -import json -import sys from pathlib import Path from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users from clan_cli.errors import ClanError from clan_cli.git import commit_files -from . import secrets, sops +from . import secrets from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder from .secrets import update_secrets from .sops import read_key, write_key @@ -19,19 +17,13 @@ from .types import ( ) -def add_user( - flake_dir: Path, - name: str, - key: str, - key_type: sops.KeyType, - force: bool, -) -> None: +def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None: path = sops_users_folder(flake_dir) / name def filter_user_secrets(secret: Path) -> bool: return secret.joinpath("users", name).exists() - write_key(path, key, key_type, overwrite=force) + write_key(path, key, force) paths = [path] paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets)) commit_files( @@ -50,9 +42,8 @@ def remove_user(flake_dir: Path, name: str) -> None: ) -def get_user(flake_dir: Path, name: str) -> sops.SopsKey: - key, key_type = read_key(sops_users_folder(flake_dir) / name) - return sops.SopsKey(key, name, key_type) +def get_user(flake_dir: Path, name: str) -> str: + return read_key(sops_users_folder(flake_dir) / name) def list_users(flake_dir: Path) -> list[str]: @@ -104,24 +95,14 @@ def add_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - if args.age_key or args.agekey: - key_type = sops.KeyType.AGE - elif args.pgp_key: - key_type = sops.KeyType.PGP - else: - msg = "BUG!: key type not set" - raise ValueError(msg) - key = args.agekey or args.age_key or args.pgp_key - assert key is not None, "key is None" - add_user(args.flake.path, args.user, key, key_type, args.force) + add_user(args.flake.path, args.user, args.key, args.force) def get_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - key = get_user(args.flake.path, args.user) - json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) + print(get_user(args.flake.path, args.user)) def remove_command(args: argparse.Namespace) -> None: @@ -160,29 +141,12 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None: "-f", "--force", help="overwrite existing user", action="store_true" ) add_parser.add_argument("user", help="the name of the user", type=user_name_type) - key_type = add_parser.add_mutually_exclusive_group(required=True) - key_type.add_argument( - "agekey", - help="public or private age key of the user. " - "Execute 'clan secrets key --help' on how to retrieve a key. " + add_parser.add_argument( + "key", + help="public key or private key of the user." + "Execute 'clan secrets key --help' on how to retrieve a key." "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", type=public_or_private_age_key_type, - nargs="?", - ) - key_type.add_argument( - "--age-key", - help="public or private age key of the user. " - "Execute 'clan secrets key --help' on how to retrieve a key. " - "To fetch an age key from an SSH host key: ssh-keyscan | nix shell nixpkgs#ssh-to-age -c ssh-to-age", - type=public_or_private_age_key_type, - ) - key_type.add_argument( - "--pgp-key", - help=( - "public PGP encryption key of the user. " - # Use --fingerprint --fingerprint to get fingerprints for subkeys: - "Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it." - ), ) add_parser.set_defaults(func=add_command) diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index 42d0f7bbb..761621674 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -229,7 +229,7 @@ class SecretStore(SecretStoreBase): ) -> bool: secret_path = self.secret_path(generator_name, secret_name, shared) secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] + recipients = [r["recipient"] for r in secret["sops"]["age"]] machines_folder_path = sops_machines_folder(self.machine.flake_dir) machine_pubkey = json.loads( (machines_folder_path / self.machine.name / "key.json").read_text() diff --git a/pkgs/clan-cli/tests/test_api_dataclass_compat.py b/pkgs/clan-cli/tests/test_api_dataclass_compat.py index 24071c1c8..82072d4fb 100644 --- a/pkgs/clan-cli/tests/test_api_dataclass_compat.py +++ b/pkgs/clan-cli/tests/test_api_dataclass_compat.py @@ -129,9 +129,6 @@ def test_all_dataclasses() -> None: try: API.reset() dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent)) - if dclass is None: - msg = f"Could not load dataclass {dataclass} from {file}" - raise ClanError(msg) type_to_dict(dclass) except JSchemaTypeError as e: print(f"Error loading dataclass {dataclass} from {file}: {e}") diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index 85003a541..bb36af2f9 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -1,12 +1,7 @@ -import functools -import json import logging import os -import re -import subprocess from collections.abc import Iterator from contextlib import contextmanager -from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -232,7 +227,7 @@ def test_groups( @contextmanager -def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: +def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: old_key = os.environ["SOPS_AGE_KEY_FILE"] monkeypatch.delenv("SOPS_AGE_KEY_FILE") monkeypatch.setenv("SOPS_AGE_KEY", key) @@ -243,95 +238,29 @@ def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key) -@contextmanager -def use_gpg_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: - old_key_file = os.environ.get("SOPS_AGE_KEY_FILE") - old_key = os.environ.get("SOPS_AGE_KEY") - monkeypatch.delenv("SOPS_AGE_KEY_FILE", raising=False) - monkeypatch.delenv("SOPS_AGE_KEY", raising=False) - monkeypatch.setenv("SOPS_PGP_FP", key) - try: - yield - finally: - monkeypatch.delenv("SOPS_PGP_FP") - if old_key_file is not None: - monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key_file) - if old_key is not None: - monkeypatch.setenv("SOPS_AGE_KEY", old_key) - - -@pytest.fixture -def gpg_key( - tmp_path: Path, - monkeypatch: pytest.MonkeyPatch, -) -> str: - gpg_home = tmp_path / "gnupghome" - gpg_home.mkdir(mode=0o700) - - gpg_environ = os.environ.copy() - gpg_environ["GNUPGHOME"] = str(gpg_home) - run = functools.partial( - subprocess.run, - encoding="utf-8", - check=True, - env=gpg_environ, - ) - key_parameters = "\n".join( - ( - "%no-protection", - "%transient-key", - "Key-Type: rsa", - "Key-Usage: cert encrypt", - "Name-Real: Foo Bar", - "Name-Comment: Test user", - "Name-Email: test@clan.lol", - "%commit", - ) - ) - run(["gpg", "--batch", "--quiet", "--generate-key"], input=key_parameters) - details = run(["gpg", "--list-keys", "--with-colons"], capture_output=True) - fingerprint = None - for line in details.stdout.strip().split(os.linesep): - if not line.startswith("fpr"): - continue - fingerprint = line.split(":")[9] - break - assert fingerprint is not None, "Could not generate test GPG key" - log.info(f"Created GPG key under {gpg_home}") - - monkeypatch.setenv("GNUPGHOME", str(gpg_home)) - return fingerprint - - def test_secrets( test_flake: FlakeForTest, capture_output: CaptureOutput, monkeypatch: pytest.MonkeyPatch, - gpg_key: str, age_keys: list["KeyPair"], ) -> None: with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" - # Generate a new key for the clan + monkeypatch.setenv("SOPS_NIX_SECRET", "foo") monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key")) - with capture_output as output: - cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) - assert "age private key" in output.out - # Read the key that was generated + cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) with capture_output as output: cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) - key = json.loads(output.out)["publickey"] + key = output.out assert key.startswith("age1") - # Add testuser with the key that was generated for the clan cli.run( ["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key] ) with pytest.raises(ClanError): # does not exist yet cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"]) - monkeypatch.setenv("SOPS_NIX_SECRET", "foo") cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"]) with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"]) @@ -360,8 +289,6 @@ def test_secrets( cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"]) assert output.out == "key\n" - # using the `age_keys` KeyPair, add a machine and rotate its key - cli.run( [ "secrets", @@ -388,7 +315,7 @@ def test_secrets( cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)]) assert output.out == "machine1\n" - with use_age_key(age_keys[1].privkey, monkeypatch): + with use_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -408,7 +335,7 @@ def test_secrets( ) # should also rotate the encrypted secret - with use_age_key(age_keys[0].privkey, monkeypatch): + with use_key(age_keys[0].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -447,7 +374,7 @@ def test_secrets( "key", ] ) - with capture_output as output, use_age_key(age_keys[1].privkey, monkeypatch): + with capture_output as output, use_key(age_keys[1].privkey, monkeypatch): cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" cli.run( @@ -520,12 +447,12 @@ def test_secrets( ] ) - with use_age_key(age_keys[1].privkey, monkeypatch): + with use_key(age_keys[1].privkey, monkeypatch): with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" - # Add an user with a GPG key + # extend group will update secrets cli.run( [ "secrets", @@ -533,13 +460,10 @@ def test_secrets( "add", "--flake", str(test_flake.path), - "--pgp-key", - gpg_key, "user2", + age_keys[2].pubkey, ] ) - - # Extend group will update secrets cli.run( [ "secrets", @@ -552,7 +476,7 @@ def test_secrets( ] ) - with use_gpg_key(gpg_key, monkeypatch): # user2 + with use_key(age_keys[2].privkey, monkeypatch): # user2 with capture_output as output: cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) assert output.out == "foo" @@ -570,7 +494,7 @@ def test_secrets( ) with ( pytest.raises(ClanError), - use_gpg_key(gpg_key, monkeypatch), + use_key(age_keys[2].privkey, monkeypatch), capture_output as output, ): # user2 is not in the group anymore @@ -595,66 +519,3 @@ def test_secrets( with capture_output as output: cli.run(["secrets", "list", "--flake", str(test_flake.path)]) assert output.out == "" - - -def test_secrets_key_generate_gpg( - test_flake: FlakeForTest, - capture_output: CaptureOutput, - monkeypatch: pytest.MonkeyPatch, - gpg_key: str, -) -> None: - with use_gpg_key(gpg_key, monkeypatch): - # Make sure clan secrets key generate recognizes - # the PGP key and does nothing: - with capture_output as output: - cli.run( - [ - "secrets", - "key", - "generate", - "--flake", - str(test_flake.path), - ] - ) - assert "age private key" not in output.out - assert re.match(r"PGP key.+is already set", output.out) is not None - - with capture_output as output: - cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) - key = json.loads(output.out) - assert key["type"] == "pgp" - assert key["publickey"] == gpg_key - - # Add testuser with the key that was (not) generated for the clan: - cli.run( - [ - "secrets", - "users", - "add", - "--flake", - str(test_flake.path), - "--pgp-key", - gpg_key, - "testuser", - ] - ) - with capture_output as output: - cli.run( - [ - "secrets", - "users", - "get", - "--flake", - str(test_flake.path), - "testuser", - ] - ) - key = json.loads(output.out) - assert key["type"] == "pgp" - assert key["publickey"] == gpg_key - - monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value") - cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"]) - with capture_output as output: - cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"]) - assert output.out == "secret-value"