diff --git a/pkgs/clan-cli/clan_cli/machines/create.py b/pkgs/clan-cli/clan_cli/machines/create.py index 1995bf38c..fc73cac93 100644 --- a/pkgs/clan-cli/clan_cli/machines/create.py +++ b/pkgs/clan-cli/clan_cli/machines/create.py @@ -81,6 +81,7 @@ def create_machine(opts: CreateOptions) -> None: msg = "Machine name must be a valid hostname" raise ClanError(msg, location="Create Machine") + # lopter@(2024-10-22): Could we just use warn and use the existing config? if dst.exists(): msg = f"Machine {machine_name} already exists in {clan_dir}" description = ( diff --git a/pkgs/clan-cli/clan_cli/machines/update.py b/pkgs/clan-cli/clan_cli/machines/update.py index 5d62adeaa..7f6b99b8c 100644 --- a/pkgs/clan-cli/clan_cli/machines/update.py +++ b/pkgs/clan-cli/clan_cli/machines/update.py @@ -114,8 +114,8 @@ def deploy_machine(machines: MachineGroup) -> None: def deploy(machine: Machine) -> None: host = machine.build_host - generate_facts([machine]) - generate_vars([machine]) + generate_facts([machine], service=None, regenerate=False) + generate_vars([machine], generator_name=None, regenerate=False) upload_secrets(machine) upload_secret_vars(machine) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index ea5cbb3e8..a4070a9de 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -9,7 +9,7 @@ from clan_cli.git import commit_files from . import sops from .secrets import update_secrets from .sops import ( - default_admin_key_path, + default_admin_private_key_path, generate_private_key, maybe_get_admin_public_key, ) @@ -23,7 +23,7 @@ def generate_key() -> sops.SopsKey: print(f"{key.key_type.name} key {key.pubkey} is already set") return key - path = default_admin_key_path() + path = default_admin_private_key_path() _, pub_key = generate_private_key(out_file=path) print( f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets." @@ -62,8 +62,9 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: parser_generate = subparser.add_parser( "generate", description=( - "Generate an age key for the Clan, " - "to use PGP set `SOPS_PGP_FP` in your environment." + "Generate an age key for the Clan, if you already have an age " + "or PGP key, then use it to create your user, see: " + "`clan secrets users add --help'" ), ) parser_generate.set_defaults(func=generate_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index fb4ec0cbb..14e41b025 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -28,7 +28,12 @@ from .folders import ( sops_secrets_folder, sops_users_folder, ) -from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys +from .sops import ( + decrypt_file, + encrypt_file, + read_key, + update_keys, +) from .types import VALID_SECRET_NAME, secret_name_type log = logging.getLogger(__name__) @@ -89,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: def encrypt_secret( flake_dir: Path, secret_path: Path, - value: IO[str] | str | bytes | None, + value: IO[bytes] | str | bytes | None, add_users: list[str] | None = None, add_machines: list[str] | None = None, add_groups: list[str] | None = None, @@ -101,9 +106,13 @@ def encrypt_secret( add_machines = [] if add_users is None: add_users = [] - key = ensure_admin_key(flake_dir) + key = sops.ensure_admin_public_key(flake_dir) recipient_keys = set() + # encrypt_secret can be called before the secret has been created + # so don't try to call sops.update_keys on a non-existent file: + do_update_keys = False + files_to_commit = [] for user in add_users: files_to_commit.extend( @@ -111,7 +120,7 @@ def encrypt_secret( users_folder(secret_path), sops_users_folder(flake_dir), user, - False, + do_update_keys, ) ) @@ -121,7 +130,7 @@ def encrypt_secret( machines_folder(secret_path), sops_machines_folder(flake_dir), machine, - False, + do_update_keys, ) ) @@ -131,7 +140,7 @@ def encrypt_secret( groups_folder(secret_path), sops_groups_folder(flake_dir), group, - False, + do_update_keys, ) ) @@ -144,7 +153,7 @@ def encrypt_secret( users_folder(secret_path), sops_users_folder(flake_dir), key.username, - False, + do_update_keys, ) ) @@ -296,7 +305,10 @@ def list_command(args: argparse.Namespace) -> None: def decrypt_secret(flake_dir: Path, secret_path: Path) -> str: - ensure_admin_key(flake_dir) + # lopter(2024-10): I can't think of a good way to ensure that we have the + # private key for the secret. I mean we could collect all private keys we + # could find and then make sure we have the one for the secret, but that + # seems complicated for little ux gain? path = secret_path / "secret" if not path.exists(): msg = f"Secret '{secret_path!s}' does not exist" @@ -320,7 +332,7 @@ def is_tty_interactive() -> bool: def set_command(args: argparse.Namespace) -> None: env_value = os.environ.get("SOPS_NIX_SECRET") - secret_value: str | IO[str] | None = sys.stdin + secret_value: str | IO[bytes] | None = sys.stdin.buffer if args.edit: secret_value = None elif env_value: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 2e81d3b4e..d2ec87202 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,24 +1,28 @@ +import dataclasses import enum +import functools import io import json +import logging import os import shutil import subprocess -from collections.abc import Iterable, Iterator -from contextlib import contextmanager, suppress -from dataclasses import dataclass +from collections.abc import Iterable, Sequence +from contextlib import suppress from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import IO, Protocol +import clan_cli.cmd from clan_cli.api import API -from clan_cli.cmd import Log, run from clan_cli.dirs import user_config_dir -from clan_cli.errors import ClanError +from clan_cli.errors import ClanError, CmdOut from clan_cli.nix import nix_shell from .folders import sops_machines_folder, sops_users_folder +log = logging.getLogger(__name__) + class KeyType(enum.Enum): AGE = enum.auto() @@ -30,11 +34,76 @@ class KeyType(enum.Enum): return cls.__members__.get(value.upper()) return None + @property + def sops_recipient_attr(self) -> str: + """Name of the attribute to get the recipient key from a Sops file.""" + if self == self.AGE: + return "recipient" + if self == self.PGP: + return "fp" + msg = ( + f"KeyType is not properly implemented: " + f'"sops_recipient_attr" is missing for key type "{self.name}"' + ) + raise ClanError(msg) -@dataclass(frozen=True, eq=False) + def collect_public_keys(self) -> Sequence[str]: + keyring: Sequence[str] = [] + + if self == self.AGE: + if keys := os.environ.get("SOPS_AGE_KEY"): + # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and + # reads identities line by line. See age/keysource.go in + # Sops, and age/parse.go in Age. + for private_key in keys.strip().splitlines(): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in the environment (SOPS_AGE_KEY): {public_key}" + ) + keyring.append(public_key) + + def maybe_read_from_path(key_path: Path) -> None: + try: + # as in parse.go in age: + lines = Path(key_path).read_text().strip().splitlines() + for private_key in filter(lambda ln: not ln.startswith("#"), lines): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in {key_path}: {public_key}" + ) + keyring.append(public_key) + except FileNotFoundError: + return + except Exception as ex: + log.warn(f"Could not read age keys from {key_path}: {ex}") + + # Sops will try every location, see age/keysource.go + if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): + maybe_read_from_path(Path(key_path)) + maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") + + return keyring + + if self == self.PGP: + if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): + for fp in pgp_fingerprints.strip().split(","): + msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}" + log.info(msg) + keyring.append(fp) + return keyring + + msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys" + raise ClanError(msg) + + +@dataclasses.dataclass(frozen=True) class SopsKey: pubkey: str - username: str + # Two SopsKey are considered equal even + # if they don't have the same username: + username: str = dataclasses.field(compare=False) key_type: KeyType def as_dict(self) -> dict[str, str]: @@ -44,6 +113,131 @@ class SopsKey: "type": self.key_type.name.lower(), } + @classmethod + def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102 + """Load from the file named `keys.json` in the given directory.""" + pubkey, key_type = read_key(folder) + username = "" + return cls(pubkey, username, key_type) + + @classmethod + def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102 + return [ + cls(pubkey=key, username="", key_type=key_type) + for key_type in KeyType + for key in key_type.collect_public_keys() + ] + + +class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go + ERROR_GENERIC = 1 + COULD_NOT_READ_INPUT_FILE = 2 + COULD_NOT_WRITE_OUTPUT_FILE = 3 + ERROR_DUMPING_TREE = 4 + ERROR_READING_CONFIG = 5 + ERROR_INVALID_KMS_ENCRYPTION_CONTEXT_FORMAT = 6 + ERROR_INVALID_SET_FORMAT = 7 + ERROR_CONFLICTING_PARAMETERS = 8 + ERROR_ENCRYPTING_MAC = 21 + ERROR_ENCRYPTING_TREE = 23 + ERROR_DECRYPTING_MAC = 24 + ERROR_DECRYPTING_TREE = 25 + CANNOT_CHANGE_KEYS_FROM_NON_EXISTENT_FILE = 49 + MAC_MISMATCH = 51 + MAC_NOT_FOUND = 52 + CONFIG_FILE_NOT_FOUND = 61 + KEYBOARD_INTERRUPT = 85 + INVALID_TREE_PATH_FORMAT = 91 + NEED_AT_LEAST_ONE_DOCUMENT = 92 + NO_FILE_SPECIFIED = 100 + COULD_NOT_RETRIEVE_KEY = 128 + NO_ENCRYPTION_KEY_FOUND = 111 + DUPLICATE_DECRYPTION_KEY_TYPE = 112 + FILE_HAS_NOT_BEEN_MODIFIED = 200 + NO_EDITOR_FOUND = 201 + FAILED_TO_COMPARE_VERSIONS = 202 + FILE_ALREADY_ENCRYPTED = 203 + + @classmethod + def parse(cls, code: int) -> "ExitStatus | None": # noqa: ANN102 + return ExitStatus(code) if code in ExitStatus else None + + +class Executer(Protocol): + def __call__( + self, cmd: list[str], *, env: dict[str, str] | None = None + ) -> CmdOut: ... + + +class Operation(enum.StrEnum): + DECRYPT = "decrypt" + EDIT = "edit" + ENCRYPT = "encrypt" + UPDATE_KEYS = "updatekeys" + + +def run( + call: Operation, + secret_path: Path, + public_keys: Iterable[tuple[str, KeyType]], + executer: Executer, +) -> tuple[int, str]: + """Call the sops binary for the given operation.""" + # louis(2024-11-19): I regrouped the call into the sops binary into this + # one place because calling into sops needs to be done with a carefully + # setup context, and I don't feel good about the idea of having that logic + # exist in multiple places. + sops_cmd = ["sops"] + environ = os.environ.copy() + with NamedTemporaryFile(delete=False, mode="w") as manifest: + if call == Operation.DECRYPT: + sops_cmd.append("decrypt") + else: + # When sops is used to edit a file the config is only used at + # file creation, otherwise the keys from the exising file are + # used. + sops_cmd.extend(["--config", manifest.name]) + + keys_by_type: dict[KeyType, list[str]] = {} + keys_by_type = {key_type: [] for key_type in KeyType} + for key, key_type in public_keys: + keys_by_type[key_type].append(key) + it = keys_by_type.items() + key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] + rules = {"creation_rules": [{"key_groups": key_groups}]} + json.dump(rules, manifest, indent=2) + manifest.flush() + + if call == Operation.ENCRYPT: + # Remove SOPS env vars used to specify public keys to force + # sops to use our config file [1]; so that the file gets + # encrypted with our keys and not something leaking out of + # the environment. + # + # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 + for var in os.environ: + if var.startswith("SOPS_") and var not in { # allowed: + "SOPS_GPG_EXEC", + "SOPS_AGE_KEY", + "SOPS_AGE_KEY_FILE", + }: + del environ[var] + sops_cmd.extend(["encrypt", "--in-place"]) + elif call == Operation.UPDATE_KEYS: + sops_cmd.extend(["updatekeys", "--yes"]) + elif call != Operation.EDIT: + known_operations = ",".join(Operation.__members__.values()) + msg = ( + f"Unsupported sops operation {call.value} " + f"(known operations: {known_operations})" + ) + raise ClanError(msg) + sops_cmd.append(str(secret_path)) + + cmd = nix_shell(["nixpkgs#sops"], sops_cmd) + p = executer(cmd, env=environ) + return p.returncode, p.stdout + def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) @@ -60,7 +254,7 @@ def get_public_age_key(privkey: str) -> str: def generate_private_key(out_file: Path | None = None) -> tuple[str, str]: cmd = nix_shell(["nixpkgs#age"], ["age-keygen"]) try: - proc = run(cmd) + proc = clan_cli.cmd.run(cmd) res = proc.stdout.strip() pubkey = None private_key = None @@ -122,7 +316,7 @@ def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: raise ClanError(msg) -def default_admin_key_path() -> Path: +def default_admin_private_key_path() -> Path: raw_path = os.environ.get("SOPS_AGE_KEY_FILE") if raw_path: return Path(raw_path) @@ -131,37 +325,26 @@ def default_admin_key_path() -> Path: @API.register def maybe_get_admin_public_key() -> None | SopsKey: - age_key = os.environ.get("SOPS_AGE_KEY") - pgp_key = os.environ.get("SOPS_PGP_FP") - if age_key and pgp_key: - msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." - raise ClanError(msg) - if age_key: - return SopsKey( - pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" - ) - if pgp_key: - return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") - - path = default_admin_key_path() - if path.exists(): - return SopsKey( - pubkey=get_public_age_key(path.read_text()), - key_type=KeyType.AGE, - username="", - ) - - return None - - -def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - key = maybe_get_admin_public_key() - if not key: + keyring = SopsKey.collect_public_keys() + if len(keyring) == 0: return None - return maybe_get_user_or_machine(flake_dir, key) + + if len(keyring) > 1: + last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]] + msg = ( + f"Found more than {len(keyring)} public keys in your " + f"environment/system and cannot decide which one to " + f"use, first {len(last_3)}:\n\n" + f"- {'\n- '.join(last_3)}\n\n" + f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or " + f"SOPS_PGP_FP appropriately" + ) + raise ClanError(msg) + + return keyring[0] -def ensure_admin_key(flake_dir: Path) -> SopsKey: +def ensure_admin_public_key(flake_dir: Path) -> SopsKey: key = maybe_get_admin_public_key() if key: return ensure_user_or_machine(flake_dir, key) @@ -169,100 +352,104 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey: raise ClanError(msg) -@contextmanager -def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: - all_keys: dict[str, list[str]] = { - key_type.lower(): [] for key_type in KeyType.__members__ - } - for key, key_type in keys: - all_keys[key_type.name.lower()].append(key) - with NamedTemporaryFile(delete=False, mode="w") as manifest: - json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) - manifest.flush() - yield Path(manifest.name) - - def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]: - keys_sorted = sorted(keys) - with sops_manifest(keys_sorted) as manifest: - secret_path = secret_path / "secret" - time_before = secret_path.stat().st_mtime - cmd = nix_shell( - ["nixpkgs#sops"], - [ - "sops", - "--config", - str(manifest), - "updatekeys", - "--yes", - str(secret_path), - ], - ) - run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}") - if time_before == secret_path.stat().st_mtime: - return [] - return [secret_path] + secret_path = secret_path / "secret" + error_msg = f"Could not update keys for {secret_path}" + executer = functools.partial( + clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg + ) + rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer) + was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED + return [secret_path] if was_modified else [] def encrypt_file( secret_path: Path, - content: IO[str] | str | bytes | None, + content: str | IO[bytes] | bytes | None, pubkeys: list[tuple[str, KeyType]], ) -> None: folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) - with sops_manifest(pubkeys) as manifest: - if not content: - args = ["sops", "--config", str(manifest)] - args.extend([str(secret_path)]) - cmd = nix_shell(["nixpkgs#sops"], args) - # Don't use our `run` here, because it breaks editor integration. - # We never need this in our UI. - p = subprocess.run(cmd, check=False) - # returns 200 if the file is changed - if p.returncode != 0 and p.returncode != 200: - msg = ( - f"Failed to encrypt {secret_path}: sops exited with {p.returncode}" - ) - raise ClanError(msg) - return + if not content: + # Don't use our `run` here, because it breaks editor integration. + # We never need this in our UI. + def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: + return CmdOut( + stdout="", + stderr="", + cwd=Path.cwd(), + env=env, + command_list=cmd, + returncode=subprocess.run(cmd, env=env, check=False).returncode, + msg=None, + ) - # hopefully /tmp is written to an in-memory file to avoid leaking secrets - with NamedTemporaryFile(delete=False) as f: - try: - if isinstance(content, str): - Path(f.name).write_text(content) - elif isinstance(content, bytes): - Path(f.name).write_bytes(content) - elif isinstance(content, io.IOBase): - with Path(f.name).open("w") as fd: - shutil.copyfileobj(content, fd) - else: - msg = f"Invalid content type: {type(content)}" - raise ClanError(msg) - # we pass an empty manifest to pick up existing configuration of the user - args = ["sops", "--config", str(manifest)] - args.extend(["-i", "--encrypt", str(f.name)]) - cmd = nix_shell(["nixpkgs#sops"], args) - run(cmd, log=Log.BOTH) - # atomic copy of the encrypted file - with NamedTemporaryFile(dir=folder, delete=False) as f2: - shutil.copyfile(f.name, f2.name) - Path(f2.name).rename(secret_path) - finally: - with suppress(OSError): - Path(f.name).unlink() + rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer) + status = ExitStatus.parse(rc) + if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED: + return + msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}" + raise ClanError(msg) + + # lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it + # creates the file in __init__, when really it should be created in + # __enter__ (that is in Python __enter__ is actually __init__ from a RAII + # perspective, and __init__ should just be thought off as syntax sugar to + # capture extra context), and now the linter is unhappy so hush it. Note + # that if NamedTemporaryFile created the file in __enter__ then we'd have + # to change exception handling: + try: + source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115 + except (FileNotFoundError, PermissionError): + source = NamedTemporaryFile(delete=False) # noqa: SIM115 + try: # swap the secret: + with source: + if isinstance(content, str): + source.file.write(content.encode()) + elif isinstance(content, bytes): + source.file.write(content) + elif isinstance(content, io.BufferedReader): + # lopter@(2024-11-19): mypy is freaking out on the 1st + # argument, idk why, it says: + # + # > Cannot infer type argument 1 of "copyfileobj" + shutil.copyfileobj(content, source.file) # type: ignore[misc] + else: + msg = f"Invalid content type: {type(content)}" + raise ClanError(msg) + executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH) + run(Operation.ENCRYPT, Path(source.name), pubkeys, executer) + # atomic copy of the encrypted file + with NamedTemporaryFile(dir=folder, delete=False) as dest: + shutil.copyfile(source.name, dest.name) + Path(dest.name).rename(secret_path) + finally: + with suppress(OSError): + Path(source.name).unlink() def decrypt_file(secret_path: Path) -> str: - with sops_manifest([]) as manifest: - cmd = nix_shell( - ["nixpkgs#sops"], - ["sops", "--config", str(manifest), "--decrypt", str(secret_path)], + # decryption uses private keys from the environment or default paths: + no_public_keys_needed: list[tuple[str, KeyType]] = [] + executer = functools.partial( + clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}" + ) + _, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer) + return stdout + + +def get_recipients(secret_path: Path) -> set[SopsKey]: + sops_attrs = json.loads((secret_path / "secret").read_text())["sops"] + return { + SopsKey( + pubkey=recipient[key_type.sops_recipient_attr], + username="", + key_type=key_type, ) - res = run(cmd, error_msg=f"Could not decrypt {secret_path}") - return res.stdout + for key_type in KeyType + for recipient in sops_attrs[key_type.name.lower()] or [] + } def get_meta(secret_path: Path) -> dict: diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py index 0a2121cab..52d819ea6 100644 --- a/pkgs/clan-cli/clan_cli/secrets/types.py +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -21,14 +21,15 @@ def secret_name_type(arg_value: str) -> str: def public_or_private_age_key_type(arg_value: str) -> str: if Path(arg_value).is_file(): arg_value = Path(arg_value).read_text().strip() - if arg_value.startswith("age1"): - return arg_value.strip() - if arg_value.startswith("AGE-SECRET-KEY-"): - return get_public_age_key(arg_value) - if not arg_value.startswith("age1"): - msg = f"Please provide an age key starting with age1, got: '{arg_value}'" - raise ClanError(msg) - return arg_value + for line in arg_value.splitlines(): + if line.startswith("#"): + continue + if line.startswith("age1"): + return line.strip() + if line.startswith("AGE-SECRET-KEY-"): + return get_public_age_key(line) + msg = f"Please provide an age key starting with age1 or AGE-SECRET-KEY-, got: '{arg_value}'" + raise ClanError(msg) def group_or_user_name_type(what: str) -> Callable[[str], str]: diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index 5b02d55ac..3b60f471a 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -1,10 +1,10 @@ -import json from dataclasses import dataclass from pathlib import Path from typing import override from clan_cli.errors import ClanError from clan_cli.machines.machines import Machine +from clan_cli.secrets import sops from clan_cli.secrets.folders import ( sops_machines_folder, sops_secrets_folder, @@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import ( encrypt_secret, has_secret, ) -from clan_cli.secrets.sops import KeyType, generate_private_key -from clan_cli.vars.generate import Generator, Var +from clan_cli.vars.generate import Generator +from clan_cli.vars.var import Var from . import SecretStoreBase @@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase): if has_machine(self.machine.flake_dir, self.machine.name): return - priv_key, pub_key = generate_private_key() + priv_key, pub_key = sops.generate_private_key() encrypt_secret( self.machine.flake_dir, sops_secrets_folder(self.machine.flake_dir) @@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase): def user_has_access( self, user: str, generator: Generator, secret_name: str ) -> bool: - secret_path = self.secret_path(generator, secret_name) - secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] - users_folder_path = sops_users_folder(self.machine.flake_dir) - user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[ - "publickey" - ] - return user_pubkey in recipients + key_dir = sops_users_folder(self.machine.flake_dir) / user + return self.key_has_access(key_dir, generator, secret_name) def machine_has_access(self, generator: Generator, secret_name: str) -> bool: + key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name + return self.key_has_access(key_dir, generator, secret_name) + + def key_has_access( + self, key_dir: Path, generator: Generator, secret_name: str + ) -> bool: secret_path = self.secret_path(generator, secret_name) - secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] - machines_folder_path = sops_machines_folder(self.machine.flake_dir) - machine_pubkey = json.loads( - (machines_folder_path / self.machine.name / "key.json").read_text() - )["publickey"] - return machine_pubkey in recipients + return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path) def secret_path(self, generator: Generator, secret_name: str) -> Path: return self.directory(generator, secret_name) @@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase): secret_folder = self.secret_path(generator, name) add_secret(self.machine.flake_dir, self.machine.name, secret_folder) - def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]: + def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]: from clan_cli.secrets.secrets import ( collect_keys_for_path, collect_keys_for_type, @@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase): self.machine.flake_dir / "sops" / "groups" / group / "users" ) ) - return keys - @override + return { + sops.SopsKey(pubkey=key, username="", key_type=key_type) + for (key, key_type) in keys + } + + # } def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]: secret_path = self.secret_path(generator, name) - recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"] - current_recipients = {r["recipient"] for r in recipients_} - wanted_recipients = { - key[0] for key in self.collect_keys_for_secret(secret_path) - } + current_recipients = sops.get_recipients(secret_path) + wanted_recipients = self.collect_keys_for_secret(secret_path) needs_update = current_recipients != wanted_recipients recipients_to_add = wanted_recipients - current_recipients var_id = f"{generator.name}/{name}" msg = ( f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. " f"This could have been a malicious actor trying to add their keys, please investigate. " - f"Added keys: {', '.join(recipients_to_add)}" + f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}" ) return needs_update, msg