Merge pull request 'feat(clan-cli): support multiple keys for a user' (#3230) from feat/vars-multiple-user-secrets into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3230
This commit is contained in:
Mic92
2025-04-09 13:05:24 +00:00
10 changed files with 426 additions and 112 deletions

View File

@@ -37,9 +37,9 @@ Also add your age public key to the repository with 'clan secrets users add YOUR
!!! note
It's safe to add any secrets created by the clan CLI and placed in your repository to version control systems like `git`.
### Add Your Public Key
### Add Your Public Key(s)
```bash
```console
clan secrets users add $USER --age-key <your_public_key>
```
@@ -54,3 +54,46 @@ sops/
└── key.json
```
If you followed the quickstart tutorial all necessary secrets are initialized at this point.
!!! note
You can add multiple age keys for a user by providing multiple `--age-key <your_public_key>` flags:
```console
clan secrets users add $USER \
--age-key <your_public_key_1> \
--age-key <your_public_key_2> \
...
```
### Manage Your Public Key(s)
You can list keys for your user with `clan secrets users get $USER`:
```console
bin/clan secrets users get alice
[
{
"publickey": "age1hrrcspp645qtlj29krjpq66pqg990ejaq0djcms6y6evnmgglv5sq0gewu",
"type": "age",
"username": "alice"
},
{
"publickey": "age13kh4083t3g4x3ktr52nav6h7sy8ynrnky2x58pyp96c5s5nvqytqgmrt79",
"type": "age",
"username": "alice"
}
]
```
To add a new key to your user:
```console
clan secrets users add-key $USER --age-key <your_public_key>
```
To remove a key from your user:
```console
clan secrets users remove-key $USER --age-key <your_public_key>
```

View File

@@ -25,7 +25,7 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
machine_path = sops_machines_folder(flake_dir) / name
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force)
paths = [machine_path]
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)
@@ -49,8 +49,8 @@ def remove_machine(flake_dir: Path, name: str) -> None:
def get_machine(flake_dir: Path, name: str) -> str:
key, _ = read_key(sops_machines_folder(flake_dir) / name)
return key
key = read_key(sops_machines_folder(flake_dir) / name)
return key.pubkey
def has_machine(flake_dir: Path, name: str) -> bool:

View File

@@ -30,7 +30,7 @@ from .folders import (
from .sops import (
decrypt_file,
encrypt_file,
read_key,
read_keys,
update_keys,
)
from .types import VALID_SECRET_NAME, secret_name_type
@@ -104,7 +104,7 @@ def cleanup_dangling_symlinks(folder: Path) -> list[Path]:
return removed
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
def collect_keys_for_type(folder: Path) -> set[sops.SopsKey]:
if not folder.exists():
return set()
keys = set()
@@ -122,11 +122,11 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
f"Expected {p} to point to {folder} but points to {target.parent}"
)
continue
keys.add(read_key(target))
keys.update(read_keys(target))
return keys
def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
def collect_keys_for_path(path: Path) -> set[sops.SopsKey]:
keys = set()
keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users"))
@@ -154,7 +154,16 @@ def encrypt_secret(
add_machines = []
if add_users is None:
add_users = []
key = sops.ensure_admin_public_key(flake_dir)
keys = sops.ensure_admin_public_key(flake_dir)
if not keys:
# todo double check the correct command to run
msg = "No keys found. Please run 'clan secrets add-key' to add a key."
raise ClanError(msg)
username = next(iter(keys)).username
recipient_keys = set()
# encrypt_secret can be called before the secret has been created
@@ -194,13 +203,14 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path)
if (key.pubkey, key.key_type) not in recipient_keys:
recipient_keys.add((key.pubkey, key.key_type))
if not keys.intersection(recipient_keys):
recipient_keys.update(keys)
files_to_commit.extend(
allow_member(
users_folder(secret_path),
sops_users_folder(flake_dir),
key.username,
username,
do_update_keys,
)
)

View File

