From 1e0db828271374dbe43ff4bb7587b218ad91782b Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 19 Nov 2024 16:34:50 -0800 Subject: [PATCH] clan-cli: secrets: address CR feedback for sops encryption and key handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move public keys collection to a class method on `SopsKey`, and implement collection for each key type in `KeyType`, this helps make the code more generic ; - Replace `Operation.__call__` by `run` (`sops.run` if you import the entire module), that allows us to dedent the code so that's cool ; - Fix exception handling when trying to get a in-memory temporary file ; - Make Executor cuter 😵🪦. --- pkgs/clan-cli/clan_cli/secrets/secrets.py | 12 +- pkgs/clan-cli/clan_cli/secrets/sops.py | 319 ++++++++++++---------- 2 files changed, 180 insertions(+), 151 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index e665a4c8e..14e41b025 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -94,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: def encrypt_secret( flake_dir: Path, secret_path: Path, - value: IO[str] | str | bytes | None, + value: IO[bytes] | str | bytes | None, add_users: list[str] | None = None, add_machines: list[str] | None = None, add_groups: list[str] | None = None, @@ -305,10 +305,10 @@ def list_command(args: argparse.Namespace) -> None: def decrypt_secret(flake_dir: Path, secret_path: Path) -> str: - # I can't think of a good way to ensure that we have the private key for - # the secret. I mean we could collect all private keys we could find and - # then make sure we have the one for the secret, but that seems - # complicated for little ux gain? + # lopter(2024-10): I can't think of a good way to ensure that we have the + # private key for the secret. I mean we could collect all private keys we + # could find and then make sure we have the one for the secret, but that + # seems complicated for little ux gain? path = secret_path / "secret" if not path.exists(): msg = f"Secret '{secret_path!s}' does not exist" @@ -332,7 +332,7 @@ def is_tty_interactive() -> bool: def set_command(args: argparse.Namespace) -> None: env_value = os.environ.get("SOPS_NIX_SECRET") - secret_value: str | IO[str] | None = sys.stdin + secret_value: str | IO[bytes] | None = sys.stdin.buffer if args.edit: secret_value = None elif env_value: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 5d34c9941..664232313 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -11,10 +11,10 @@ from collections.abc import Iterable, Sequence from contextlib import suppress from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO, Any, Protocol +from typing import IO, Protocol +import clan_cli.cmd from clan_cli.api import API -from clan_cli.cmd import Log, run from clan_cli.dirs import user_config_dir from clan_cli.errors import ClanError, CmdOut from clan_cli.nix import nix_shell @@ -47,6 +47,56 @@ class KeyType(enum.Enum): ) raise ClanError(msg) + def collect_public_keys(self) -> Sequence[str]: + keyring: Sequence[str] = [] + + if self == self.AGE: + if keys := os.environ.get("SOPS_AGE_KEY"): + # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and + # reads identities line by line. See age/keysource.go in + # Sops, and age/parse.go in Age. + for private_key in keys.strip().splitlines(): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in the environment (SOPS_AGE_KEY): {public_key}" + ) + keyring.append(public_key) + + def maybe_read_from_path(key_path: Path) -> None: + try: + # as in parse.go in age: + lines = Path(key_path).read_text().strip().splitlines() + for private_key in filter(lambda ln: not ln.startswith("#"), lines): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in {key_path}: {public_key}" + ) + keyring.append(public_key) + except FileNotFoundError: + return + except Exception as ex: + log.warn(f"Could not read age keys from {key_path}: {ex}") + + # Sops will try every location, see age/keysource.go + if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): + maybe_read_from_path(Path(key_path)) + maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") + + return keyring + + if self == self.PGP: + if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): + for fp in pgp_fingerprints.strip().split(","): + msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}" + log.info(msg) + keyring.append(fp) + return keyring + + msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys" + raise ClanError(msg) + @dataclasses.dataclass(frozen=True) class SopsKey: @@ -64,12 +114,20 @@ class SopsKey: } @classmethod - def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102 + def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102 """Load from the file named `keys.json` in the given directory.""" - pubkey, key_type = read_key(dir) + pubkey, key_type = read_key(folder) username = "" return cls(pubkey, username, key_type) + @classmethod + def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102 + return [ + cls(pubkey=key, username="", key_type=key_type) + for key_type in KeyType + for key in key_type.collect_public_keys() + ] + class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go ERROR_GENERIC = 1 @@ -105,75 +163,81 @@ class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go return ExitStatus(code) if code in ExitStatus else None -class Executor(Protocol): +class Executer(Protocol): def __call__( self, cmd: list[str], *, env: dict[str, str] | None = None ) -> CmdOut: ... class Operation(enum.StrEnum): - decrypt = "decrypt" - edit = "edit" - encrypt = "encrypt" - update_keys = "updatekeys" + DECRYPT = "decrypt" + EDIT = "edit" + ENCRYPT = "encrypt" + UPDATE_KEYS = "updatekeys" - def __call__( - self, - secret_path: Path, - public_keys: Iterable[tuple[str, KeyType]], - executor: Executor, - ) -> tuple[int, str]: - sops_cmd = ["sops"] - environ = os.environ.copy() - with NamedTemporaryFile(delete=False, mode="w") as manifest: - if self == Operation.decrypt: - sops_cmd.append("decrypt") - else: - # When sops is used to edit a file the config is only used at - # file creation, otherwise the keys from the exising file are - # used. - sops_cmd.extend(["--config", manifest.name]) - keys_by_type: dict[KeyType, list[str]] = {} - keys_by_type = {key_type: [] for key_type in KeyType} - for key, key_type in public_keys: - keys_by_type[key_type].append(key) - it = keys_by_type.items() - key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] - rules = {"creation_rules": [{"key_groups": key_groups}]} - json.dump(rules, manifest, indent=2) - manifest.flush() +def run( + call: Operation, + secret_path: Path, + public_keys: Iterable[tuple[str, KeyType]], + executer: Executer, +) -> tuple[int, str]: + """Call the sops binary for the given operation.""" + # louis(2024-11-19): I regrouped the call into the sops binary into this + # one place because calling into sops needs to be done with a carefully + # setup context, and I don't feel good about the idea of having that logic + # exist in multiple places. + sops_cmd = ["sops"] + environ = os.environ.copy() + with NamedTemporaryFile(delete=False, mode="w") as manifest: + if call == Operation.DECRYPT: + sops_cmd.append("decrypt") + else: + # When sops is used to edit a file the config is only used at + # file creation, otherwise the keys from the exising file are + # used. + sops_cmd.extend(["--config", manifest.name]) - if self == Operation.encrypt: - # Remove SOPS env vars used to specify public keys to force - # sops to use our config file [1]; so that the file gets - # encrypted with our keys and not something leaking out of - # the environment. - # - # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 - for var in os.environ: - if var.startswith("SOPS_") and var not in { # allowed: - "SOPS_GPG_EXEC", - "SOPS_AGE_KEY", - "SOPS_AGE_KEY_FILE", - "SOPS_NIX_SECRET", - }: - del environ[var] - sops_cmd.extend(["encrypt", "--in-place"]) - elif self == Operation.update_keys: - sops_cmd.extend(["updatekeys", "--yes"]) - elif self != Operation.edit: - known_operations = ",".join(Operation.__members__.values()) - msg = ( - f"Unsupported sops operation {self.value} " - f"(known operations: {known_operations})" - ) - raise ClanError(msg) - sops_cmd.append(str(secret_path)) + keys_by_type: dict[KeyType, list[str]] = {} + keys_by_type = {key_type: [] for key_type in KeyType} + for key, key_type in public_keys: + keys_by_type[key_type].append(key) + it = keys_by_type.items() + key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] + rules = {"creation_rules": [{"key_groups": key_groups}]} + json.dump(rules, manifest, indent=2) + manifest.flush() - cmd = nix_shell(["nixpkgs#sops"], sops_cmd) - p = executor(cmd, env=environ) - return p.returncode, p.stdout + if call == Operation.ENCRYPT: + # Remove SOPS env vars used to specify public keys to force + # sops to use our config file [1]; so that the file gets + # encrypted with our keys and not something leaking out of + # the environment. + # + # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 + for var in os.environ: + if var.startswith("SOPS_") and var not in { # allowed: + "SOPS_GPG_EXEC", + "SOPS_AGE_KEY", + "SOPS_AGE_KEY_FILE", + "SOPS_NIX_SECRET", + }: + del environ[var] + sops_cmd.extend(["encrypt", "--in-place"]) + elif call == Operation.UPDATE_KEYS: + sops_cmd.extend(["updatekeys", "--yes"]) + elif call != Operation.EDIT: + known_operations = ",".join(Operation.__members__.values()) + msg = ( + f"Unsupported sops operation {call.value} " + f"(known operations: {known_operations})" + ) + raise ClanError(msg) + sops_cmd.append(str(secret_path)) + + cmd = nix_shell(["nixpkgs#sops"], sops_cmd) + p = executer(cmd, env=environ) + return p.returncode, p.stdout def get_public_age_key(privkey: str) -> str: @@ -191,7 +255,7 @@ def get_public_age_key(privkey: str) -> str: def generate_private_key(out_file: Path | None = None) -> tuple[str, str]: cmd = nix_shell(["nixpkgs#age"], ["age-keygen"]) try: - proc = run(cmd) + proc = clan_cli.cmd.run(cmd) res = proc.stdout.strip() pubkey = None private_key = None @@ -262,21 +326,17 @@ def default_admin_private_key_path() -> Path: @API.register def maybe_get_admin_public_key() -> None | SopsKey: - keyring = collect_public_keys() + keyring = SopsKey.collect_public_keys() if len(keyring) == 0: return None if len(keyring) > 1: - # louis@(2024-10-22): - # - # This is confusing when it shows up and you have no information - # about where each key is going from, could we log the discovery - # of each key? + last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]] msg = ( f"Found more than {len(keyring)} public keys in your " f"environment/system and cannot decide which one to " - f"use, first three:\n\n" - f"- {'\n- '.join(str(key.as_dict()) for key in keyring[:3])}\n\n" + f"use, first {len(last_3)}:\n\n" + f"- {'\n- '.join(last_3)}\n\n" f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or " f"SOPS_PGP_FP appropriately" ) @@ -285,48 +345,6 @@ def maybe_get_admin_public_key() -> None | SopsKey: return keyring[0] -def collect_public_keys() -> Sequence[SopsKey]: - username = "" - keyring: list[SopsKey] = [] - - for private_key in collect_private_age_keys(): - public_key = get_public_age_key(private_key) - keyring.append(SopsKey(public_key, username, KeyType.AGE)) - - if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): - for fp in pgp_fingerprints.strip().split(","): - keyring.append(SopsKey(fp, username, KeyType.PGP)) - - return keyring - - -def collect_private_age_keys() -> Sequence[str]: - private_age_keys: list[str] = [] - - if keys := os.environ.get("SOPS_AGE_KEY"): - # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and reads - # identities line by line. See age/keysource.go in Sops, and - # age/parse.go in Age. - private_age_keys.extend(keys.strip().splitlines()) - - def maybe_read_from_path(key_path: Path) -> None: - try: - contents = Path(key_path).read_text().strip() - lines = contents.splitlines() # as in parse.go in age: - private_age_keys.extend(ln for ln in lines if ln and not ln.startswith("#")) - except FileNotFoundError: - return - except Exception as ex: - log.warn(f"Could not read age keys from {key_path}: {ex}") - - # Sops will try every location, see age/keysource.go - if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): - maybe_read_from_path(Path(key_path)) - maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") - - return private_age_keys - - def ensure_admin_public_key(flake_dir: Path) -> SopsKey: key = maybe_get_admin_public_key() if key: @@ -338,15 +356,17 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey: def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]: secret_path = secret_path / "secret" error_msg = f"Could not update keys for {secret_path}" - executor = functools.partial(run, log=Log.BOTH, error_msg=error_msg) - rc, _ = Operation.update_keys(secret_path, keys, executor) + executer = functools.partial( + clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg + ) + rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer) was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED return [secret_path] if was_modified else [] def encrypt_file( secret_path: Path, - content: IO[str] | str | bytes | None, + content: str | IO[bytes] | bytes | None, pubkeys: list[tuple[str, KeyType]], ) -> None: folder = secret_path.parent @@ -355,7 +375,7 @@ def encrypt_file( if not content: # Don't use our `run` here, because it breaks editor integration. # We never need this in our UI. - def executor(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: + def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: return CmdOut( stdout="", stderr="", @@ -366,48 +386,57 @@ def encrypt_file( msg=None, ) - rc, _ = Operation.edit(secret_path, pubkeys, executor) + rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer) status = ExitStatus.parse(rc) if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED: return msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}" raise ClanError(msg) - def swap_secret(f: Any) -> None: # f's type is not exposed by tempfile - try: + # lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it + # creates the file in __init__, when really it should be created in + # __enter__ (that is in Python __enter__ is actually __init__ from a RAII + # perspective, and __init__ should just be thought off as syntax sugar to + # capture extra context), and now the linter is unhappy so hush it. Note + # that if NamedTemporaryFile created the file in __enter__ then we'd have + # to change exception handling: + try: + source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115 + except (FileNotFoundError, PermissionError): + source = NamedTemporaryFile(delete=False) # noqa: SIM115 + try: # swap the secret: + with source: if isinstance(content, str): - Path(f.name).write_text(content) + source.file.write(content.encode()) elif isinstance(content, bytes): - Path(f.name).write_bytes(content) - elif isinstance(content, io.IOBase): - with Path(f.name).open("w") as fd: - shutil.copyfileobj(content, fd) + source.file.write(content) + elif isinstance(content, io.BufferedReader): + # lopter@(2024-11-19): mypy is freaking out on the 1st + # argument, idk why, it says: + # + # > Cannot infer type argument 1 of "copyfileobj" + shutil.copyfileobj(content, source.file) # type: ignore[misc] else: msg = f"Invalid content type: {type(content)}" raise ClanError(msg) - Operation.encrypt(f.name, pubkeys, functools.partial(run, log=Log.BOTH)) - # atomic copy of the encrypted file - with NamedTemporaryFile(dir=folder, delete=False) as f2: - shutil.copyfile(f.name, f2.name) - Path(f2.name).rename(secret_path) - finally: - with suppress(OSError): - Path(f.name).unlink() - - try: - with NamedTemporaryFile(dir="/dev/shm", delete=False) as f: - swap_secret(f) - except (FileNotFoundError, PermissionError): - # hopefully /tmp is written to an in-memory file to avoid leaking secrets - with NamedTemporaryFile(delete=False) as f: - swap_secret(f) + executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH) + run(Operation.ENCRYPT, Path(source.name), pubkeys, executer) + # atomic copy of the encrypted file + with NamedTemporaryFile(dir=folder, delete=False) as dest: + shutil.copyfile(source.name, dest.name) + Path(dest.name).rename(secret_path) + finally: + with suppress(OSError): + Path(source.name).unlink() def decrypt_file(secret_path: Path) -> str: # decryption uses private keys from the environment or default paths: no_public_keys_needed: list[tuple[str, KeyType]] = [] - executor = functools.partial(run, error_msg=f"Could not decrypt {secret_path}") - _, stdout = Operation.decrypt(secret_path, no_public_keys_needed, executor) + executer = functools.partial( + clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}" + ) + _, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer) return stdout @@ -420,7 +449,7 @@ def get_recipients(secret_path: Path) -> set[SopsKey]: key_type=key_type, ) for key_type in KeyType - for recipient in sops_attrs[key_type.name.lower()] + for recipient in sops_attrs[key_type.name.lower()] or [] }