From 775be88a9c41bb82ed3268faae0ffa3463f7489c Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Mon, 14 Oct 2024 17:06:52 -0700 Subject: [PATCH 1/6] clan-cli: filter any sops recipients set in the environment for encryption This forces sops to use our config file, otherwise if any of the environment variables set to specify recipients is present then `--config` will be ignored (see [env_check]). That's simple enough, still I ended up refactoring how we call sops for correctness, and to align with its behavior. The code now distinguishes between public and private keys explicitly. `secrets.decrypt_secret` does not try to lookup for public and private keys anymore. With this changeset, some people might have to adjust their environment as public age and PGP keys will be discovered like sops would do. In particular if multiple public keys are discovered, then the user will have to specify which one to use for the clan. This also makes the following changes: - try to use `/dev/shm` when swapping a secret (it's what [pass] does fwiw); - alias immediate values for readability; - remove some float comparison that could never succeed, and use sops' exit status instead; - remove unused function `maybe_get_sops_key`. [env_check]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 [pass]: http://passwordstore.org/ --- pkgs/clan-cli/clan_cli/machines/update.py | 4 +- pkgs/clan-cli/clan_cli/secrets/key.py | 9 +- pkgs/clan-cli/clan_cli/secrets/secrets.py | 26 +- pkgs/clan-cli/clan_cli/secrets/sops.py | 339 +++++++++++++++------- 4 files changed, 255 insertions(+), 123 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/machines/update.py b/pkgs/clan-cli/clan_cli/machines/update.py index 5d62adeaa..7f6b99b8c 100644 --- a/pkgs/clan-cli/clan_cli/machines/update.py +++ b/pkgs/clan-cli/clan_cli/machines/update.py @@ -114,8 +114,8 @@ def deploy_machine(machines: MachineGroup) -> None: def deploy(machine: Machine) -> None: host = machine.build_host - generate_facts([machine]) - generate_vars([machine]) + generate_facts([machine], service=None, regenerate=False) + generate_vars([machine], generator_name=None, regenerate=False) upload_secrets(machine) upload_secret_vars(machine) diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index ea5cbb3e8..a4070a9de 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -9,7 +9,7 @@ from clan_cli.git import commit_files from . import sops from .secrets import update_secrets from .sops import ( - default_admin_key_path, + default_admin_private_key_path, generate_private_key, maybe_get_admin_public_key, ) @@ -23,7 +23,7 @@ def generate_key() -> sops.SopsKey: print(f"{key.key_type.name} key {key.pubkey} is already set") return key - path = default_admin_key_path() + path = default_admin_private_key_path() _, pub_key = generate_private_key(out_file=path) print( f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets." @@ -62,8 +62,9 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None: parser_generate = subparser.add_parser( "generate", description=( - "Generate an age key for the Clan, " - "to use PGP set `SOPS_PGP_FP` in your environment." + "Generate an age key for the Clan, if you already have an age " + "or PGP key, then use it to create your user, see: " + "`clan secrets users add --help'" ), ) parser_generate.set_defaults(func=generate_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index fb4ec0cbb..e665a4c8e 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -28,7 +28,12 @@ from .folders import ( sops_secrets_folder, sops_users_folder, ) -from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys +from .sops import ( + decrypt_file, + encrypt_file, + read_key, + update_keys, +) from .types import VALID_SECRET_NAME, secret_name_type log = logging.getLogger(__name__) @@ -101,9 +106,13 @@ def encrypt_secret( add_machines = [] if add_users is None: add_users = [] - key = ensure_admin_key(flake_dir) + key = sops.ensure_admin_public_key(flake_dir) recipient_keys = set() + # encrypt_secret can be called before the secret has been created + # so don't try to call sops.update_keys on a non-existent file: + do_update_keys = False + files_to_commit = [] for user in add_users: files_to_commit.extend( @@ -111,7 +120,7 @@ def encrypt_secret( users_folder(secret_path), sops_users_folder(flake_dir), user, - False, + do_update_keys, ) ) @@ -121,7 +130,7 @@ def encrypt_secret( machines_folder(secret_path), sops_machines_folder(flake_dir), machine, - False, + do_update_keys, ) ) @@ -131,7 +140,7 @@ def encrypt_secret( groups_folder(secret_path), sops_groups_folder(flake_dir), group, - False, + do_update_keys, ) ) @@ -144,7 +153,7 @@ def encrypt_secret( users_folder(secret_path), sops_users_folder(flake_dir), key.username, - False, + do_update_keys, ) ) @@ -296,7 +305,10 @@ def list_command(args: argparse.Namespace) -> None: def decrypt_secret(flake_dir: Path, secret_path: Path) -> str: - ensure_admin_key(flake_dir) + # I can't think of a good way to ensure that we have the private key for + # the secret. I mean we could collect all private keys we could find and + # then make sure we have the one for the secret, but that seems + # complicated for little ux gain? path = secret_path / "secret" if not path.exists(): msg = f"Secret '{secret_path!s}' does not exist" diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 2e81d3b4e..8db72e88d 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,24 +1,28 @@ import enum +import functools import io import json +import logging import os import shutil import subprocess -from collections.abc import Iterable, Iterator -from contextlib import contextmanager, suppress +from collections.abc import Iterable, Sequence +from contextlib import suppress from dataclasses import dataclass from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import IO, Any, Protocol from clan_cli.api import API from clan_cli.cmd import Log, run from clan_cli.dirs import user_config_dir -from clan_cli.errors import ClanError +from clan_cli.errors import ClanError, CmdOut from clan_cli.nix import nix_shell from .folders import sops_machines_folder, sops_users_folder +log = logging.getLogger(__name__) + class KeyType(enum.Enum): AGE = enum.auto() @@ -45,6 +49,111 @@ class SopsKey: } +class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go + ERROR_GENERIC = 1 + COULD_NOT_READ_INPUT_FILE = 2 + COULD_NOT_WRITE_OUTPUT_FILE = 3 + ERROR_DUMPING_TREE = 4 + ERROR_READING_CONFIG = 5 + ERROR_INVALID_KMS_ENCRYPTION_CONTEXT_FORMAT = 6 + ERROR_INVALID_SET_FORMAT = 7 + ERROR_CONFLICTING_PARAMETERS = 8 + ERROR_ENCRYPTING_MAC = 21 + ERROR_ENCRYPTING_TREE = 23 + ERROR_DECRYPTING_MAC = 24 + ERROR_DECRYPTING_TREE = 25 + CANNOT_CHANGE_KEYS_FROM_NON_EXISTENT_FILE = 49 + MAC_MISMATCH = 51 + MAC_NOT_FOUND = 52 + CONFIG_FILE_NOT_FOUND = 61 + KEYBOARD_INTERRUPT = 85 + INVALID_TREE_PATH_FORMAT = 91 + NEED_AT_LEAST_ONE_DOCUMENT = 92 + NO_FILE_SPECIFIED = 100 + COULD_NOT_RETRIEVE_KEY = 128 + NO_ENCRYPTION_KEY_FOUND = 111 + DUPLICATE_DECRYPTION_KEY_TYPE = 112 + FILE_HAS_NOT_BEEN_MODIFIED = 200 + NO_EDITOR_FOUND = 201 + FAILED_TO_COMPARE_VERSIONS = 202 + FILE_ALREADY_ENCRYPTED = 203 + + @classmethod + def parse(cls, code: int) -> "ExitStatus | None": # noqa: ANN102 + return ExitStatus(code) if code in ExitStatus else None + + +class Executor(Protocol): + def __call__( + self, cmd: list[str], *, env: dict[str, str] | None = None + ) -> CmdOut: ... + + +class Operation(enum.StrEnum): + decrypt = "decrypt" + edit = "edit" + encrypt = "encrypt" + update_keys = "updatekeys" + + def __call__( + self, + secret_path: Path, + public_keys: Iterable[tuple[str, KeyType]], + executor: Executor, + ) -> tuple[int, str]: + sops_cmd = ["sops"] + environ = os.environ.copy() + with NamedTemporaryFile(delete=False, mode="w") as manifest: + if self == Operation.decrypt: + sops_cmd.append("decrypt") + else: + # When sops is used to edit a file the config is only used at + # file creation, otherwise the keys from the exising file are + # used. + sops_cmd.extend(["--config", manifest.name]) + + keys_by_type: dict[KeyType, list[str]] = {} + keys_by_type = {key_type: [] for key_type in KeyType} + for key, key_type in public_keys: + keys_by_type[key_type].append(key) + it = keys_by_type.items() + key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] + rules = {"creation_rules": [{"key_groups": key_groups}]} + json.dump(rules, manifest, indent=2) + manifest.flush() + + if self == Operation.encrypt: + # Remove SOPS env vars used to specify public keys to force + # sops to use our config file [1]; so that the file gets + # encrypted with our keys and not something leaking out of + # the environment. + # + # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 + for var in os.environ: + if var.startswith("SOPS_") and var not in { # allowed: + "SOPS_GPG_EXEC", + "SOPS_AGE_KEY", + "SOPS_AGE_KEY_FILE", + "SOPS_NIX_SECRET", + }: + del environ[var] + sops_cmd.extend(["encrypt", "--in-place"]) + elif self == Operation.update_keys: + sops_cmd.extend(["updatekeys", "--yes"]) + elif self != Operation.edit: + known_operations = ",".join(Operation.__members__.values()) + msg = ( + f"Unsupported sops operation {self.value} " + f"(known operations: {known_operations})" + ) + raise ClanError(msg) + sops_cmd.append(str(secret_path)) + + cmd = nix_shell(["nixpkgs#sops"], sops_cmd) + p = executor(cmd, env=environ) + return p.returncode, p.stdout + + def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) try: @@ -122,7 +231,7 @@ def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: raise ClanError(msg) -def default_admin_key_path() -> Path: +def default_admin_private_key_path() -> Path: raw_path = os.environ.get("SOPS_AGE_KEY_FILE") if raw_path: return Path(raw_path) @@ -131,37 +240,67 @@ def default_admin_key_path() -> Path: @API.register def maybe_get_admin_public_key() -> None | SopsKey: - age_key = os.environ.get("SOPS_AGE_KEY") - pgp_key = os.environ.get("SOPS_PGP_FP") - if age_key and pgp_key: - msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." - raise ClanError(msg) - if age_key: - return SopsKey( - pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" - ) - if pgp_key: - return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") - - path = default_admin_key_path() - if path.exists(): - return SopsKey( - pubkey=get_public_age_key(path.read_text()), - key_type=KeyType.AGE, - username="", - ) - - return None - - -def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - key = maybe_get_admin_public_key() - if not key: + keyring = collect_public_keys() + if len(keyring) == 0: return None - return maybe_get_user_or_machine(flake_dir, key) + + if len(keyring) > 1: + msg = ( + f"Found more than {len(keyring)} public keys in your " + f"environment/system and cannot decide which one to " + f"use, first three:\n\n" + f"- {'\n- '.join(str(key.as_dict()) for key in keyring[:3])}\n\n" + f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or " + f"SOPS_PGP_FP appropriately" + ) + raise ClanError(msg) + + return keyring[0] -def ensure_admin_key(flake_dir: Path) -> SopsKey: +def collect_public_keys() -> Sequence[SopsKey]: + username = "" + keyring: list[SopsKey] = [] + + for private_key in collect_private_age_keys(): + public_key = get_public_age_key(private_key) + keyring.append(SopsKey(public_key, username, KeyType.AGE)) + + if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): + for fp in pgp_fingerprints.strip().split(","): + keyring.append(SopsKey(fp, username, KeyType.PGP)) + + return keyring + + +def collect_private_age_keys() -> Sequence[str]: + private_age_keys: list[str] = [] + + if keys := os.environ.get("SOPS_AGE_KEY"): + # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and reads + # identities line by line. See age/keysource.go in Sops, and + # age/parse.go in Age. + private_age_keys.extend(keys.strip().splitlines()) + + def maybe_read_from_path(key_path: Path) -> None: + try: + contents = Path(key_path).read_text().strip() + lines = contents.splitlines() # as in parse.go in age: + private_age_keys.extend(l for l in lines if l and not l.startswith("#")) + except FileNotFoundError: + return + except Exception as ex: + log.warn(f"Could not read age keys from {key_path}: {ex}") + + # Sops will try every location, see age/keysource.go + if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): + maybe_read_from_path(Path(key_path)) + maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") + + return private_age_keys + + +def ensure_admin_public_key(flake_dir: Path) -> SopsKey: key = maybe_get_admin_public_key() if key: return ensure_user_or_machine(flake_dir, key) @@ -169,39 +308,13 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey: raise ClanError(msg) -@contextmanager -def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]: - all_keys: dict[str, list[str]] = { - key_type.lower(): [] for key_type in KeyType.__members__ - } - for key, key_type in keys: - all_keys[key_type.name.lower()].append(key) - with NamedTemporaryFile(delete=False, mode="w") as manifest: - json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2) - manifest.flush() - yield Path(manifest.name) - - def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]: - keys_sorted = sorted(keys) - with sops_manifest(keys_sorted) as manifest: - secret_path = secret_path / "secret" - time_before = secret_path.stat().st_mtime - cmd = nix_shell( - ["nixpkgs#sops"], - [ - "sops", - "--config", - str(manifest), - "updatekeys", - "--yes", - str(secret_path), - ], - ) - run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}") - if time_before == secret_path.stat().st_mtime: - return [] - return [secret_path] + secret_path = secret_path / "secret" + error_msg = f"Could not update keys for {secret_path}" + executor = functools.partial(run, log=Log.BOTH, error_msg=error_msg) + rc, _ = Operation.update_keys(secret_path, keys, executor) + was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED + return [secret_path] if was_modified else [] def encrypt_file( @@ -212,57 +325,63 @@ def encrypt_file( folder = secret_path.parent folder.mkdir(parents=True, exist_ok=True) - with sops_manifest(pubkeys) as manifest: - if not content: - args = ["sops", "--config", str(manifest)] - args.extend([str(secret_path)]) - cmd = nix_shell(["nixpkgs#sops"], args) - # Don't use our `run` here, because it breaks editor integration. - # We never need this in our UI. - p = subprocess.run(cmd, check=False) - # returns 200 if the file is changed - if p.returncode != 0 and p.returncode != 200: - msg = ( - f"Failed to encrypt {secret_path}: sops exited with {p.returncode}" - ) - raise ClanError(msg) - return + if not content: + # Don't use our `run` here, because it breaks editor integration. + # We never need this in our UI. + def executor(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: + return CmdOut( + stdout="", + stderr="", + cwd=Path.cwd(), + env=env, + command_list=cmd, + returncode=subprocess.run(cmd, env=env, check=False).returncode, + msg=None, + ) + rc, _ = Operation.edit(secret_path, pubkeys, executor) + status = ExitStatus.parse(rc) + if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED: + return + msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}" + raise ClanError(msg) + + def swap_secret(f: Any) -> None: # f's type is not exposed by tempfile + try: + if isinstance(content, str): + Path(f.name).write_text(content) + elif isinstance(content, bytes): + Path(f.name).write_bytes(content) + elif isinstance(content, io.IOBase): + with Path(f.name).open("w") as fd: + shutil.copyfileobj(content, fd) + else: + msg = f"Invalid content type: {type(content)}" + raise ClanError(msg) + Operation.encrypt(f.name, pubkeys, functools.partial(run, log=Log.BOTH)) + # atomic copy of the encrypted file + with NamedTemporaryFile(dir=folder, delete=False) as f2: + shutil.copyfile(f.name, f2.name) + Path(f2.name).rename(secret_path) + finally: + with suppress(OSError): + Path(f.name).unlink() + + try: + with NamedTemporaryFile(dir="/dev/shm", delete=False) as f: + swap_secret(f) + except (FileNotFoundError, PermissionError): # hopefully /tmp is written to an in-memory file to avoid leaking secrets with NamedTemporaryFile(delete=False) as f: - try: - if isinstance(content, str): - Path(f.name).write_text(content) - elif isinstance(content, bytes): - Path(f.name).write_bytes(content) - elif isinstance(content, io.IOBase): - with Path(f.name).open("w") as fd: - shutil.copyfileobj(content, fd) - else: - msg = f"Invalid content type: {type(content)}" - raise ClanError(msg) - # we pass an empty manifest to pick up existing configuration of the user - args = ["sops", "--config", str(manifest)] - args.extend(["-i", "--encrypt", str(f.name)]) - cmd = nix_shell(["nixpkgs#sops"], args) - run(cmd, log=Log.BOTH) - # atomic copy of the encrypted file - with NamedTemporaryFile(dir=folder, delete=False) as f2: - shutil.copyfile(f.name, f2.name) - Path(f2.name).rename(secret_path) - finally: - with suppress(OSError): - Path(f.name).unlink() + swap_secret(f) def decrypt_file(secret_path: Path) -> str: - with sops_manifest([]) as manifest: - cmd = nix_shell( - ["nixpkgs#sops"], - ["sops", "--config", str(manifest), "--decrypt", str(secret_path)], - ) - res = run(cmd, error_msg=f"Could not decrypt {secret_path}") - return res.stdout + # decryption uses private keys from the environment or default paths: + no_public_keys_needed: list[tuple[str, KeyType]] = [] + executor = functools.partial(run, error_msg=f"Could not decrypt {secret_path}") + _, stdout = Operation.decrypt(secret_path, no_public_keys_needed, executor) + return stdout def get_meta(secret_path: Path) -> dict: From f4e621af8845a5a3aeecd6d9bdb35bcae6004b5d Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Wed, 23 Oct 2024 18:42:18 -0700 Subject: [PATCH 2/6] clan-cli: secrets.sops: improve age keys detection This change allows you to e.g. directly pass `$(age-keygen)` on the command line. --- pkgs/clan-cli/clan_cli/machines/create.py | 1 + pkgs/clan-cli/clan_cli/secrets/sops.py | 5 +++++ pkgs/clan-cli/clan_cli/secrets/types.py | 17 +++++++++-------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/machines/create.py b/pkgs/clan-cli/clan_cli/machines/create.py index 1995bf38c..fc73cac93 100644 --- a/pkgs/clan-cli/clan_cli/machines/create.py +++ b/pkgs/clan-cli/clan_cli/machines/create.py @@ -81,6 +81,7 @@ def create_machine(opts: CreateOptions) -> None: msg = "Machine name must be a valid hostname" raise ClanError(msg, location="Create Machine") + # lopter@(2024-10-22): Could we just use warn and use the existing config? if dst.exists(): msg = f"Machine {machine_name} already exists in {clan_dir}" description = ( diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 8db72e88d..316419101 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -245,6 +245,11 @@ def maybe_get_admin_public_key() -> None | SopsKey: return None if len(keyring) > 1: + # louis@(2024-10-22): + # + # This is confusing when it shows up and you have no information + # about where each key is going from, could we log the discovery + # of each key? msg = ( f"Found more than {len(keyring)} public keys in your " f"environment/system and cannot decide which one to " diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py index 0a2121cab..52d819ea6 100644 --- a/pkgs/clan-cli/clan_cli/secrets/types.py +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -21,14 +21,15 @@ def secret_name_type(arg_value: str) -> str: def public_or_private_age_key_type(arg_value: str) -> str: if Path(arg_value).is_file(): arg_value = Path(arg_value).read_text().strip() - if arg_value.startswith("age1"): - return arg_value.strip() - if arg_value.startswith("AGE-SECRET-KEY-"): - return get_public_age_key(arg_value) - if not arg_value.startswith("age1"): - msg = f"Please provide an age key starting with age1, got: '{arg_value}'" - raise ClanError(msg) - return arg_value + for line in arg_value.splitlines(): + if line.startswith("#"): + continue + if line.startswith("age1"): + return line.strip() + if line.startswith("AGE-SECRET-KEY-"): + return get_public_age_key(line) + msg = f"Please provide an age key starting with age1 or AGE-SECRET-KEY-, got: '{arg_value}'" + raise ClanError(msg) def group_or_user_name_type(what: str) -> Callable[[str], str]: From c266ffce6fd619724abcf23997db9888ab400925 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Mon, 4 Nov 2024 11:47:54 -0800 Subject: [PATCH 3/6] clan-cli: secrets: treemft --- pkgs/clan-cli/clan_cli/secrets/sops.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 316419101..e239677f4 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -49,7 +49,7 @@ class SopsKey: } -class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go +class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go ERROR_GENERIC = 1 COULD_NOT_READ_INPUT_FILE = 2 COULD_NOT_WRITE_OUTPUT_FILE = 3 @@ -130,7 +130,7 @@ class Operation(enum.StrEnum): # # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 for var in os.environ: - if var.startswith("SOPS_") and var not in { # allowed: + if var.startswith("SOPS_") and var not in { # allowed: "SOPS_GPG_EXEC", "SOPS_AGE_KEY", "SOPS_AGE_KEY_FILE", @@ -290,8 +290,8 @@ def collect_private_age_keys() -> Sequence[str]: def maybe_read_from_path(key_path: Path) -> None: try: contents = Path(key_path).read_text().strip() - lines = contents.splitlines() # as in parse.go in age: - private_age_keys.extend(l for l in lines if l and not l.startswith("#")) + lines = contents.splitlines() # as in parse.go in age: + private_age_keys.extend(ln for ln in lines if ln and not ln.startswith("#")) except FileNotFoundError: return except Exception as ex: From 07cd22393ab5efda8d0bfed04bc783e442a2f9bd Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Mon, 18 Nov 2024 17:11:25 -0800 Subject: [PATCH 4/6] clan-cli: rebase sops changes on top of vars changes vars changes in question are from commit: https://git.clan.lol/clan/clan-core/commit/8b94bc71bcce63fa71ae54c1aeb5fad64e0d4903 With this changeset the age specific sops logic that was added is now generic. To keep things simple, this changeset modifies `SopsKey` so that `username` is ignored when comparing different keys. I don't really see us relying on `username` and this makes `SopsKey` hashable, and usable in a `set`, which is nice when you check that you have a particular key. --- pkgs/clan-cli/clan_cli/secrets/sops.py | 41 +++++++++++++-- .../clan_cli/vars/secret_modules/sops.py | 51 +++++++++---------- 2 files changed, 61 insertions(+), 31 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index e239677f4..5d34c9941 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -1,3 +1,4 @@ +import dataclasses import enum import functools import io @@ -8,7 +9,6 @@ import shutil import subprocess from collections.abc import Iterable, Sequence from contextlib import suppress -from dataclasses import dataclass from pathlib import Path from tempfile import NamedTemporaryFile from typing import IO, Any, Protocol @@ -34,11 +34,26 @@ class KeyType(enum.Enum): return cls.__members__.get(value.upper()) return None + @property + def sops_recipient_attr(self) -> str: + """Name of the attribute to get the recipient key from a Sops file.""" + if self == self.AGE: + return "recipient" + if self == self.PGP: + return "fp" + msg = ( + f"KeyType is not properly implemented: " + f'"sops_recipient_attr" is missing for key type "{self.name}"' + ) + raise ClanError(msg) -@dataclass(frozen=True, eq=False) + +@dataclasses.dataclass(frozen=True) class SopsKey: pubkey: str - username: str + # Two SopsKey are considered equal even + # if they don't have the same username: + username: str = dataclasses.field(compare=False) key_type: KeyType def as_dict(self) -> dict[str, str]: @@ -48,6 +63,13 @@ class SopsKey: "type": self.key_type.name.lower(), } + @classmethod + def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102 + """Load from the file named `keys.json` in the given directory.""" + pubkey, key_type = read_key(dir) + username = "" + return cls(pubkey, username, key_type) + class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go ERROR_GENERIC = 1 @@ -389,6 +411,19 @@ def decrypt_file(secret_path: Path) -> str: return stdout +def get_recipients(secret_path: Path) -> set[SopsKey]: + sops_attrs = json.loads((secret_path / "secret").read_text())["sops"] + return { + SopsKey( + pubkey=recipient[key_type.sops_recipient_attr], + username="", + key_type=key_type, + ) + for key_type in KeyType + for recipient in sops_attrs[key_type.name.lower()] + } + + def get_meta(secret_path: Path) -> dict: meta_path = secret_path.parent / "meta.json" if not meta_path.exists(): diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index 5b02d55ac..3b60f471a 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -1,10 +1,10 @@ -import json from dataclasses import dataclass from pathlib import Path from typing import override from clan_cli.errors import ClanError from clan_cli.machines.machines import Machine +from clan_cli.secrets import sops from clan_cli.secrets.folders import ( sops_machines_folder, sops_secrets_folder, @@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import ( encrypt_secret, has_secret, ) -from clan_cli.secrets.sops import KeyType, generate_private_key -from clan_cli.vars.generate import Generator, Var +from clan_cli.vars.generate import Generator +from clan_cli.vars.var import Var from . import SecretStoreBase @@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase): if has_machine(self.machine.flake_dir, self.machine.name): return - priv_key, pub_key = generate_private_key() + priv_key, pub_key = sops.generate_private_key() encrypt_secret( self.machine.flake_dir, sops_secrets_folder(self.machine.flake_dir) @@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase): def user_has_access( self, user: str, generator: Generator, secret_name: str ) -> bool: - secret_path = self.secret_path(generator, secret_name) - secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] - users_folder_path = sops_users_folder(self.machine.flake_dir) - user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[ - "publickey" - ] - return user_pubkey in recipients + key_dir = sops_users_folder(self.machine.flake_dir) / user + return self.key_has_access(key_dir, generator, secret_name) def machine_has_access(self, generator: Generator, secret_name: str) -> bool: + key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name + return self.key_has_access(key_dir, generator, secret_name) + + def key_has_access( + self, key_dir: Path, generator: Generator, secret_name: str + ) -> bool: secret_path = self.secret_path(generator, secret_name) - secret = json.loads((secret_path / "secret").read_text()) - recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] - machines_folder_path = sops_machines_folder(self.machine.flake_dir) - machine_pubkey = json.loads( - (machines_folder_path / self.machine.name / "key.json").read_text() - )["publickey"] - return machine_pubkey in recipients + return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path) def secret_path(self, generator: Generator, secret_name: str) -> Path: return self.directory(generator, secret_name) @@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase): secret_folder = self.secret_path(generator, name) add_secret(self.machine.flake_dir, self.machine.name, secret_folder) - def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]: + def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]: from clan_cli.secrets.secrets import ( collect_keys_for_path, collect_keys_for_type, @@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase): self.machine.flake_dir / "sops" / "groups" / group / "users" ) ) - return keys - @override + return { + sops.SopsKey(pubkey=key, username="", key_type=key_type) + for (key, key_type) in keys + } + + # } def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]: secret_path = self.secret_path(generator, name) - recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"] - current_recipients = {r["recipient"] for r in recipients_} - wanted_recipients = { - key[0] for key in self.collect_keys_for_secret(secret_path) - } + current_recipients = sops.get_recipients(secret_path) + wanted_recipients = self.collect_keys_for_secret(secret_path) needs_update = current_recipients != wanted_recipients recipients_to_add = wanted_recipients - current_recipients var_id = f"{generator.name}/{name}" msg = ( f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. " f"This could have been a malicious actor trying to add their keys, please investigate. " - f"Added keys: {', '.join(recipients_to_add)}" + f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}" ) return needs_update, msg From ffe7b9057d30bf2e3eb61433fd014e954066d5e7 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Tue, 19 Nov 2024 16:34:50 -0800 Subject: [PATCH 5/6] clan-cli: secrets: address CR feedback for sops encryption and key handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move public keys collection to a class method on `SopsKey`, and implement collection for each key type in `KeyType`, this helps make the code more generic ; - Replace `Operation.__call__` by `run` (`sops.run` if you import the entire module), that allows us to dedent the code so that's cool ; - Fix exception handling when trying to get a in-memory temporary file ; - Make Executor cuter 😵🪦. --- pkgs/clan-cli/clan_cli/secrets/secrets.py | 12 +- pkgs/clan-cli/clan_cli/secrets/sops.py | 319 ++++++++++++---------- 2 files changed, 180 insertions(+), 151 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py index e665a4c8e..14e41b025 100644 --- a/pkgs/clan-cli/clan_cli/secrets/secrets.py +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -94,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]: def encrypt_secret( flake_dir: Path, secret_path: Path, - value: IO[str] | str | bytes | None, + value: IO[bytes] | str | bytes | None, add_users: list[str] | None = None, add_machines: list[str] | None = None, add_groups: list[str] | None = None, @@ -305,10 +305,10 @@ def list_command(args: argparse.Namespace) -> None: def decrypt_secret(flake_dir: Path, secret_path: Path) -> str: - # I can't think of a good way to ensure that we have the private key for - # the secret. I mean we could collect all private keys we could find and - # then make sure we have the one for the secret, but that seems - # complicated for little ux gain? + # lopter(2024-10): I can't think of a good way to ensure that we have the + # private key for the secret. I mean we could collect all private keys we + # could find and then make sure we have the one for the secret, but that + # seems complicated for little ux gain? path = secret_path / "secret" if not path.exists(): msg = f"Secret '{secret_path!s}' does not exist" @@ -332,7 +332,7 @@ def is_tty_interactive() -> bool: def set_command(args: argparse.Namespace) -> None: env_value = os.environ.get("SOPS_NIX_SECRET") - secret_value: str | IO[str] | None = sys.stdin + secret_value: str | IO[bytes] | None = sys.stdin.buffer if args.edit: secret_value = None elif env_value: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 5d34c9941..664232313 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -11,10 +11,10 @@ from collections.abc import Iterable, Sequence from contextlib import suppress from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO, Any, Protocol +from typing import IO, Protocol +import clan_cli.cmd from clan_cli.api import API -from clan_cli.cmd import Log, run from clan_cli.dirs import user_config_dir from clan_cli.errors import ClanError, CmdOut from clan_cli.nix import nix_shell @@ -47,6 +47,56 @@ class KeyType(enum.Enum): ) raise ClanError(msg) + def collect_public_keys(self) -> Sequence[str]: + keyring: Sequence[str] = [] + + if self == self.AGE: + if keys := os.environ.get("SOPS_AGE_KEY"): + # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and + # reads identities line by line. See age/keysource.go in + # Sops, and age/parse.go in Age. + for private_key in keys.strip().splitlines(): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in the environment (SOPS_AGE_KEY): {public_key}" + ) + keyring.append(public_key) + + def maybe_read_from_path(key_path: Path) -> None: + try: + # as in parse.go in age: + lines = Path(key_path).read_text().strip().splitlines() + for private_key in filter(lambda ln: not ln.startswith("#"), lines): + public_key = get_public_age_key(private_key) + log.info( + f"Found age public key from a private key " + f"in {key_path}: {public_key}" + ) + keyring.append(public_key) + except FileNotFoundError: + return + except Exception as ex: + log.warn(f"Could not read age keys from {key_path}: {ex}") + + # Sops will try every location, see age/keysource.go + if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): + maybe_read_from_path(Path(key_path)) + maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") + + return keyring + + if self == self.PGP: + if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): + for fp in pgp_fingerprints.strip().split(","): + msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}" + log.info(msg) + keyring.append(fp) + return keyring + + msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys" + raise ClanError(msg) + @dataclasses.dataclass(frozen=True) class SopsKey: @@ -64,12 +114,20 @@ class SopsKey: } @classmethod - def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102 + def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102 """Load from the file named `keys.json` in the given directory.""" - pubkey, key_type = read_key(dir) + pubkey, key_type = read_key(folder) username = "" return cls(pubkey, username, key_type) + @classmethod + def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102 + return [ + cls(pubkey=key, username="", key_type=key_type) + for key_type in KeyType + for key in key_type.collect_public_keys() + ] + class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go ERROR_GENERIC = 1 @@ -105,75 +163,81 @@ class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go return ExitStatus(code) if code in ExitStatus else None -class Executor(Protocol): +class Executer(Protocol): def __call__( self, cmd: list[str], *, env: dict[str, str] | None = None ) -> CmdOut: ... class Operation(enum.StrEnum): - decrypt = "decrypt" - edit = "edit" - encrypt = "encrypt" - update_keys = "updatekeys" + DECRYPT = "decrypt" + EDIT = "edit" + ENCRYPT = "encrypt" + UPDATE_KEYS = "updatekeys" - def __call__( - self, - secret_path: Path, - public_keys: Iterable[tuple[str, KeyType]], - executor: Executor, - ) -> tuple[int, str]: - sops_cmd = ["sops"] - environ = os.environ.copy() - with NamedTemporaryFile(delete=False, mode="w") as manifest: - if self == Operation.decrypt: - sops_cmd.append("decrypt") - else: - # When sops is used to edit a file the config is only used at - # file creation, otherwise the keys from the exising file are - # used. - sops_cmd.extend(["--config", manifest.name]) - keys_by_type: dict[KeyType, list[str]] = {} - keys_by_type = {key_type: [] for key_type in KeyType} - for key, key_type in public_keys: - keys_by_type[key_type].append(key) - it = keys_by_type.items() - key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] - rules = {"creation_rules": [{"key_groups": key_groups}]} - json.dump(rules, manifest, indent=2) - manifest.flush() +def run( + call: Operation, + secret_path: Path, + public_keys: Iterable[tuple[str, KeyType]], + executer: Executer, +) -> tuple[int, str]: + """Call the sops binary for the given operation.""" + # louis(2024-11-19): I regrouped the call into the sops binary into this + # one place because calling into sops needs to be done with a carefully + # setup context, and I don't feel good about the idea of having that logic + # exist in multiple places. + sops_cmd = ["sops"] + environ = os.environ.copy() + with NamedTemporaryFile(delete=False, mode="w") as manifest: + if call == Operation.DECRYPT: + sops_cmd.append("decrypt") + else: + # When sops is used to edit a file the config is only used at + # file creation, otherwise the keys from the exising file are + # used. + sops_cmd.extend(["--config", manifest.name]) - if self == Operation.encrypt: - # Remove SOPS env vars used to specify public keys to force - # sops to use our config file [1]; so that the file gets - # encrypted with our keys and not something leaking out of - # the environment. - # - # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 - for var in os.environ: - if var.startswith("SOPS_") and var not in { # allowed: - "SOPS_GPG_EXEC", - "SOPS_AGE_KEY", - "SOPS_AGE_KEY_FILE", - "SOPS_NIX_SECRET", - }: - del environ[var] - sops_cmd.extend(["encrypt", "--in-place"]) - elif self == Operation.update_keys: - sops_cmd.extend(["updatekeys", "--yes"]) - elif self != Operation.edit: - known_operations = ",".join(Operation.__members__.values()) - msg = ( - f"Unsupported sops operation {self.value} " - f"(known operations: {known_operations})" - ) - raise ClanError(msg) - sops_cmd.append(str(secret_path)) + keys_by_type: dict[KeyType, list[str]] = {} + keys_by_type = {key_type: [] for key_type in KeyType} + for key, key_type in public_keys: + keys_by_type[key_type].append(key) + it = keys_by_type.items() + key_groups = [{key_type.name.lower(): keys for key_type, keys in it}] + rules = {"creation_rules": [{"key_groups": key_groups}]} + json.dump(rules, manifest, indent=2) + manifest.flush() - cmd = nix_shell(["nixpkgs#sops"], sops_cmd) - p = executor(cmd, env=environ) - return p.returncode, p.stdout + if call == Operation.ENCRYPT: + # Remove SOPS env vars used to specify public keys to force + # sops to use our config file [1]; so that the file gets + # encrypted with our keys and not something leaking out of + # the environment. + # + # [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229 + for var in os.environ: + if var.startswith("SOPS_") and var not in { # allowed: + "SOPS_GPG_EXEC", + "SOPS_AGE_KEY", + "SOPS_AGE_KEY_FILE", + "SOPS_NIX_SECRET", + }: + del environ[var] + sops_cmd.extend(["encrypt", "--in-place"]) + elif call == Operation.UPDATE_KEYS: + sops_cmd.extend(["updatekeys", "--yes"]) + elif call != Operation.EDIT: + known_operations = ",".join(Operation.__members__.values()) + msg = ( + f"Unsupported sops operation {call.value} " + f"(known operations: {known_operations})" + ) + raise ClanError(msg) + sops_cmd.append(str(secret_path)) + + cmd = nix_shell(["nixpkgs#sops"], sops_cmd) + p = executer(cmd, env=environ) + return p.returncode, p.stdout def get_public_age_key(privkey: str) -> str: @@ -191,7 +255,7 @@ def get_public_age_key(privkey: str) -> str: def generate_private_key(out_file: Path | None = None) -> tuple[str, str]: cmd = nix_shell(["nixpkgs#age"], ["age-keygen"]) try: - proc = run(cmd) + proc = clan_cli.cmd.run(cmd) res = proc.stdout.strip() pubkey = None private_key = None @@ -262,21 +326,17 @@ def default_admin_private_key_path() -> Path: @API.register def maybe_get_admin_public_key() -> None | SopsKey: - keyring = collect_public_keys() + keyring = SopsKey.collect_public_keys() if len(keyring) == 0: return None if len(keyring) > 1: - # louis@(2024-10-22): - # - # This is confusing when it shows up and you have no information - # about where each key is going from, could we log the discovery - # of each key? + last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]] msg = ( f"Found more than {len(keyring)} public keys in your " f"environment/system and cannot decide which one to " - f"use, first three:\n\n" - f"- {'\n- '.join(str(key.as_dict()) for key in keyring[:3])}\n\n" + f"use, first {len(last_3)}:\n\n" + f"- {'\n- '.join(last_3)}\n\n" f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or " f"SOPS_PGP_FP appropriately" ) @@ -285,48 +345,6 @@ def maybe_get_admin_public_key() -> None | SopsKey: return keyring[0] -def collect_public_keys() -> Sequence[SopsKey]: - username = "" - keyring: list[SopsKey] = [] - - for private_key in collect_private_age_keys(): - public_key = get_public_age_key(private_key) - keyring.append(SopsKey(public_key, username, KeyType.AGE)) - - if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"): - for fp in pgp_fingerprints.strip().split(","): - keyring.append(SopsKey(fp, username, KeyType.PGP)) - - return keyring - - -def collect_private_age_keys() -> Sequence[str]: - private_age_keys: list[str] = [] - - if keys := os.environ.get("SOPS_AGE_KEY"): - # SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and reads - # identities line by line. See age/keysource.go in Sops, and - # age/parse.go in Age. - private_age_keys.extend(keys.strip().splitlines()) - - def maybe_read_from_path(key_path: Path) -> None: - try: - contents = Path(key_path).read_text().strip() - lines = contents.splitlines() # as in parse.go in age: - private_age_keys.extend(ln for ln in lines if ln and not ln.startswith("#")) - except FileNotFoundError: - return - except Exception as ex: - log.warn(f"Could not read age keys from {key_path}: {ex}") - - # Sops will try every location, see age/keysource.go - if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): - maybe_read_from_path(Path(key_path)) - maybe_read_from_path(user_config_dir() / "sops/age/keys.txt") - - return private_age_keys - - def ensure_admin_public_key(flake_dir: Path) -> SopsKey: key = maybe_get_admin_public_key() if key: @@ -338,15 +356,17 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey: def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]: secret_path = secret_path / "secret" error_msg = f"Could not update keys for {secret_path}" - executor = functools.partial(run, log=Log.BOTH, error_msg=error_msg) - rc, _ = Operation.update_keys(secret_path, keys, executor) + executer = functools.partial( + clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg + ) + rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer) was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED return [secret_path] if was_modified else [] def encrypt_file( secret_path: Path, - content: IO[str] | str | bytes | None, + content: str | IO[bytes] | bytes | None, pubkeys: list[tuple[str, KeyType]], ) -> None: folder = secret_path.parent @@ -355,7 +375,7 @@ def encrypt_file( if not content: # Don't use our `run` here, because it breaks editor integration. # We never need this in our UI. - def executor(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: + def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut: return CmdOut( stdout="", stderr="", @@ -366,48 +386,57 @@ def encrypt_file( msg=None, ) - rc, _ = Operation.edit(secret_path, pubkeys, executor) + rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer) status = ExitStatus.parse(rc) if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED: return msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}" raise ClanError(msg) - def swap_secret(f: Any) -> None: # f's type is not exposed by tempfile - try: + # lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it + # creates the file in __init__, when really it should be created in + # __enter__ (that is in Python __enter__ is actually __init__ from a RAII + # perspective, and __init__ should just be thought off as syntax sugar to + # capture extra context), and now the linter is unhappy so hush it. Note + # that if NamedTemporaryFile created the file in __enter__ then we'd have + # to change exception handling: + try: + source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115 + except (FileNotFoundError, PermissionError): + source = NamedTemporaryFile(delete=False) # noqa: SIM115 + try: # swap the secret: + with source: if isinstance(content, str): - Path(f.name).write_text(content) + source.file.write(content.encode()) elif isinstance(content, bytes): - Path(f.name).write_bytes(content) - elif isinstance(content, io.IOBase): - with Path(f.name).open("w") as fd: - shutil.copyfileobj(content, fd) + source.file.write(content) + elif isinstance(content, io.BufferedReader): + # lopter@(2024-11-19): mypy is freaking out on the 1st + # argument, idk why, it says: + # + # > Cannot infer type argument 1 of "copyfileobj" + shutil.copyfileobj(content, source.file) # type: ignore[misc] else: msg = f"Invalid content type: {type(content)}" raise ClanError(msg) - Operation.encrypt(f.name, pubkeys, functools.partial(run, log=Log.BOTH)) - # atomic copy of the encrypted file - with NamedTemporaryFile(dir=folder, delete=False) as f2: - shutil.copyfile(f.name, f2.name) - Path(f2.name).rename(secret_path) - finally: - with suppress(OSError): - Path(f.name).unlink() - - try: - with NamedTemporaryFile(dir="/dev/shm", delete=False) as f: - swap_secret(f) - except (FileNotFoundError, PermissionError): - # hopefully /tmp is written to an in-memory file to avoid leaking secrets - with NamedTemporaryFile(delete=False) as f: - swap_secret(f) + executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH) + run(Operation.ENCRYPT, Path(source.name), pubkeys, executer) + # atomic copy of the encrypted file + with NamedTemporaryFile(dir=folder, delete=False) as dest: + shutil.copyfile(source.name, dest.name) + Path(dest.name).rename(secret_path) + finally: + with suppress(OSError): + Path(source.name).unlink() def decrypt_file(secret_path: Path) -> str: # decryption uses private keys from the environment or default paths: no_public_keys_needed: list[tuple[str, KeyType]] = [] - executor = functools.partial(run, error_msg=f"Could not decrypt {secret_path}") - _, stdout = Operation.decrypt(secret_path, no_public_keys_needed, executor) + executer = functools.partial( + clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}" + ) + _, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer) return stdout @@ -420,7 +449,7 @@ def get_recipients(secret_path: Path) -> set[SopsKey]: key_type=key_type, ) for key_type in KeyType - for recipient in sops_attrs[key_type.name.lower()] + for recipient in sops_attrs[key_type.name.lower()] or [] } From 93fa2af23967a69d517aa1059f1e17fe90cae11f Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Wed, 20 Nov 2024 16:54:58 -0800 Subject: [PATCH 6/6] clan-cli: secrets: sops: `SOPS_NIX_SECRET` is not a sops variable It's a variable internal to us, and it does not need to be allowed. --- pkgs/clan-cli/clan_cli/secrets/sops.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 664232313..d2ec87202 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -220,7 +220,6 @@ def run( "SOPS_GPG_EXEC", "SOPS_AGE_KEY", "SOPS_AGE_KEY_FILE", - "SOPS_NIX_SECRET", }: del environ[var] sops_cmd.extend(["encrypt", "--in-place"])