@@ -10,7 +10,7 @@ from collections.abc import Iterable, Sequence
from contextlib import suppress
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO
from typing import IO, Any
from clan_cli.api import API
from clan_cli.cmd import Log, RunOpts, run
@@ -99,7 +99,7 @@ class KeyType(enum.Enum):
raise ClanError(msg)
@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass(frozen=True, order=True)
class SopsKey:
pubkey: str
# Two SopsKey are considered equal even
@@ -115,11 +115,9 @@ class SopsKey:
}
@classmethod
def load_dir(cls, folder: Path) -> "SopsKey":
def load_dir(cls, folder: Path) -> set["SopsKey"]:
"""Load from the file named `keys.json` in the given directory."""
pubkey, key_type = read_key(folder)
username = ""
return cls(pubkey, username, key_type)
return read_keys(folder)
@classmethod
def collect_public_keys(cls) -> Sequence["SopsKey"]:
@@ -180,7 +178,7 @@ class Operation(enum.StrEnum):
def sops_run(
call: Operation,
secret_path: Path,
public_keys: Iterable[tuple[str, KeyType]],
public_keys: Iterable[SopsKey],
run_opts: RunOpts | None = None,
) -> tuple[int, str]:
"""Call the sops binary for the given operation."""
@@ -201,8 +199,8 @@ def sops_run(
keys_by_type: dict[KeyType, list[str]] = {}
keys_by_type = {key_type: [] for key_type in KeyType}
for key, key_type in public_keys:
keys_by_type[key_type].append(key)
for key in public_keys:
keys_by_type[key.key_type].append(key.pubkey)
it = keys_by_type.items()
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
rules = {"creation_rules": [{"key_groups": key_groups}]}
@@ -299,7 +297,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -307,19 +305,35 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
this_pub_key, this_key_type = read_key(user)
if key.pubkey == this_pub_key and key.key_type == this_key_type:
return SopsKey(key.pubkey, user.name, key.key_type)
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir)
if folder.exists():
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
maybe_key = maybe_get_user_or_machine(flake_dir, key)
if maybe_key:
return maybe_key
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]:
maybe_keys = maybe_get_user_or_machine(flake_dir, key)
if maybe_keys:
return maybe_keys
msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
@@ -351,7 +365,7 @@ def maybe_get_admin_public_key() -> None | SopsKey:
return keyring[0]
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
@@ -359,7 +373,7 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
raise ClanError(msg)
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:
secret_path = secret_path / "secret"
error_msg = f"Could not update keys for {secret_path}"
@@ -376,7 +390,7 @@ def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[
def encrypt_file(
secret_path: Path,
content: str | IO[bytes] | bytes | None,
pubkeys: list[tuple[str, KeyType]],
pubkeys: list[SopsKey],
) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
@@ -438,7 +452,7 @@ def encrypt_file(
def decrypt_file(secret_path: Path) -> str:
# decryption uses private keys from the environment or default paths:
no_public_keys_needed: list[tuple[str, KeyType]] = []
no_public_keys_needed: list[SopsKey] = []
_, stdout = sops_run(
Operation.DECRYPT,
@@ -475,34 +489,100 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f)
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
def write_key(path: Path, key: SopsKey, overwrite: bool) -> None:
return write_keys(path, [key], overwrite)
def write_keys(path: Path, keys: Iterable[SopsKey], overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
if not overwrite:
flags |= os.O_EXCL
fd = os.open(path / "key.json", flags)
except FileExistsError as e:
except FileExistsError:
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e
raise ClanError(msg) from None
with os.fdopen(fd, "w") as f:
contents = {"publickey": publickey, "type": key_type.name.lower()}
contents = [
{"publickey": key.pubkey, "type": key.key_type.name.lower()}
for key in sorted(keys)
]
json.dump(contents, f, indent=2)
def read_key(path: Path) -> tuple[str, KeyType]:
with Path(path / "key.json").open() as f:
def append_keys(path: Path, keys: Iterable[SopsKey]) -> None:
path.mkdir(parents=True, exist_ok=True)
# key file must already exist
try:
key = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
current_keys = set(read_keys(path))
except FileNotFoundError:
msg = f"{path} does not exist."
raise ClanError(msg) from None
# add the specified keys to the set
# de-duplication is natural
current_keys.update(keys)
# write the new key set
return write_keys(path, sorted(current_keys), overwrite=True)
def remove_keys(path: Path, keys: Iterable[SopsKey]) -> None:
path.mkdir(parents=True, exist_ok=True)
# key file must already exist
try:
current_keys = set(read_keys(path))
except FileNotFoundError:
msg = f"{path} does not exist."
raise ClanError(msg) from None
current_keys.difference_update(keys)
if not current_keys:
msg = f"No keys would remain in {path}. At least one key is required. Aborting."
raise ClanError(msg)
# write the new key set
return write_keys(path, sorted(current_keys), overwrite=True)
def parse_key(key: Any) -> SopsKey:
if not isinstance(key, dict):
msg = f"Expected a dict but {type(key)!r} was provided"
raise ClanError(msg)
key_type = KeyType.validate(key.get("type"))
if key_type is None:
msg = f'Invalid key type in {path.name}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
msg = f'Invalid key type in {key}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
raise ClanError(msg)
publickey = key.get("publickey")
if not publickey:
msg = f"{path.name} does not contain a public key"
msg = f"{key} does not contain a public key"
raise ClanError(msg)
return SopsKey(publickey, "", key_type)
def read_key(path: Path) -> SopsKey:
keys = read_keys(path)
if len(keys) != 1:
msg = f"Expected exactly one key but {len(keys)} were found"
raise ClanError(msg)
return next(iter(keys))
def read_keys(path: Path) -> set[SopsKey]:
with Path(path / "key.json").open() as f:
try:
keys = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
if isinstance(keys, dict):
return {parse_key(keys)}
if isinstance(keys, list):
return set(map(parse_key, keys))
msg = f"Expected a dict or array but {type(keys)!r} was provided"
raise ClanError(msg)
return publickey, key_type

View File

@@ -2,6 +2,7 @@ import argparse
import json
import logging
import sys
from collections.abc import Iterable
from pathlib import Path
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
@@ -18,7 +19,7 @@ from .folders import (
sops_users_folder,
)
from .secrets import update_secrets
from .sops import read_key, write_key
from .sops import append_keys, read_keys, remove_keys, write_keys
from .types import (
VALID_USER_NAME,
public_or_private_age_key_type,
@@ -32,13 +33,12 @@ log = logging.getLogger(__name__)
def add_user(
flake_dir: Path,
name: str,
key: str,
key_type: sops.KeyType,
keys: Iterable[sops.SopsKey],
force: bool,
) -> None:
path = sops_users_folder(flake_dir) / name
write_key(path, key, key_type, overwrite=force)
write_keys(path, keys, overwrite=force)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
@@ -74,9 +74,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
commit_files(updated_paths, flake_dir, f"Remove user {name}")
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
key, key_type = read_key(sops_users_folder(flake_dir) / name)
return sops.SopsKey(key, name, key_type)
def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]:
keys = read_keys(sops_users_folder(flake_dir) / name)
return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys}
def list_users(flake_dir: Path) -> list[str]:
@@ -124,32 +124,82 @@ def list_command(args: argparse.Namespace) -> None:
print("\n".join(lst))
def add_user_key(
flake_dir: Path,
name: str,
keys: Iterable[sops.SopsKey],
) -> None:
path = sops_users_folder(flake_dir) / name
append_keys(path, keys)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
commit_files(
updated_paths,
flake_dir,
f"Add key(s) for user {name} to secrets",
)
def remove_user_key(
flake_dir: Path,
name: str,
keys: Iterable[sops.SopsKey],
) -> None:
path = sops_users_folder(flake_dir) / name
remove_keys(path, keys)
updated_paths = [path]
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
commit_files(
updated_paths,
flake_dir,
f"Remove key(s) for user {name} from secrets",
)
def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]:
age_keys = args.age_key or []
pgp_keys = args.pgp_key or []
key_count = len(age_keys) + len(pgp_keys)
if args.agekey:
key_count += 1
if key_count == 0:
err_msg = (
"Please provide at least one key through `--pgp-key`, "
"`--age-key`, or as a positional (age key) argument."
)
raise ClanError(err_msg)
age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys]
if args.agekey:
age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE))
pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys]
return age_keys + pgp_keys
def add_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
keys_args = (args.age_key, args.agekey, args.pgp_key)
keys_count = sum(1 if key else 0 for key in keys_args)
if keys_count != 1:
err_msg = (
f"Please provide one key (got {keys_count}) through `--pgp-key`, "
f"`--age-key`, or as a positional (age key) argument."
)
raise ClanError(err_msg)
if args.age_key or args.agekey:
key_type = sops.KeyType.AGE
else:
key_type = sops.KeyType.PGP
key = args.agekey or args.age_key or args.pgp_key
add_user(args.flake.path, args.user, key, key_type, args.force)
add_user(args.flake.path, args.user, _key_args(args), args.force)
def get_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
key = get_user(args.flake.path, args.user)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
keys = get_user(args.flake.path, args.user)
json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True)
def remove_command(args: argparse.Namespace) -> None:
@@ -173,6 +223,22 @@ def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.flake.path, args.user, args.secret)
def add_key_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_user_key(args.flake.path, args.user, _key_args(args))
def remove_key_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
remove_user_key(args.flake.path, args.user, _key_args(args))
def register_users_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
@@ -188,30 +254,7 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
"-f", "--force", help="overwrite existing user", action="store_true"
)
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
key_type = add_parser.add_mutually_exclusive_group(required=True)
key_type.add_argument(
"agekey",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key of the user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
)
_add_key_flags(add_parser)
add_parser.set_defaults(func=add_command)
get_parser = subparser.add_parser("get", help="get a user public key")
@@ -253,3 +296,52 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
)
add_dynamic_completer(remove_secrets_action, complete_secrets)
remove_secret_parser.set_defaults(func=remove_secret_command)
add_key_parser = subparser.add_parser(
"add-key", help="add one or more keys for a user"
)
add_key_user_action = add_key_parser.add_argument(
"user", help="the name of the user", type=user_name_type
)
add_dynamic_completer(add_key_user_action, complete_users)
_add_key_flags(add_key_parser)
add_key_parser.set_defaults(func=add_key_command)
remove_key_parser = subparser.add_parser(
"remove-key", help="remove one or more keys for a user"
)
remove_key_user_action = remove_key_parser.add_argument(
"user", help="the name of the user", type=user_name_type
)
add_dynamic_completer(remove_key_user_action, complete_users)
_add_key_flags(remove_key_parser)
remove_key_parser.set_defaults(func=remove_key_command)
def _add_key_flags(parser: argparse.ArgumentParser) -> None:
key_type = parser.add_mutually_exclusive_group(required=True)
key_type.add_argument(
"agekey",
help="public or private age key for a user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key for a user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
action="append",
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key for a user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
action="append",
)

