create clan: better info about existing sop keys

When creating a new clan, the key selection now looks like this:
```
Found existing admin keys on this machine:
1: type: AGE
   pubkey: age1xyz...
   source: /home/grmpf/.config/sops/age/keys.txt
2: type: PGP
   pubkey: abc...
   source: SOPS_PGP_FP
Select keys to use (comma-separated list of numbers, or leave empty to select all):
```

This is achieved by adding a `source` attribute to `SopsKey`.
This commit is contained in:
DavHau
2025-07-23 13:19:18 +07:00
parent c94330ee9c
commit cc69892e3b
5 changed files with 61 additions and 18 deletions

View File

@@ -31,7 +31,9 @@ def generate_key() -> sops.SopsKey:
print( print(
f"Generated age private key at '{path}' for your user.\nPlease back it up on a secure location or you will lose access to your secrets." f"Generated age private key at '{path}' for your user.\nPlease back it up on a secure location or you will lose access to your secrets."
) )
return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE) return sops.SopsKey(
pub_key, username="", key_type=sops.KeyType.AGE, source=str(path)
)
def generate_command(args: argparse.Namespace) -> None: def generate_command(args: argparse.Namespace) -> None:

View File

@@ -26,7 +26,11 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None: def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
machine_path = sops_machines_folder(flake_dir) / name machine_path = sops_machines_folder(flake_dir) / name
write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force) write_key(
machine_path,
sops.SopsKey(pubkey, "", sops.KeyType.AGE, source=str(machine_path)),
overwrite=force,
)
paths = [machine_path] paths = [machine_path]
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name) filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)

View File

@@ -30,6 +30,9 @@ class KeyType(enum.Enum):
AGE = enum.auto() AGE = enum.auto()
PGP = enum.auto() PGP = enum.auto()
def __str__(self) -> str:
return self.name
@classmethod @classmethod
def validate(cls, value: str | None) -> "KeyType | None": def validate(cls, value: str | None) -> "KeyType | None":
if value: if value:
@@ -49,7 +52,7 @@ class KeyType(enum.Enum):
) )
raise ClanError(msg) raise ClanError(msg)
def collect_public_keys(self) -> list[str]: def collect_public_keys(self) -> list["SopsKey"]:
keyring = [] keyring = []
if self == self.AGE: if self == self.AGE:
@@ -66,7 +69,14 @@ class KeyType(enum.Enum):
f"in {key_path}: {public_key}" f"in {key_path}: {public_key}"
) )
keyring.append(public_key) keyring.append(
SopsKey(
pubkey=public_key,
username="",
key_type=self,
source=str(key_path),
)
)
except ClanError as e: except ClanError as e:
error_msg = f"Failed to read age keys from {key_path}" error_msg = f"Failed to read age keys from {key_path}"
raise ClanError(error_msg) from e raise ClanError(error_msg) from e
@@ -89,7 +99,14 @@ class KeyType(enum.Enum):
f"in the environment (SOPS_AGE_KEY): {public_key}" f"in the environment (SOPS_AGE_KEY): {public_key}"
) )
keyring.append(public_key) keyring.append(
SopsKey(
pubkey=public_key,
username="",
key_type=self,
source="SOPS_AGE_KEY",
)
)
except ClanError as e: except ClanError as e:
error_msg = "Failed to read age keys from SOPS_AGE_KEY" error_msg = "Failed to read age keys from SOPS_AGE_KEY"
raise ClanError(error_msg) from e raise ClanError(error_msg) from e
@@ -107,7 +124,11 @@ class KeyType(enum.Enum):
for fp in pgp_fingerprints.strip().split(","): for fp in pgp_fingerprints.strip().split(","):
msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}" msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}"
log.debug(msg) log.debug(msg)
keyring.append(fp) keyring.append(
SopsKey(
pubkey=fp, username="", key_type=self, source="SOPS_PGP_FP"
)
)
return keyring return keyring
msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys" msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys"
@@ -121,14 +142,19 @@ class SopsKey:
# if they don't have the same username: # if they don't have the same username:
username: str = dataclasses.field(compare=False) username: str = dataclasses.field(compare=False)
key_type: KeyType key_type: KeyType
source: str = dataclasses.field(compare=False)
def as_dict(self) -> dict[str, str]: def as_dict(self) -> dict[str, str]:
return { return {
"publickey": self.pubkey, "publickey": self.pubkey,
"username": self.username, "username": self.username,
"type": self.key_type.name.lower(), "type": self.key_type.name.lower(),
"source": self.source,
} }
def __str__(self) -> str:
return f"({self.key_type.name}) {self.pubkey} (source: {self.source})"
@classmethod @classmethod
def load_dir(cls, folder: Path) -> set["SopsKey"]: def load_dir(cls, folder: Path) -> set["SopsKey"]:
"""Load from the file named `keys.json` in the given directory.""" """Load from the file named `keys.json` in the given directory."""
@@ -136,11 +162,10 @@ class SopsKey:
@classmethod @classmethod
def collect_public_keys(cls) -> list["SopsKey"]: def collect_public_keys(cls) -> list["SopsKey"]:
return [ result = []
cls(pubkey=key, username="", key_type=key_type) for key_type in KeyType:
for key_type in KeyType result.extend(key_type.collect_public_keys())
for key in key_type.collect_public_keys() return result
]
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
@@ -384,7 +409,8 @@ def maybe_get_user(flake_dir: Path, keys: set[SopsKey]) -> set[SopsKey] | None:
user_keys = read_keys(user) user_keys = read_keys(user)
if len(keys.intersection(user_keys)): if len(keys.intersection(user_keys)):
return { return {
SopsKey(key.pubkey, user.name, key.key_type) for key in user_keys SopsKey(key.pubkey, user.name, key.key_type, key.source)
for key in user_keys
} }
return None return None
@@ -538,6 +564,7 @@ def get_recipients(secret_path: Path) -> set[SopsKey]:
pubkey=recipient[key_type.sops_recipient_attr], pubkey=recipient[key_type.sops_recipient_attr],
username="", username="",
key_type=key_type, key_type=key_type,
source="sops_file",
) )
) )
return keys return keys
@@ -623,7 +650,7 @@ def parse_key(key: Any) -> SopsKey:
if not publickey: if not publickey:
msg = f"{key} does not contain a public key" msg = f"{key} does not contain a public key"
raise ClanError(msg) raise ClanError(msg)
return SopsKey(publickey, "", key_type) return SopsKey(publickey, "", key_type, "key_file")
def read_key(path: Path) -> SopsKey: def read_key(path: Path) -> SopsKey:

View File

@@ -78,7 +78,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]: def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]:
keys = read_keys(sops_users_folder(flake_dir) / name) keys = read_keys(sops_users_folder(flake_dir) / name)
return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys} return {
sops.SopsKey(key.pubkey, name, key.key_type, source=key.source) for key in keys
}
def list_users(flake_dir: Path) -> list[str]: def list_users(flake_dir: Path) -> list[str]:
@@ -182,11 +184,17 @@ def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]:
) )
raise ClanError(err_msg) raise ClanError(err_msg)
age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys] age_keys = [
sops.SopsKey(key, "", sops.KeyType.AGE, source="cmdline") for key in age_keys
]
if args.agekey: if args.agekey:
age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE)) age_keys.append(
sops.SopsKey(args.agekey, "", sops.KeyType.AGE, source="cmdline")
)
pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys] pgp_keys = [
sops.SopsKey(key, "", sops.KeyType.PGP, source="cmdline") for key in pgp_keys
]
return age_keys + pgp_keys return age_keys + pgp_keys

View File

@@ -50,7 +50,9 @@ def _select_keys_interactive(pub_keys: list[SopsKey]) -> list[SopsKey]:
log.info("\nFound existing admin keys on this machine:") log.info("\nFound existing admin keys on this machine:")
selected_keys: list[SopsKey] = [] selected_keys: list[SopsKey] = []
for i, key in enumerate(pub_keys): for i, key in enumerate(pub_keys):
log.info(f"{i + 1}: {key}") log.info(
f"{i + 1}: type: {key.key_type}\n pubkey: {key.pubkey}\n source: {key.source}"
)
while not selected_keys: while not selected_keys:
choice = input( choice = input(
"Select keys to use (comma-separated list of numbers, or leave empty to select all): " "Select keys to use (comma-separated list of numbers, or leave empty to select all): "