From 8f16cf02826b544bd6e5a50a9bd3b3eec6ae15b6 Mon Sep 17 00:00:00 2001 From: DavHau Date: Fri, 4 Oct 2024 18:20:53 +0200 Subject: [PATCH] Revert "vars: refactor - copy logic to sops secret module" This reverts commit ae53ea7399fb097a7cc1c1edac24855db10677fb. --- pkgs/clan-cli/clan_cli/vars/keygen.py | 4 +- .../clan_cli/vars/secret_modules/sops.py | 190 +----------------- 2 files changed, 13 insertions(+), 181 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/vars/keygen.py b/pkgs/clan-cli/clan_cli/vars/keygen.py index 3d20f94d9..cfee7a51c 100644 --- a/pkgs/clan-cli/clan_cli/vars/keygen.py +++ b/pkgs/clan-cli/clan_cli/vars/keygen.py @@ -5,8 +5,8 @@ import os from clan_cli.clan_uri import FlakeId from clan_cli.errors import ClanError from clan_cli.secrets.key import generate_key +from clan_cli.secrets.sops import maybe_get_admin_public_key from clan_cli.secrets.users import add_user -from clan_cli.vars.secret_modules.sops import SecretStore as SopsSecretStore log = logging.getLogger(__name__) @@ -17,7 +17,7 @@ def keygen(user: str | None, flake: FlakeId, force: bool) -> None: if not user: msg = "No user provided and $USER is not set. Please provide a user via --user." raise ClanError(msg) - pub_key = SopsSecretStore.maybe_get_admin_public_key() + pub_key = maybe_get_admin_public_key() if not pub_key: pub_key = generate_key() # TODO set flake_dir=flake.path / "vars" diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py index 761621674..fe1a888ae 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/sops.py @@ -1,32 +1,13 @@ import json -import os -import subprocess from dataclasses import dataclass from pathlib import Path -from typing import IO -from clan_cli.dirs import user_config_dir from clan_cli.errors import ClanError -from clan_cli.git import commit_files from clan_cli.machines.machines import Machine -from clan_cli.nix import nix_shell from clan_cli.secrets.folders import sops_machines_folder, sops_secrets_folder -from clan_cli.secrets.machines import add_secret -from clan_cli.secrets.secrets import ( - allow_member, - collect_keys_for_path, - decrypt_secret, - groups_folder, - has_secret, - machines_folder, - users_folder, -) -from clan_cli.secrets.sops import ( - decrypt_file, - encrypt_file, - generate_private_key, - read_key, -) +from clan_cli.secrets.machines import add_machine, add_secret, has_machine +from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret +from clan_cli.secrets.sops import generate_private_key from . import SecretStoreBase @@ -59,171 +40,21 @@ class SecretStore(SecretStoreBase): if not has_secrets: return - # exit early if the machine already exists - if (self.sops_dir / "machines" / self.machine.name / "key.json").exists(): + if has_machine(self.machine.flake_dir, self.machine.name): return - priv_key, pub_key = generate_private_key() - self.encrypt_secret( - self.sops_dir / "secrets" / f"{self.machine.name}-age.key", + encrypt_secret( + self.machine.flake_dir, + sops_secrets_folder(self.machine.flake_dir) + / f"{self.machine.name}-age.key", priv_key, ) - self.add_machine(self.machine.name, pub_key) - - @property - def sops_dir(self) -> Path: - return self.machine.flake_dir / "sops" + add_machine(self.machine.flake_dir, self.machine.name, pub_key, False) @property def store_name(self) -> str: return "sops" - def add_machine(self, machine: str, pubkey: str) -> None: - machine_path = self.sops_dir / "machines" / machine - self.write_key(machine_path, pubkey) - paths = [machine_path] - commit_files( - paths, - self.machine.flake_dir, - f"Add machine {machine} to secrets", - ) - - def write_key(self, machine_path: Path, publickey: str) -> None: - machine_path.mkdir(parents=True, exist_ok=True) - try: - flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_EXCL - fd = os.open(machine_path / "key.json", flags) - except FileExistsError as e: - msg = f"{machine_path.name} already exists in {machine_path}" - raise ClanError(msg) from e - with os.fdopen(fd, "w") as f: - json.dump({"publickey": publickey, "type": "age"}, f, indent=2) - - @staticmethod - def default_admin_key_path() -> Path: - raw_path = os.environ.get("SOPS_AGE_KEY_FILE") - if raw_path: - return Path(raw_path) - return user_config_dir() / "sops" / "age" / "keys.txt" - - @staticmethod - def get_public_key(privkey: str) -> str: - cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) - try: - res = subprocess.run( - cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True - ) - except subprocess.CalledProcessError as e: - msg = "Failed to get public key for age private key. Is the key malformed?" - raise ClanError(msg) from e - return res.stdout.strip() - - @classmethod - def maybe_get_admin_public_key(cls: type["SecretStore"]) -> str | None: - key = os.environ.get("SOPS_AGE_KEY") - if key: - return cls.get_public_key(key) - path = cls.default_admin_key_path() - if path.exists(): - return cls.get_public_key(path.read_text()) - - return None - - # TODO: get rid of `clan secrets generate` dependency - def admin_key(self) -> SopsKey: - pub_key = SecretStore.maybe_get_admin_public_key() - if not pub_key: - raise MissingKeyError - return self.ensure_user_or_machine(pub_key) - - # TODO: find alternative to `clan secrets users add` - def ensure_user_or_machine(self, pub_key: str) -> SopsKey: - key = self.maybe_get_user(pub_key) - if not key: - raise MissingKeyError - return key - - def maybe_get_user(self, pub_key: str) -> SopsKey | None: - key = SopsKey(pub_key, username="") - folders = [self.sops_dir / "users"] - - for folder in folders: - if folder.exists(): - for user in folder.iterdir(): - if not (user / "key.json").exists(): - continue - if read_key(user) == pub_key: - key.username = user.name - return key - - return None - - def encrypt_secret( - self, - secret_path: Path, - value: IO[str] | str | bytes | None, - add_machines: list[str] | None = None, - add_groups: list[str] | None = None, - git_commit: bool = True, - ) -> None: - if add_groups is None: - add_groups = [] - if add_machines is None: - add_machines = [] - key = self.admin_key() - recipient_keys = set() - - files_to_commit = [] - for machine in add_machines: - files_to_commit.extend( - allow_member( - machines_folder(secret_path), - self.sops_dir / "machines", - machine, - False, - ) - ) - - for group in add_groups: - files_to_commit.extend( - allow_member( - groups_folder(secret_path), - self.sops_dir / "groups", - group, - False, - ) - ) - - recipient_keys = collect_keys_for_path(secret_path) - - if key.pubkey not in recipient_keys: - recipient_keys.add(key.pubkey) - files_to_commit.extend( - allow_member( - users_folder(secret_path), - self.sops_dir / "users", - key.username, - False, - ) - ) - - secret_path = secret_path / "secret" - encrypt_file(secret_path, value, sorted(recipient_keys)) - files_to_commit.append(secret_path) - if git_commit: - commit_files( - files_to_commit, - self.machine.flake_dir, - f"Update secret {secret_path.parent.name}", - ) - - def decrypt_secret(self, secret_path: Path) -> str: - path = secret_path / "secret" - if not path.exists(): - msg = f"Secret '{secret_path!s}' does not exist" - raise ClanError(msg) - return decrypt_file(path) - def machine_has_access( self, generator_name: str, secret_name: str, shared: bool ) -> bool: @@ -262,7 +93,8 @@ class SecretStore(SecretStoreBase): add_secret(self.machine.flake_dir, self.machine.name, secret_folder) else: # initialize the secret - self.encrypt_secret( + encrypt_secret( + self.machine.flake_dir, secret_folder, value, add_machines=[self.machine.name] if deployed else [],