View File

@@ -1,5 +1,7 @@
import dataclasses
import json
import os
from collections.abc import Iterable
from pathlib import Path
import pytest
@@ -7,18 +9,18 @@ from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.tests.helpers import cli
@dataclasses.dataclass(frozen=True)
class KeyPair:
def __init__(self, pubkey: str, privkey: str) -> None:
self.pubkey = pubkey
self.privkey = privkey
pubkey: str
privkey: str
class SopsSetup:
"""Hold a list of three key pairs and create an "admin" user in the clan.
"""Hold a list of key pairs and create an "admin" user in the clan.
The first key in the list is used as the admin key and
the private part of the key is exposed in the
`SOPS_AGE_KEY` environment variable, the two others can
`SOPS_AGE_KEY` environment variable, the others can
be used to add machines or other users.
"""
@@ -52,6 +54,14 @@ KEYS = [
"age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp",
"AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF",
),
KeyPair(
"age1n58rxm8y6h9prmwn0qk7nggfsu9f9j4u35dxg7akpkjd5vgsavssfzmq9y",
"AGE-SECRET-KEY-1YU2JVE445KT6S8UN3403NHH6EZU404RMEH9RTME9SPWXWMLJS0LQM5NWM7",
),
KeyPair(
"age1eyyhln9g3cdwtrwpckugvqgtf5p8ugt0426sw38ra3wkc0t4rfhslq7txv",
"AGE-SECRET-KEY-1567QKA63Y9P62SHF5TCHVCT5GZX2LZ8NS0E9RKA2QHDA662SF5LQ2VJJYX",
),
]
@@ -73,7 +83,7 @@ def sops_setup(
def assert_secrets_file_recipients(
flake_path: Path,
secret_name: str,
expected_age_recipients_keypairs: list["KeyPair"],
expected_age_recipients_keypairs: Iterable["KeyPair"],
err_msg: str | None = None,
) -> None:
"""Checks that the recipients of a secrets file matches expectations.

