feat(clan-cli): support multiple keys for a user

This commit is contained in:
Brian McGee
2025-04-02 16:08:42 +01:00
committed by Mic92
parent 7e5505bd71
commit ed5fc20069
10 changed files with 426 additions and 112 deletions

View File

@@ -25,7 +25,7 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
machine_path = sops_machines_folder(flake_dir) / name
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force)
paths = [machine_path]
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)
@@ -49,8 +49,8 @@ def remove_machine(flake_dir: Path, name: str) -> None:
def get_machine(flake_dir: Path, name: str) -> str:
key, _ = read_key(sops_machines_folder(flake_dir) / name)
return key
key = read_key(sops_machines_folder(flake_dir) / name)
return key.pubkey
def has_machine(flake_dir: Path, name: str) -> bool:

View File

@@ -30,7 +30,7 @@ from .folders import (
from .sops import (
decrypt_file,
encrypt_file,
read_key,
read_keys,
update_keys,
)
from .types import VALID_SECRET_NAME, secret_name_type
@@ -104,7 +104,7 @@ def cleanup_dangling_symlinks(folder: Path) -> list[Path]:
return removed
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
def collect_keys_for_type(folder: Path) -> set[sops.SopsKey]:
if not folder.exists():
return set()
keys = set()
@@ -122,11 +122,11 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
f"Expected {p} to point to {folder} but points to {target.parent}"
)
continue
keys.add(read_key(target))
keys.update(read_keys(target))
return keys
def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
def collect_keys_for_path(path: Path) -> set[sops.SopsKey]:
keys = set()
keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users"))
@@ -154,7 +154,16 @@ def encrypt_secret(
add_machines = []
if add_users is None:
add_users = []
key = sops.ensure_admin_public_key(flake_dir)
keys = sops.ensure_admin_public_key(flake_dir)
if not keys:
# todo double check the correct command to run
msg = "No keys found. Please run 'clan secrets add-key' to add a key."
raise ClanError(msg)
username = next(iter(keys)).username
recipient_keys = set()
# encrypt_secret can be called before the secret has been created
@@ -194,13 +203,14 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path)
if (key.pubkey, key.key_type) not in recipient_keys:
recipient_keys.add((key.pubkey, key.key_type))
if not keys.intersection(recipient_keys):
recipient_keys.update(keys)
files_to_commit.extend(
allow_member(
users_folder(secret_path),
sops_users_folder(flake_dir),
key.username,
username,
do_update_keys,
)
)

View File

