diff --git a/pkgs/clan-cli/clan_cli/secrets/key.py b/pkgs/clan-cli/clan_cli/secrets/key.py index e35fd5d94..c57b12c42 100644 --- a/pkgs/clan-cli/clan_cli/secrets/key.py +++ b/pkgs/clan-cli/clan_cli/secrets/key.py @@ -31,7 +31,9 @@ def generate_key() -> sops.SopsKey: print( f"Generated age private key at '{path}' for your user.\nPlease back it up on a secure location or you will lose access to your secrets." ) - return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE) + return sops.SopsKey( + pub_key, username="", key_type=sops.KeyType.AGE, source=str(path) + ) def generate_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py index 5bbe489d3..dca8e7b79 100644 --- a/pkgs/clan-cli/clan_cli/secrets/machines.py +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -26,7 +26,11 @@ from .types import public_or_private_age_key_type, secret_name_type def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None: machine_path = sops_machines_folder(flake_dir) / name - write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force) + write_key( + machine_path, + sops.SopsKey(pubkey, "", sops.KeyType.AGE, source=str(machine_path)), + overwrite=force, + ) paths = [machine_path] filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index 3f962f258..d2f28bfca 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -30,6 +30,9 @@ class KeyType(enum.Enum): AGE = enum.auto() PGP = enum.auto() + def __str__(self) -> str: + return self.name + @classmethod def validate(cls, value: str | None) -> "KeyType | None": if value: @@ -49,7 +52,7 @@ class KeyType(enum.Enum): ) raise ClanError(msg) - def collect_public_keys(self) -> list[str]: + def collect_public_keys(self) -> list["SopsKey"]: keyring = [] if self == self.AGE: @@ -66,7 +69,14 @@ class KeyType(enum.Enum): f"in {key_path}: {public_key}" ) - keyring.append(public_key) + keyring.append( + SopsKey( + pubkey=public_key, + username="", + key_type=self, + source=str(key_path), + ) + ) except ClanError as e: error_msg = f"Failed to read age keys from {key_path}" raise ClanError(error_msg) from e @@ -89,7 +99,14 @@ class KeyType(enum.Enum): f"in the environment (SOPS_AGE_KEY): {public_key}" ) - keyring.append(public_key) + keyring.append( + SopsKey( + pubkey=public_key, + username="", + key_type=self, + source="SOPS_AGE_KEY", + ) + ) except ClanError as e: error_msg = "Failed to read age keys from SOPS_AGE_KEY" raise ClanError(error_msg) from e @@ -107,7 +124,11 @@ class KeyType(enum.Enum): for fp in pgp_fingerprints.strip().split(","): msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}" log.debug(msg) - keyring.append(fp) + keyring.append( + SopsKey( + pubkey=fp, username="", key_type=self, source="SOPS_PGP_FP" + ) + ) return keyring msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys" @@ -121,14 +142,19 @@ class SopsKey: # if they don't have the same username: username: str = dataclasses.field(compare=False) key_type: KeyType + source: str = dataclasses.field(compare=False) def as_dict(self) -> dict[str, str]: return { "publickey": self.pubkey, "username": self.username, "type": self.key_type.name.lower(), + "source": self.source, } + def __str__(self) -> str: + return f"({self.key_type.name}) {self.pubkey} (source: {self.source})" + @classmethod def load_dir(cls, folder: Path) -> set["SopsKey"]: """Load from the file named `keys.json` in the given directory.""" @@ -136,11 +162,10 @@ class SopsKey: @classmethod def collect_public_keys(cls) -> list["SopsKey"]: - return [ - cls(pubkey=key, username="", key_type=key_type) - for key_type in KeyType - for key in key_type.collect_public_keys() - ] + result = [] + for key_type in KeyType: + result.extend(key_type.collect_public_keys()) + return result class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go @@ -384,7 +409,8 @@ def maybe_get_user(flake_dir: Path, keys: set[SopsKey]) -> set[SopsKey] | None: user_keys = read_keys(user) if len(keys.intersection(user_keys)): return { - SopsKey(key.pubkey, user.name, key.key_type) for key in user_keys + SopsKey(key.pubkey, user.name, key.key_type, key.source) + for key in user_keys } return None @@ -538,6 +564,7 @@ def get_recipients(secret_path: Path) -> set[SopsKey]: pubkey=recipient[key_type.sops_recipient_attr], username="", key_type=key_type, + source="sops_file", ) ) return keys @@ -623,7 +650,7 @@ def parse_key(key: Any) -> SopsKey: if not publickey: msg = f"{key} does not contain a public key" raise ClanError(msg) - return SopsKey(publickey, "", key_type) + return SopsKey(publickey, "", key_type, "key_file") def read_key(path: Path) -> SopsKey: diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py index ce341d1eb..3358ee462 100644 --- a/pkgs/clan-cli/clan_cli/secrets/users.py +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -78,7 +78,9 @@ def remove_user(flake_dir: Path, name: str) -> None: def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]: keys = read_keys(sops_users_folder(flake_dir) / name) - return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys} + return { + sops.SopsKey(key.pubkey, name, key.key_type, source=key.source) for key in keys + } def list_users(flake_dir: Path) -> list[str]: @@ -182,11 +184,17 @@ def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]: ) raise ClanError(err_msg) - age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys] + age_keys = [ + sops.SopsKey(key, "", sops.KeyType.AGE, source="cmdline") for key in age_keys + ] if args.agekey: - age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE)) + age_keys.append( + sops.SopsKey(args.agekey, "", sops.KeyType.AGE, source="cmdline") + ) - pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys] + pgp_keys = [ + sops.SopsKey(key, "", sops.KeyType.PGP, source="cmdline") for key in pgp_keys + ] return age_keys + pgp_keys diff --git a/pkgs/clan-cli/clan_cli/vars/keygen.py b/pkgs/clan-cli/clan_cli/vars/keygen.py index e293570d5..c969ff73d 100644 --- a/pkgs/clan-cli/clan_cli/vars/keygen.py +++ b/pkgs/clan-cli/clan_cli/vars/keygen.py @@ -50,7 +50,9 @@ def _select_keys_interactive(pub_keys: list[SopsKey]) -> list[SopsKey]: log.info("\nFound existing admin keys on this machine:") selected_keys: list[SopsKey] = [] for i, key in enumerate(pub_keys): - log.info(f"{i + 1}: {key}") + log.info( + f"{i + 1}: type: {key.key_type}\n pubkey: {key.pubkey}\n source: {key.source}" + ) while not selected_keys: choice = input( "Select keys to use (comma-separated list of numbers, or leave empty to select all): "