View File

@@ -59,7 +59,7 @@ def test_machine_delete(
) -> None:
flake = flake_with_sops
admin_key, machine_key, machine2_key = sops_setup.keys
admin_key, machine_key, machine2_key, *xs = sops_setup.keys
# create a couple machines with their keys
for name, key in (("my-machine", machine_key), ("my-machine2", machine2_key)):

View File

@@ -159,6 +159,86 @@ def test_users(
) -> None:
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
# some additional user-specific tests
admin_key = age_keys[2]
sops_folder = test_flake.path / "sops"
user_keys = {
"bob": [age_keys[0], age_keys[1]],
"alice": [age_keys[2]],
"charlie": [age_keys[3], age_keys[4]],
}
for user, keys in user_keys.items():
key_args = [f"--age-key={key.pubkey}" for key in keys]
# add the user keys
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
user,
*key_args,
]
)
assert (sops_folder / "users" / user / "key.json").exists()
# check they are returned in get
with capture_output as output:
cli.run(["secrets", "users", "get", "--flake", str(test_flake.path), user])
for key in keys:
assert key.pubkey in output.out
# set a secret
secret_name = f"{user}_secret"
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--user",
user,
secret_name,
]
)
# check the secret has each of our user's keys as a recipient
# in addition the admin key should be there
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_key, *keys],
)
if len(keys) == 1:
continue
# remove one of the keys
cli.run(
[
"secrets",
"users",
"remove-key",
"--flake",
str(test_flake.path),
user,
keys[0].pubkey,
]
)
# check the secret has been updated
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_key, *keys[1:]],
)
def test_machines(
test_flake: FlakeForTest,
@@ -786,7 +866,10 @@ def test_secrets_key_generate_gpg(
"testuser",
]
)
key = json.loads(output.out)
keys = json.loads(output.out)
assert len(keys) == 1
key = keys[0]
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key.fingerprint