@@ -10,7 +10,7 @@ from collections.abc import Iterable, Sequence
from contextlib import suppress
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO
from typing import IO, Any
from clan_cli.api import API
from clan_cli.cmd import Log, RunOpts, run
@@ -99,7 +99,7 @@ class KeyType(enum.Enum):
raise ClanError(msg)
@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass(frozen=True, order=True)
class SopsKey:
pubkey: str
# Two SopsKey are considered equal even
@@ -115,11 +115,9 @@ class SopsKey:
}
@classmethod
def load_dir(cls, folder: Path) -> "SopsKey":
def load_dir(cls, folder: Path) -> set["SopsKey"]:
"""Load from the file named `keys.json` in the given directory."""
pubkey, key_type = read_key(folder)
username = ""
return cls(pubkey, username, key_type)
return read_keys(folder)
@classmethod
def collect_public_keys(cls) -> Sequence["SopsKey"]:
@@ -180,7 +178,7 @@ class Operation(enum.StrEnum):
def sops_run(
call: Operation,
secret_path: Path,
public_keys: Iterable[tuple[str, KeyType]],
public_keys: Iterable[SopsKey],
run_opts: RunOpts | None = None,
) -> tuple[int, str]:
"""Call the sops binary for the given operation."""
@@ -201,8 +199,8 @@ def sops_run(
keys_by_type: dict[KeyType, list[str]] = {}
keys_by_type = {key_type: [] for key_type in KeyType}
for key, key_type in public_keys:
keys_by_type[key_type].append(key)
for key in public_keys:
keys_by_type[key.key_type].append(key.pubkey)
it = keys_by_type.items()
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
rules = {"creation_rules": [{"key_groups": key_groups}]}
@@ -299,7 +297,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -307,19 +305,35 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
this_pub_key, this_key_type = read_key(user)
if key.pubkey == this_pub_key and key.key_type == this_key_type:
return SopsKey(key.pubkey, user.name, key.key_type)
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir)
if folder.exists():
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
maybe_key = maybe_get_user_or_machine(flake_dir, key)
if maybe_key:
return maybe_key
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]:
maybe_keys = maybe_get_user_or_machine(flake_dir, key)
if maybe_keys:
return maybe_keys
msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
@@ -351,7 +365,7 @@ def maybe_get_admin_public_key() -> None | SopsKey:
return keyring[0]
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
@@ -359,7 +373,7 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
raise ClanError(msg)
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:
secret_path = secret_path / "secret"
error_msg = f"Could not update keys for {secret_path}"
@@ -376,7 +390,7 @@ def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[
def encrypt_file(
secret_path: Path,
content: str | IO[bytes] | bytes | None,
pubkeys: list[tuple[str, KeyType]],
pubkeys: list[SopsKey],
) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
@@ -438,7 +452,7 @@ def encrypt_file(
def decrypt_file(secret_path: Path) -> str:
# decryption uses private keys from the environment or default paths:
no_public_keys_needed: list[tuple[str, KeyType]] = []
no_public_keys_needed: list[SopsKey] = []
_, stdout = sops_run(
Operation.DECRYPT,
@@ -475,34 +489,100 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f)
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
def write_key(path: Path, key: SopsKey, overwrite: bool) -> None:
return write_keys(path, [key], overwrite)
def write_keys(path: Path, keys: Iterable[SopsKey], overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
if not overwrite:
flags |= os.O_EXCL
fd = os.open(path / "key.json", flags)
except FileExistsError as e:
except FileExistsError:
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e
raise ClanError(msg) from None
with os.fdopen(fd, "w") as f:
contents = {"publickey": publickey, "type": key_type.name.lower()}
contents = [
{"publickey": key.pubkey, "type": key.key_type.name.lower()}
for key in sorted(keys)
]
json.dump(contents, f, indent=2)
def read_key(path: Path) -> tuple[str, KeyType]:
with Path(path / "key.json").open() as f:
try:
key = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
def append_keys(path: Path, keys: Iterable[SopsKey]) -> None:
path.mkdir(parents=True, exist_ok=True)
# key file must already exist
try:
current_keys = set(read_keys(path))
except FileNotFoundError:
msg = f"{path} does not exist."
raise ClanError(msg) from None
# add the specified keys to the set
# de-duplication is natural
current_keys.update(keys)
# write the new key set
return write_keys(path, sorted(current_keys), overwrite=True)
def remove_keys(path: Path, keys: Iterable[SopsKey]) -> None:
path.mkdir(parents=True, exist_ok=True)
# key file must already exist
try:
current_keys = set(read_keys(path))
except FileNotFoundError:
msg = f"{path} does not exist."
raise ClanError(msg) from None
current_keys.difference_update(keys)
if not current_keys:
msg = f"No keys would remain in {path}. At least one key is required. Aborting."
raise ClanError(msg)
# write the new key set
return write_keys(path, sorted(current_keys), overwrite=True)
def parse_key(key: Any) -> SopsKey:
if not isinstance(key, dict):
msg = f"Expected a dict but {type(key)!r} was provided"
raise ClanError(msg)
key_type = KeyType.validate(key.get("type"))
if key_type is None:
msg = f'Invalid key type in {path.name}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
msg = f'Invalid key type in {key}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
raise ClanError(msg)
publickey = key.get("publickey")
if not publickey:
msg = f"{path.name} does not contain a public key"
msg = f"{key} does not contain a public key"
raise ClanError(msg)
return publickey, key_type
return SopsKey(publickey, "", key_type)
def read_key(path: Path) -> SopsKey:
keys = read_keys(path)
if len(keys) != 1:
msg = f"Expected exactly one key but {len(keys)} were found"
raise ClanError(msg)
return next(iter(keys))
def read_keys(path: Path) -> set[SopsKey]:
with Path(path / "key.json").open() as f:
try:
keys = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
if isinstance(keys, dict):
return {parse_key(keys)}
if isinstance(keys, list):
return set(map(parse_key, keys))
msg = f"Expected a dict or array but {type(keys)!r} was provided"
raise ClanError(msg)

View File

@@ -2,6 +2,7 @@ import argparse
import json
import logging
import sys
from collections.abc import Iterable
from pathlib import Path
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
@@ -18,7 +19,7 @@ from .folders import (
sops_users_folder,
)
from .secrets import update_secrets
from .sops import read_key, write_key
from .sops import append_keys, read_keys, remove_keys, write_keys
from .types import (
VALID_USER_NAME,
public_or_private_age_key_type,
@@ -32,13 +33,12 @@ log = logging.getLogger(__name__)
def add_user(
flake_dir: Path,
name: str,
key: str,
key_type: sops.KeyType,
keys: Iterable[sops.SopsKey],
force: bool,
) -> None:
path = sops_users_folder(flake_dir) / name
write_key(path, key, key_type, overwrite=force)
write_keys(path, keys, overwrite=force)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
@@ -74,9 +74,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
commit_files(updated_paths, flake_dir, f"Remove user {name}")
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
key, key_type = read_key(sops_users_folder(flake_dir) / name)
return sops.SopsKey(key, name, key_type)
def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]:
keys = read_keys(sops_users_folder(flake_dir) / name)
return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys}
def list_users(flake_dir: Path) -> list[str]:
@@ -124,32 +124,82 @@ def list_command(args: argparse.Namespace) -> None:
print("\n".join(lst))
def add_user_key(
flake_dir: Path,
name: str,
keys: Iterable[sops.SopsKey],
) -> None:
path = sops_users_folder(flake_dir) / name
append_keys(path, keys)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
commit_files(
updated_paths,
flake_dir,
f"Add key(s) for user {name} to secrets",
)
def remove_user_key(
flake_dir: Path,
name: str,
keys: Iterable[sops.SopsKey],
) -> None:
path = sops_users_folder(flake_dir) / name
remove_keys(path, keys)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
commit_files(
updated_paths,
flake_dir,
f"Remove key(s) for user {name} from secrets",
)
def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]:
age_keys = args.age_key or []
pgp_keys = args.pgp_key or []
key_count = len(age_keys) + len(pgp_keys)
if args.agekey:
key_count += 1
if key_count == 0:
err_msg = (
"Please provide at least one key through `--pgp-key`, "
"`--age-key`, or as a positional (age key) argument."
)
raise ClanError(err_msg)
age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys]
if args.agekey:
age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE))
pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys]
return age_keys + pgp_keys
def add_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
keys_args = (args.age_key, args.agekey, args.pgp_key)
keys_count = sum(1 if key else 0 for key in keys_args)
if keys_count != 1:
err_msg = (
f"Please provide one key (got {keys_count}) through `--pgp-key`, "
f"`--age-key`, or as a positional (age key) argument."
)
raise ClanError(err_msg)
if args.age_key or args.agekey:
key_type = sops.KeyType.AGE
else:
key_type = sops.KeyType.PGP
key = args.agekey or args.age_key or args.pgp_key
add_user(args.flake.path, args.user, key, key_type, args.force)
add_user(args.flake.path, args.user, _key_args(args), args.force)
def get_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
key = get_user(args.flake.path, args.user)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
keys = get_user(args.flake.path, args.user)
json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True)
def remove_command(args: argparse.Namespace) -> None:
@@ -173,6 +223,22 @@ def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.flake.path, args.user, args.secret)
def add_key_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_user_key(args.flake.path, args.user, _key_args(args))
def remove_key_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_user_key(args.flake.path, args.user, _key_args(args))
def register_users_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
@@ -188,30 +254,7 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
"-f", "--force", help="overwrite existing user", action="store_true"
)
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
key_type = add_parser.add_mutually_exclusive_group(required=True)
key_type.add_argument(
"agekey",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key of the user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
)
_add_key_flags(add_parser)
add_parser.set_defaults(func=add_command)
get_parser = subparser.add_parser("get", help="get a user public key")
@@ -253,3 +296,52 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
)
add_dynamic_completer(remove_secrets_action, complete_secrets)
remove_secret_parser.set_defaults(func=remove_secret_command)
add_key_parser = subparser.add_parser(
"add-key", help="add one or more keys for a user"
)
add_key_user_action = add_key_parser.add_argument(
"user", help="the name of the user", type=user_name_type
)
add_dynamic_completer(add_key_user_action, complete_users)
_add_key_flags(add_key_parser)
add_key_parser.set_defaults(func=add_key_command)
remove_key_parser = subparser.add_parser(
"remove-key", help="remove one or more keys for a user"
)
remove_key_user_action = remove_key_parser.add_argument(
"user", help="the name of the user", type=user_name_type
)
add_dynamic_completer(remove_key_user_action, complete_users)
_add_key_flags(remove_key_parser)
remove_key_parser.set_defaults(func=remove_key_command)
def _add_key_flags(parser: argparse.ArgumentParser) -> None:
key_type = parser.add_mutually_exclusive_group(required=True)
key_type.add_argument(
"agekey",
help="public or private age key for a user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key for a user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
action="append",
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key for a user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
action="append",
)