diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index b1292a0e9..0c07cb326 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -13,15 +13,16 @@ from .sops import ( default_admin_private_key_path, generate_private_key, load_age_plugins, - maybe_get_admin_public_key, + maybe_get_admin_public_keys, ) log = logging.getLogger(__name__) def generate_key() -> sops.SopsKey: - key = maybe_get_admin_public_key() - if key is not None: + keys = maybe_get_admin_public_keys() + if keys is not None: + key = keys[0] print(f"{key.key_type.name} key {key.pubkey} is already set", file=sys.stderr) return key @@ -44,11 +45,11 @@ def generate_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None: - key = sops.maybe_get_admin_public_key() - if not key: + keys = sops.maybe_get_admin_public_keys() + if not keys: msg = "No public key found" raise ClanError(msg) - json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) + json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True) def update_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 842287695..34f91c870 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -7,7 +7,7 @@ import os import re import shutil import subprocess -from collections.abc import Iterable, Sequence +from collections.abc import Iterable from contextlib import suppress from pathlib import Path from tempfile import NamedTemporaryFile @@ -50,8 +50,8 @@ class KeyType(enum.Enum): ) raise ClanError(msg) - def collect_public_keys(self) -> Sequence[str]: - keyring: list[str] = [] + def collect_public_keys(self) -> list[str]: + keyring = [] if self == self.AGE: @@ -136,7 +136,7 @@ class SopsKey: return read_keys(folder) @classmethod - def collect_public_keys(cls) -> Sequence["SopsKey"]: + def collect_public_keys(cls) -> list["SopsKey"]: return [ cls(pubkey=key, username="", key_type=key_type) for key_type in KeyType @@ -374,7 +374,7 @@ def get_user_name(flake_dir: Path, user: str) -> str: print(f"{flake_dir / user} already exists") -def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None: +def maybe_get_user(flake_dir: Path, keys: set[SopsKey]) -> set[SopsKey] | None: folder = sops_users_folder(flake_dir) if folder.exists(): @@ -382,9 +382,11 @@ def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None: if not (user / "key.json").exists(): continue - keys = read_keys(user) - if key in keys: - return {SopsKey(key.pubkey, user.name, key.key_type) for key in keys} + user_keys = read_keys(user) + if len(keys.intersection(user_keys)): + return { + SopsKey(key.pubkey, user.name, key.key_type) for key in user_keys + } return None @@ -397,39 +399,32 @@ def default_admin_private_key_path() -> Path: @API.register -def maybe_get_admin_public_key() -> SopsKey | None: +def maybe_get_admin_public_keys() -> list[SopsKey] | None: keyring = SopsKey.collect_public_keys() if len(keyring) == 0: return None - if len(keyring) > 1: - last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]] - msg = ( - f"Found {len(keyring)} public keys in your " - f"environment/system and cannot decide which one to " - f"use, first {len(last_3)}:\n\n" - f"- {'\n- '.join(last_3)}\n\n" - f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or " - f"SOPS_PGP_FP appropriately" - ) - raise ClanError(msg) - - return keyring[0] + return keyring def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]: - key = maybe_get_admin_public_key() + keys = maybe_get_admin_public_keys() - if not key: + if not keys: msg = "No SOPS key found. Please generate one with `clan secrets key generate`." raise ClanError(msg) - user_keys = maybe_get_user(flake_dir, key) + user_keys = maybe_get_user(flake_dir, set(keys)) if not user_keys: - # todo improve error message - msg = f"We could not figure out which Clan secrets user you are with the SOPS key we found: {key.pubkey}" + msg = ( + f"We could not figure out which Clan secrets user you are with the SOPS keys we found:\n" + f"- {'\n- '.join(f'{key.key_type.name.lower()}: {key.pubkey}' for key in keys)}\n\n" + f"Please ensure you have created a Clan secrets user and added one of your SOPS keys" + f"to that user.\n" + f"For more information, see: https://docs.clan.lol/guides/getting-started/secrets/#add-your-public-keys" + ) raise ClanError(msg) return user_keys diff --git a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py index a190fec2b..e6d72ccaa 100644 --- a/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/clan_cli/tests/test_secrets_cli.py @@ -638,7 +638,7 @@ def test_secrets( with capture_output as output: cli.run(["secrets", "key", "show", "--flake", str(test_flake_with_core.path)]) - key = json.loads(output.out) + key = json.loads(output.out)[0] assert key["publickey"].startswith("age1") # Add testuser with the key that was generated for the clan cli.run( @@ -991,7 +991,7 @@ def test_secrets_key_generate_gpg( cli.run( ["secrets", "key", "show", "--flake", str(test_flake_with_core.path)] ) - key = json.loads(output.out) + key = json.loads(output.out)[0] assert key["type"] == "pgp" assert key["publickey"] == gpg_key.fingerprint diff --git a/pkgs/clan-cli/clan_cli/vars/keygen.py b/pkgs/clan-cli/clan_cli/vars/keygen.py index db0ceb97e..4f1bdd931 100644 --- a/pkgs/clan-cli/clan_cli/vars/keygen.py +++ b/pkgs/clan-cli/clan_cli/vars/keygen.py @@ -4,7 +4,7 @@ import os from pathlib import Path from clan_cli.secrets.key import generate_key -from clan_cli.secrets.sops import maybe_get_admin_public_key +from clan_cli.secrets.sops import maybe_get_admin_public_keys from clan_cli.secrets.users import add_user from clan_lib.api import API from clan_lib.errors import ClanError @@ -19,14 +19,14 @@ def keygen(flake_dir: Path, user: str | None = None, force: bool = False) -> Non if not user: msg = "No user provided and $USER is not set. Please provide a user via --user." raise ClanError(msg) - pub_key = maybe_get_admin_public_key() - if not pub_key: - pub_key = generate_key() + pub_keys = maybe_get_admin_public_keys() + if not pub_keys: + pub_keys = [generate_key()] # TODO set flake_dir=flake.path / "vars" add_user( flake_dir=flake_dir, name=user, - keys=[pub_key], + keys=pub_keys, force=force, ) diff --git a/pkgs/clan-cli/clan_lib/tests/test_create.py b/pkgs/clan-cli/clan_lib/tests/test_create.py index b46bbb879..245eb0cf1 100644 --- a/pkgs/clan-cli/clan_lib/tests/test_create.py +++ b/pkgs/clan-cli/clan_lib/tests/test_create.py @@ -11,7 +11,7 @@ import pytest from clan_cli.machines.create import CreateOptions as ClanCreateOptions from clan_cli.machines.create import create_machine from clan_cli.secrets.key import generate_key -from clan_cli.secrets.sops import maybe_get_admin_public_key +from clan_cli.secrets.sops import maybe_get_admin_public_keys from clan_cli.secrets.users import add_user from clan_cli.ssh.host_key import HostKeyCheck from clan_cli.vars.generate import generate_vars_for_machine, get_generators_closure @@ -159,10 +159,10 @@ def test_clan_create_api( fix_flake_inputs(dest_clan_dir, clan_core_dir) # ===== CREATE SOPS KEY ====== - sops_key = maybe_get_admin_public_key() - if sops_key is None: + sops_keys = maybe_get_admin_public_keys() + if sops_keys is None: # TODO: In the UI we need a view for this - sops_key = generate_key() + sops_keys = [generate_key()] else: msg = "SOPS key already exists, please remove it before running this test" raise ClanError(msg) @@ -171,7 +171,7 @@ def test_clan_create_api( add_user( dest_clan_dir, name="testuser", - keys=[sops_key], + keys=sops_keys, force=False, )