View File

@@ -5,7 +5,7 @@ import os
from clan_cli.errors import ClanError
from clan_cli.flake import Flake
from clan_cli.secrets.key import generate_key
from clan_cli.secrets.sops import KeyType, maybe_get_admin_public_key
from clan_cli.secrets.sops import maybe_get_admin_public_key
from clan_cli.secrets.users import add_user
log = logging.getLogger(__name__)
@@ -24,8 +24,7 @@ def keygen(user: str | None, flake: Flake, force: bool) -> None:
add_user(
flake_dir=flake.path,
name=user,
key=pub_key.pubkey,
key_type=KeyType.AGE,
keys=[pub_key],
force=force,
)

View File

@@ -92,7 +92,7 @@ class SecretStore(StoreBase):
secret_path = self.secret_path(generator, secret_name)
recipient = sops.SopsKey.load_dir(key_dir)
recipients = sops.get_recipients(secret_path)
return recipient in recipients
return len(recipient.intersection(recipients)) > 0
def secret_path(self, generator: Generator, secret_name: str) -> Path:
return self.directory(generator, secret_name)
@@ -258,10 +258,7 @@ class SecretStore(StoreBase):
)
)
return {
sops.SopsKey(pubkey=key, username="", key_type=key_type)
for (key, key_type) in keys
}
return keys
# }
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]: