diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 1007faaaf..dc85a3926 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -91,10 +91,7 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user_or_machine( - flake_dir: Path, pub_key: str, key_type: KeyType -) -> SopsKey | None: - key = SopsKey(pub_key, username="", key_type=key_type) +def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None: folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] for folder in folders: @@ -103,7 +100,7 @@ def maybe_get_user_or_machine( if not (user / "key.json").exists(): continue this_pub_key, this_key_type = read_key(user) - if pub_key == this_pub_key and key_type == this_key_type: + if key.pubkey == this_pub_key and key.key_type == this_key_type: key.username = user.name return key @@ -111,12 +108,12 @@ def maybe_get_user_or_machine( @API.register -def ensure_user_or_machine(flake_dir: Path, pub_key: str, key_type: KeyType) -> SopsKey: - key = maybe_get_user_or_machine(flake_dir, pub_key, key_type) - if not key: - msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" - raise ClanError(msg) - return key +def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey: + maybe_key = maybe_get_user_or_machine(flake_dir, key) + if maybe_key: + return maybe_key + msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)" + raise ClanError(msg) def default_admin_key_path() -> Path: @@ -127,37 +124,43 @@ def default_admin_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> tuple[str, KeyType | None]: +def maybe_get_admin_public_key() -> None | SopsKey: age_key = os.environ.get("SOPS_AGE_KEY") pgp_key = os.environ.get("SOPS_PGP_FP") if age_key and pgp_key: msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other." raise ClanError(msg) if age_key: - return get_public_age_key(age_key), KeyType.AGE + return SopsKey( + pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username="" + ) if pgp_key: - return pgp_key, KeyType.PGP + return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="") path = default_admin_key_path() if path.exists(): - return get_public_age_key(path.read_text()), KeyType.AGE + return SopsKey( + pubkey=get_public_age_key(path.read_text()), + key_type=KeyType.AGE, + username="", + ) - return "", None - - -def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: - pub_key, key_type = maybe_get_admin_public_key() - if key_type: - return maybe_get_user_or_machine(flake_dir, pub_key, key_type) return None +def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: + key = maybe_get_admin_public_key() + if not key: + return None + return maybe_get_user_or_machine(flake_dir, key) + + def ensure_admin_key(flake_dir: Path) -> SopsKey: - pub_key, key_type = maybe_get_admin_public_key() - if not key_type: - msg = "No sops key found. Please generate one with 'clan secrets key generate'." - raise ClanError(msg) - return ensure_user_or_machine(flake_dir, pub_key, key_type) + key = maybe_get_admin_public_key() + if key: + return ensure_user_or_machine(flake_dir, key) + msg = "No sops key found. Please generate one with 'clan secrets key generate'." + raise ClanError(msg) @contextmanager