diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index cd2593bd2..48b096d1e 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -1,15 +1,42 @@ import json +import os +import subprocess +from dataclasses import dataclass from pathlib import Path +from typing import IO +from clan_cli.dirs import user_config_dir +from clan_cli.errors import ClanError +from clan_cli.git import commit_files from clan_cli.machines.machines import Machine +from clan_cli.nix import nix_shell from clan_cli.secrets.folders import sops_machines_folder, sops_secrets_folder -from clan_cli.secrets.machines import add_machine, add_secret, has_machine -from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret -from clan_cli.secrets.sops import generate_private_key +from clan_cli.secrets.machines import add_secret +from clan_cli.secrets.secrets import ( + allow_member, + collect_keys_for_path, + decrypt_secret, + groups_folder, + has_secret, + machines_folder, + users_folder, +) +from clan_cli.secrets.sops import ( + decrypt_file, + encrypt_file, + generate_private_key, + read_key, +) from . import SecretStoreBase +@dataclass +class SopsKey: + pubkey: str + username: str + + class SecretStore(SecretStoreBase): def __init__(self, machine: Machine) -> None: self.machine = machine @@ -26,21 +53,171 @@ class SecretStore(SecretStoreBase): if not has_secrets: return - if has_machine(self.machine.flake_dir, self.machine.name): + # exit early if the machine already exists + if (self.sops_dir / "machines" / self.machine.name / "key.json").exists(): return + priv_key, pub_key = generate_private_key() - encrypt_secret( - self.machine.flake_dir, - sops_secrets_folder(self.machine.flake_dir) - / f"{self.machine.name}-age.key", + self.encrypt_secret( + self.sops_dir / "secrets" / f"{self.machine.name}-age.key", priv_key, ) - add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) + self.add_machine(self.machine.name, pub_key) + + @property + def sops_dir(self) -> Path: + return self.machine.flake_dir / "sops" @property def store_name(self) -> str: return "sops" + def add_machine(self, machine: str, pubkey: str) -> None: + machine_path = self.sops_dir / "machines" / machine + self.write_key(machine_path, pubkey) + paths = [machine_path] + commit_files( + paths, + self.machine.flake_dir, + f"Add machine {machine} to secrets", + ) + + def write_key(self, machine_path: Path, publickey: str) -> None: + machine_path.mkdir(parents=True, exist_ok=True) + try: + flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_EXCL + fd = os.open(machine_path / "key.json", flags) + except FileExistsError as e: + msg = f"{machine_path.name} already exists in {machine_path}" + raise ClanError(msg) from e + with os.fdopen(fd, "w") as f: + json.dump({"publickey": publickey, "type": "age"}, f, indent=2) + + def default_admin_key_path(self) -> Path: + raw_path = os.environ.get("SOPS_AGE_KEY_FILE") + if raw_path: + return Path(raw_path) + return user_config_dir() / "sops" / "age" / "keys.txt" + + def get_public_key(self, privkey: str) -> str: + cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) + try: + res = subprocess.run( + cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True + ) + except subprocess.CalledProcessError as e: + msg = "Failed to get public key for age private key. Is the key malformed?" + raise ClanError(msg) from e + return res.stdout.strip() + + def maybe_get_admin_public_key(self) -> str | None: + key = os.environ.get("SOPS_AGE_KEY") + if key: + return self.get_public_key(key) + path = self.default_admin_key_path() + if path.exists(): + return self.get_public_key(path.read_text()) + + return None + + # TODO: get rid of `clan secrets generate` dependency + def admin_key(self) -> SopsKey: + pub_key = self.maybe_get_admin_public_key() + if not pub_key: + msg = "No sops key found. Please generate one with 'clan secrets key generate'." + raise ClanError(msg) + # return SopsKey(pub_key, username="") + return self.ensure_user_or_machine(pub_key) + + # TODO: find alternative to `clan secrets users add` + def ensure_user_or_machine(self, pub_key: str) -> SopsKey: + key = self.maybe_get_user_or_machine(pub_key) + if not key: + msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" + raise ClanError(msg) + return key + + def maybe_get_user_or_machine(self, pub_key: str) -> SopsKey | None: + key = SopsKey(pub_key, username="") + folders = [self.sops_dir / "users", self.sops_dir / "machines"] + + for folder in folders: + if folder.exists(): + for user in folder.iterdir(): + if not (user / "key.json").exists(): + continue + if read_key(user) == pub_key: + key.username = user.name + return key + + return None + + def encrypt_secret( + self, + secret_path: Path, + value: IO[str] | str | bytes | None, + add_machines: list[str] | None = None, + add_groups: list[str] | None = None, + git_commit: bool = True, + ) -> None: + if add_groups is None: + add_groups = [] + if add_machines is None: + add_machines = [] + key = self.admin_key() + recipient_keys = set() + + files_to_commit = [] + for machine in add_machines: + files_to_commit.extend( + allow_member( + machines_folder(secret_path), + self.sops_dir / "machines", + machine, + False, + ) + ) + + for group in add_groups: + files_to_commit.extend( + allow_member( + groups_folder(secret_path), + self.sops_dir / "groups", + group, + False, + ) + ) + + recipient_keys = collect_keys_for_path(secret_path) + + if key.pubkey not in recipient_keys: + recipient_keys.add(key.pubkey) + files_to_commit.extend( + allow_member( + users_folder(secret_path), + self.sops_dir / "users", + key.username, + False, + ) + ) + + secret_path = secret_path / "secret" + encrypt_file(secret_path, value, sorted(recipient_keys)) + files_to_commit.append(secret_path) + if git_commit: + commit_files( + files_to_commit, + self.machine.flake_dir, + f"Update secret {secret_path.parent.name}", + ) + + def decrypt_secret(self, secret_path: Path) -> str: + path = secret_path / "secret" + if not path.exists(): + msg = f"Secret '{secret_path!s}' does not exist" + raise ClanError(msg) + return decrypt_file(path) + def machine_has_access( self, generator_name: str, secret_name: str, shared: bool ) -> bool: @@ -79,8 +256,7 @@ class SecretStore(SecretStoreBase): add_secret(self.machine.flake_dir, self.machine.name, secret_folder) else: # initialize the secret - encrypt_secret( - self.machine.flake_dir, + self.encrypt_secret( secret_folder, value, add_machines=[self.machine.name] if deployed else [],