Merge pull request 'feat(clan-cli): support multiple keys for a user' (#3230) from feat/vars-multiple-user-secrets into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3230
This commit is contained in:
@@ -37,9 +37,9 @@ Also add your age public key to the repository with 'clan secrets users add YOUR
|
||||
!!! note
|
||||
It's safe to add any secrets created by the clan CLI and placed in your repository to version control systems like `git`.
|
||||
|
||||
### Add Your Public Key
|
||||
### Add Your Public Key(s)
|
||||
|
||||
```bash
|
||||
```console
|
||||
clan secrets users add $USER --age-key <your_public_key>
|
||||
```
|
||||
|
||||
@@ -54,3 +54,46 @@ sops/
|
||||
└── key.json
|
||||
```
|
||||
If you followed the quickstart tutorial all necessary secrets are initialized at this point.
|
||||
|
||||
!!! note
|
||||
You can add multiple age keys for a user by providing multiple `--age-key <your_public_key>` flags:
|
||||
|
||||
```console
|
||||
clan secrets users add $USER \
|
||||
--age-key <your_public_key_1> \
|
||||
--age-key <your_public_key_2> \
|
||||
...
|
||||
```
|
||||
|
||||
### Manage Your Public Key(s)
|
||||
|
||||
You can list keys for your user with `clan secrets users get $USER`:
|
||||
|
||||
```console
|
||||
❯ bin/clan secrets users get alice
|
||||
|
||||
[
|
||||
{
|
||||
"publickey": "age1hrrcspp645qtlj29krjpq66pqg990ejaq0djcms6y6evnmgglv5sq0gewu",
|
||||
"type": "age",
|
||||
"username": "alice"
|
||||
},
|
||||
{
|
||||
"publickey": "age13kh4083t3g4x3ktr52nav6h7sy8ynrnky2x58pyp96c5s5nvqytqgmrt79",
|
||||
"type": "age",
|
||||
"username": "alice"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
To add a new key to your user:
|
||||
|
||||
```console
|
||||
clan secrets users add-key $USER --age-key <your_public_key>
|
||||
```
|
||||
|
||||
To remove a key from your user:
|
||||
|
||||
```console
|
||||
clan secrets users remove-key $USER --age-key <your_public_key>
|
||||
```
|
||||
@@ -25,7 +25,7 @@ from .types import public_or_private_age_key_type, secret_name_type
|
||||
|
||||
def add_machine(flake_dir: Path, name: str, pubkey: str, force: bool) -> None:
|
||||
machine_path = sops_machines_folder(flake_dir) / name
|
||||
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
|
||||
write_key(machine_path, sops.SopsKey(pubkey, "", sops.KeyType.AGE), overwrite=force)
|
||||
paths = [machine_path]
|
||||
|
||||
filter_machine_secrets = get_secrets_filter_for_machine(flake_dir, name)
|
||||
@@ -49,8 +49,8 @@ def remove_machine(flake_dir: Path, name: str) -> None:
|
||||
|
||||
|
||||
def get_machine(flake_dir: Path, name: str) -> str:
|
||||
key, _ = read_key(sops_machines_folder(flake_dir) / name)
|
||||
return key
|
||||
key = read_key(sops_machines_folder(flake_dir) / name)
|
||||
return key.pubkey
|
||||
|
||||
|
||||
def has_machine(flake_dir: Path, name: str) -> bool:
|
||||
|
||||
@@ -30,7 +30,7 @@ from .folders import (
|
||||
from .sops import (
|
||||
decrypt_file,
|
||||
encrypt_file,
|
||||
read_key,
|
||||
read_keys,
|
||||
update_keys,
|
||||
)
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
@@ -104,7 +104,7 @@ def cleanup_dangling_symlinks(folder: Path) -> list[Path]:
|
||||
return removed
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
def collect_keys_for_type(folder: Path) -> set[sops.SopsKey]:
|
||||
if not folder.exists():
|
||||
return set()
|
||||
keys = set()
|
||||
@@ -122,11 +122,11 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
f"Expected {p} to point to {folder} but points to {target.parent}"
|
||||
)
|
||||
continue
|
||||
keys.add(read_key(target))
|
||||
keys.update(read_keys(target))
|
||||
return keys
|
||||
|
||||
|
||||
def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
def collect_keys_for_path(path: Path) -> set[sops.SopsKey]:
|
||||
keys = set()
|
||||
keys.update(collect_keys_for_type(path / "machines"))
|
||||
keys.update(collect_keys_for_type(path / "users"))
|
||||
@@ -154,7 +154,16 @@ def encrypt_secret(
|
||||
add_machines = []
|
||||
if add_users is None:
|
||||
add_users = []
|
||||
key = sops.ensure_admin_public_key(flake_dir)
|
||||
|
||||
keys = sops.ensure_admin_public_key(flake_dir)
|
||||
|
||||
if not keys:
|
||||
# todo double check the correct command to run
|
||||
msg = "No keys found. Please run 'clan secrets add-key' to add a key."
|
||||
raise ClanError(msg)
|
||||
|
||||
username = next(iter(keys)).username
|
||||
|
||||
recipient_keys = set()
|
||||
|
||||
# encrypt_secret can be called before the secret has been created
|
||||
@@ -194,13 +203,14 @@ def encrypt_secret(
|
||||
|
||||
recipient_keys = collect_keys_for_path(secret_path)
|
||||
|
||||
if (key.pubkey, key.key_type) not in recipient_keys:
|
||||
recipient_keys.add((key.pubkey, key.key_type))
|
||||
if not keys.intersection(recipient_keys):
|
||||
recipient_keys.update(keys)
|
||||
|
||||
files_to_commit.extend(
|
||||
allow_member(
|
||||
users_folder(secret_path),
|
||||
sops_users_folder(flake_dir),
|
||||
key.username,
|
||||
username,
|
||||
do_update_keys,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -10,7 +10,7 @@ from collections.abc import Iterable, Sequence
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO
|
||||
from typing import IO, Any
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
@@ -99,7 +99,7 @@ class KeyType(enum.Enum):
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
@dataclasses.dataclass(frozen=True, order=True)
|
||||
class SopsKey:
|
||||
pubkey: str
|
||||
# Two SopsKey are considered equal even
|
||||
@@ -115,11 +115,9 @@ class SopsKey:
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def load_dir(cls, folder: Path) -> "SopsKey":
|
||||
def load_dir(cls, folder: Path) -> set["SopsKey"]:
|
||||
"""Load from the file named `keys.json` in the given directory."""
|
||||
pubkey, key_type = read_key(folder)
|
||||
username = ""
|
||||
return cls(pubkey, username, key_type)
|
||||
return read_keys(folder)
|
||||
|
||||
@classmethod
|
||||
def collect_public_keys(cls) -> Sequence["SopsKey"]:
|
||||
@@ -180,7 +178,7 @@ class Operation(enum.StrEnum):
|
||||
def sops_run(
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[tuple[str, KeyType]],
|
||||
public_keys: Iterable[SopsKey],
|
||||
run_opts: RunOpts | None = None,
|
||||
) -> tuple[int, str]:
|
||||
"""Call the sops binary for the given operation."""
|
||||
@@ -201,8 +199,8 @@ def sops_run(
|
||||
|
||||
keys_by_type: dict[KeyType, list[str]] = {}
|
||||
keys_by_type = {key_type: [] for key_type in KeyType}
|
||||
for key, key_type in public_keys:
|
||||
keys_by_type[key_type].append(key)
|
||||
for key in public_keys:
|
||||
keys_by_type[key.key_type].append(key.pubkey)
|
||||
it = keys_by_type.items()
|
||||
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
|
||||
rules = {"creation_rules": [{"key_groups": key_groups}]}
|
||||
@@ -299,7 +297,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
|
||||
print(f"{flake_dir / user} already exists")
|
||||
|
||||
|
||||
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
|
||||
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
|
||||
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
|
||||
|
||||
for folder in folders:
|
||||
@@ -307,19 +305,35 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
|
||||
for user in folder.iterdir():
|
||||
if not (user / "key.json").exists():
|
||||
continue
|
||||
this_pub_key, this_key_type = read_key(user)
|
||||
if key.pubkey == this_pub_key and key.key_type == this_key_type:
|
||||
return SopsKey(key.pubkey, user.name, key.key_type)
|
||||
|
||||
keys = read_keys(user)
|
||||
if key in keys:
|
||||
return {SopsKey(key.pubkey, user.name, key.key_type)}
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
|
||||
folder = sops_users_folder(flake_dir)
|
||||
|
||||
if folder.exists():
|
||||
for user in folder.iterdir():
|
||||
if not (user / "key.json").exists():
|
||||
continue
|
||||
|
||||
keys = read_keys(user)
|
||||
if key in keys:
|
||||
return {SopsKey(key.pubkey, user.name, key.key_type)}
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@API.register
|
||||
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
|
||||
maybe_key = maybe_get_user_or_machine(flake_dir, key)
|
||||
if maybe_key:
|
||||
return maybe_key
|
||||
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
|
||||
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]:
|
||||
maybe_keys = maybe_get_user_or_machine(flake_dir, key)
|
||||
if maybe_keys:
|
||||
return maybe_keys
|
||||
msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@@ -351,7 +365,7 @@ def maybe_get_admin_public_key() -> None | SopsKey:
|
||||
return keyring[0]
|
||||
|
||||
|
||||
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]:
|
||||
key = maybe_get_admin_public_key()
|
||||
if key:
|
||||
return ensure_user_or_machine(flake_dir, key)
|
||||
@@ -359,7 +373,7 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
||||
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:
|
||||
secret_path = secret_path / "secret"
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
|
||||
@@ -376,7 +390,7 @@ def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[
|
||||
def encrypt_file(
|
||||
secret_path: Path,
|
||||
content: str | IO[bytes] | bytes | None,
|
||||
pubkeys: list[tuple[str, KeyType]],
|
||||
pubkeys: list[SopsKey],
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
@@ -438,7 +452,7 @@ def encrypt_file(
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[tuple[str, KeyType]] = []
|
||||
no_public_keys_needed: list[SopsKey] = []
|
||||
|
||||
_, stdout = sops_run(
|
||||
Operation.DECRYPT,
|
||||
@@ -475,34 +489,100 @@ def get_meta(secret_path: Path) -> dict:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
|
||||
def write_key(path: Path, key: SopsKey, overwrite: bool) -> None:
|
||||
return write_keys(path, [key], overwrite)
|
||||
|
||||
|
||||
def write_keys(path: Path, keys: Iterable[SopsKey], overwrite: bool) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
||||
if not overwrite:
|
||||
flags |= os.O_EXCL
|
||||
fd = os.open(path / "key.json", flags)
|
||||
except FileExistsError as e:
|
||||
except FileExistsError:
|
||||
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
|
||||
raise ClanError(msg) from e
|
||||
raise ClanError(msg) from None
|
||||
with os.fdopen(fd, "w") as f:
|
||||
contents = {"publickey": publickey, "type": key_type.name.lower()}
|
||||
contents = [
|
||||
{"publickey": key.pubkey, "type": key.key_type.name.lower()}
|
||||
for key in sorted(keys)
|
||||
]
|
||||
json.dump(contents, f, indent=2)
|
||||
|
||||
|
||||
def read_key(path: Path) -> tuple[str, KeyType]:
|
||||
with Path(path / "key.json").open() as f:
|
||||
try:
|
||||
key = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to decode {path.name}: {e}"
|
||||
raise ClanError(msg) from e
|
||||
def append_keys(path: Path, keys: Iterable[SopsKey]) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# key file must already exist
|
||||
try:
|
||||
current_keys = set(read_keys(path))
|
||||
except FileNotFoundError:
|
||||
msg = f"{path} does not exist."
|
||||
raise ClanError(msg) from None
|
||||
|
||||
# add the specified keys to the set
|
||||
# de-duplication is natural
|
||||
current_keys.update(keys)
|
||||
|
||||
# write the new key set
|
||||
return write_keys(path, sorted(current_keys), overwrite=True)
|
||||
|
||||
|
||||
def remove_keys(path: Path, keys: Iterable[SopsKey]) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# key file must already exist
|
||||
try:
|
||||
current_keys = set(read_keys(path))
|
||||
except FileNotFoundError:
|
||||
msg = f"{path} does not exist."
|
||||
raise ClanError(msg) from None
|
||||
|
||||
current_keys.difference_update(keys)
|
||||
|
||||
if not current_keys:
|
||||
msg = f"No keys would remain in {path}. At least one key is required. Aborting."
|
||||
raise ClanError(msg)
|
||||
|
||||
# write the new key set
|
||||
return write_keys(path, sorted(current_keys), overwrite=True)
|
||||
|
||||
|
||||
def parse_key(key: Any) -> SopsKey:
|
||||
if not isinstance(key, dict):
|
||||
msg = f"Expected a dict but {type(key)!r} was provided"
|
||||
raise ClanError(msg)
|
||||
key_type = KeyType.validate(key.get("type"))
|
||||
if key_type is None:
|
||||
msg = f'Invalid key type in {path.name}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
|
||||
msg = f'Invalid key type in {key}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
|
||||
raise ClanError(msg)
|
||||
publickey = key.get("publickey")
|
||||
if not publickey:
|
||||
msg = f"{path.name} does not contain a public key"
|
||||
msg = f"{key} does not contain a public key"
|
||||
raise ClanError(msg)
|
||||
return publickey, key_type
|
||||
return SopsKey(publickey, "", key_type)
|
||||
|
||||
|
||||
def read_key(path: Path) -> SopsKey:
|
||||
keys = read_keys(path)
|
||||
if len(keys) != 1:
|
||||
msg = f"Expected exactly one key but {len(keys)} were found"
|
||||
raise ClanError(msg)
|
||||
return next(iter(keys))
|
||||
|
||||
|
||||
def read_keys(path: Path) -> set[SopsKey]:
|
||||
with Path(path / "key.json").open() as f:
|
||||
try:
|
||||
keys = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to decode {path.name}: {e}"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
if isinstance(keys, dict):
|
||||
return {parse_key(keys)}
|
||||
if isinstance(keys, list):
|
||||
return set(map(parse_key, keys))
|
||||
msg = f"Expected a dict or array but {type(keys)!r} was provided"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -2,6 +2,7 @@ import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from collections.abc import Iterable
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
|
||||
@@ -18,7 +19,7 @@ from .folders import (
|
||||
sops_users_folder,
|
||||
)
|
||||
from .secrets import update_secrets
|
||||
from .sops import read_key, write_key
|
||||
from .sops import append_keys, read_keys, remove_keys, write_keys
|
||||
from .types import (
|
||||
VALID_USER_NAME,
|
||||
public_or_private_age_key_type,
|
||||
@@ -32,13 +33,12 @@ log = logging.getLogger(__name__)
|
||||
def add_user(
|
||||
flake_dir: Path,
|
||||
name: str,
|
||||
key: str,
|
||||
key_type: sops.KeyType,
|
||||
keys: Iterable[sops.SopsKey],
|
||||
force: bool,
|
||||
) -> None:
|
||||
path = sops_users_folder(flake_dir) / name
|
||||
|
||||
write_key(path, key, key_type, overwrite=force)
|
||||
write_keys(path, keys, overwrite=force)
|
||||
updated_paths = [path]
|
||||
|
||||
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
|
||||
@@ -74,9 +74,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
|
||||
commit_files(updated_paths, flake_dir, f"Remove user {name}")
|
||||
|
||||
|
||||
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
|
||||
key, key_type = read_key(sops_users_folder(flake_dir) / name)
|
||||
return sops.SopsKey(key, name, key_type)
|
||||
def get_user(flake_dir: Path, name: str) -> set[sops.SopsKey]:
|
||||
keys = read_keys(sops_users_folder(flake_dir) / name)
|
||||
return {sops.SopsKey(key.pubkey, name, key.key_type) for key in keys}
|
||||
|
||||
|
||||
def list_users(flake_dir: Path) -> list[str]:
|
||||
@@ -124,32 +124,82 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def add_user_key(
|
||||
flake_dir: Path,
|
||||
name: str,
|
||||
keys: Iterable[sops.SopsKey],
|
||||
) -> None:
|
||||
path = sops_users_folder(flake_dir) / name
|
||||
|
||||
append_keys(path, keys)
|
||||
updated_paths = [path]
|
||||
|
||||
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
|
||||
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
|
||||
commit_files(
|
||||
updated_paths,
|
||||
flake_dir,
|
||||
f"Add key(s) for user {name} to secrets",
|
||||
)
|
||||
|
||||
|
||||
def remove_user_key(
|
||||
flake_dir: Path,
|
||||
name: str,
|
||||
keys: Iterable[sops.SopsKey],
|
||||
) -> None:
|
||||
path = sops_users_folder(flake_dir) / name
|
||||
|
||||
remove_keys(path, keys)
|
||||
updated_paths = [path]
|
||||
|
||||
filter_user_secrets = get_secrets_filter_for_user(flake_dir, name)
|
||||
updated_paths.extend(update_secrets(flake_dir, filter_user_secrets))
|
||||
commit_files(
|
||||
updated_paths,
|
||||
flake_dir,
|
||||
f"Remove key(s) for user {name} from secrets",
|
||||
)
|
||||
|
||||
|
||||
def _key_args(args: argparse.Namespace) -> Iterable[sops.SopsKey]:
|
||||
age_keys = args.age_key or []
|
||||
pgp_keys = args.pgp_key or []
|
||||
|
||||
key_count = len(age_keys) + len(pgp_keys)
|
||||
if args.agekey:
|
||||
key_count += 1
|
||||
|
||||
if key_count == 0:
|
||||
err_msg = (
|
||||
"Please provide at least one key through `--pgp-key`, "
|
||||
"`--age-key`, or as a positional (age key) argument."
|
||||
)
|
||||
raise ClanError(err_msg)
|
||||
|
||||
age_keys = [sops.SopsKey(key, "", sops.KeyType.AGE) for key in age_keys]
|
||||
if args.agekey:
|
||||
age_keys.append(sops.SopsKey(args.agekey, "", sops.KeyType.AGE))
|
||||
|
||||
pgp_keys = [sops.SopsKey(key, "", sops.KeyType.PGP) for key in pgp_keys]
|
||||
|
||||
return age_keys + pgp_keys
|
||||
|
||||
|
||||
def add_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
keys_args = (args.age_key, args.agekey, args.pgp_key)
|
||||
keys_count = sum(1 if key else 0 for key in keys_args)
|
||||
if keys_count != 1:
|
||||
err_msg = (
|
||||
f"Please provide one key (got {keys_count}) through `--pgp-key`, "
|
||||
f"`--age-key`, or as a positional (age key) argument."
|
||||
)
|
||||
raise ClanError(err_msg)
|
||||
if args.age_key or args.agekey:
|
||||
key_type = sops.KeyType.AGE
|
||||
else:
|
||||
key_type = sops.KeyType.PGP
|
||||
key = args.agekey or args.age_key or args.pgp_key
|
||||
add_user(args.flake.path, args.user, key, key_type, args.force)
|
||||
|
||||
add_user(args.flake.path, args.user, _key_args(args), args.force)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
key = get_user(args.flake.path, args.user)
|
||||
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
|
||||
keys = get_user(args.flake.path, args.user)
|
||||
json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
@@ -173,6 +223,22 @@ def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.flake.path, args.user, args.secret)
|
||||
|
||||
|
||||
def add_key_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
|
||||
add_user_key(args.flake.path, args.user, _key_args(args))
|
||||
|
||||
|
||||
def remove_key_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
|
||||
remove_user_key(args.flake.path, args.user, _key_args(args))
|
||||
|
||||
|
||||
def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
subparser = parser.add_subparsers(
|
||||
title="command",
|
||||
@@ -188,30 +254,7 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"-f", "--force", help="overwrite existing user", action="store_true"
|
||||
)
|
||||
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||
key_type = add_parser.add_mutually_exclusive_group(required=True)
|
||||
key_type.add_argument(
|
||||
"agekey",
|
||||
help="public or private age key of the user. "
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key. "
|
||||
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
|
||||
type=public_or_private_age_key_type,
|
||||
nargs="?",
|
||||
)
|
||||
key_type.add_argument(
|
||||
"--age-key",
|
||||
help="public or private age key of the user. "
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key. "
|
||||
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
|
||||
type=public_or_private_age_key_type,
|
||||
)
|
||||
key_type.add_argument(
|
||||
"--pgp-key",
|
||||
help=(
|
||||
"public PGP encryption key of the user. "
|
||||
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
|
||||
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
|
||||
),
|
||||
)
|
||||
_add_key_flags(add_parser)
|
||||
add_parser.set_defaults(func=add_command)
|
||||
|
||||
get_parser = subparser.add_parser("get", help="get a user public key")
|
||||
@@ -253,3 +296,52 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
add_dynamic_completer(remove_secrets_action, complete_secrets)
|
||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||
|
||||
add_key_parser = subparser.add_parser(
|
||||
"add-key", help="add one or more keys for a user"
|
||||
)
|
||||
add_key_user_action = add_key_parser.add_argument(
|
||||
"user", help="the name of the user", type=user_name_type
|
||||
)
|
||||
add_dynamic_completer(add_key_user_action, complete_users)
|
||||
_add_key_flags(add_key_parser)
|
||||
add_key_parser.set_defaults(func=add_key_command)
|
||||
|
||||
remove_key_parser = subparser.add_parser(
|
||||
"remove-key", help="remove one or more keys for a user"
|
||||
)
|
||||
remove_key_user_action = remove_key_parser.add_argument(
|
||||
"user", help="the name of the user", type=user_name_type
|
||||
)
|
||||
add_dynamic_completer(remove_key_user_action, complete_users)
|
||||
_add_key_flags(remove_key_parser)
|
||||
remove_key_parser.set_defaults(func=remove_key_command)
|
||||
|
||||
|
||||
def _add_key_flags(parser: argparse.ArgumentParser) -> None:
|
||||
key_type = parser.add_mutually_exclusive_group(required=True)
|
||||
key_type.add_argument(
|
||||
"agekey",
|
||||
help="public or private age key for a user. "
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key. "
|
||||
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
|
||||
type=public_or_private_age_key_type,
|
||||
nargs="?",
|
||||
)
|
||||
key_type.add_argument(
|
||||
"--age-key",
|
||||
help="public or private age key for a user. "
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key. "
|
||||
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
|
||||
type=public_or_private_age_key_type,
|
||||
action="append",
|
||||
)
|
||||
key_type.add_argument(
|
||||
"--pgp-key",
|
||||
help=(
|
||||
"public PGP encryption key for a user. "
|
||||
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
|
||||
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
|
||||
),
|
||||
action="append",
|
||||
)
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import dataclasses
|
||||
import json
|
||||
import os
|
||||
from collections.abc import Iterable
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
@@ -7,18 +9,18 @@ from clan_cli.secrets.folders import sops_secrets_folder
|
||||
from clan_cli.tests.helpers import cli
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class KeyPair:
|
||||
def __init__(self, pubkey: str, privkey: str) -> None:
|
||||
self.pubkey = pubkey
|
||||
self.privkey = privkey
|
||||
pubkey: str
|
||||
privkey: str
|
||||
|
||||
|
||||
class SopsSetup:
|
||||
"""Hold a list of three key pairs and create an "admin" user in the clan.
|
||||
"""Hold a list of key pairs and create an "admin" user in the clan.
|
||||
|
||||
The first key in the list is used as the admin key and
|
||||
the private part of the key is exposed in the
|
||||
`SOPS_AGE_KEY` environment variable, the two others can
|
||||
`SOPS_AGE_KEY` environment variable, the others can
|
||||
be used to add machines or other users.
|
||||
"""
|
||||
|
||||
@@ -52,6 +54,14 @@ KEYS = [
|
||||
"age1dhuh9xtefhgpr2sjjf7gmp9q2pr37z92rv4wsadxuqdx48989g7qj552qp",
|
||||
"AGE-SECRET-KEY-169N3FT32VNYQ9WYJMLUSVTMA0TTZGVJF7YZWS8AHTWJ5RR9VGR7QCD8SKF",
|
||||
),
|
||||
KeyPair(
|
||||
"age1n58rxm8y6h9prmwn0qk7nggfsu9f9j4u35dxg7akpkjd5vgsavssfzmq9y",
|
||||
"AGE-SECRET-KEY-1YU2JVE445KT6S8UN3403NHH6EZU404RMEH9RTME9SPWXWMLJS0LQM5NWM7",
|
||||
),
|
||||
KeyPair(
|
||||
"age1eyyhln9g3cdwtrwpckugvqgtf5p8ugt0426sw38ra3wkc0t4rfhslq7txv",
|
||||
"AGE-SECRET-KEY-1567QKA63Y9P62SHF5TCHVCT5GZX2LZ8NS0E9RKA2QHDA662SF5LQ2VJJYX",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@@ -73,7 +83,7 @@ def sops_setup(
|
||||
def assert_secrets_file_recipients(
|
||||
flake_path: Path,
|
||||
secret_name: str,
|
||||
expected_age_recipients_keypairs: list["KeyPair"],
|
||||
expected_age_recipients_keypairs: Iterable["KeyPair"],
|
||||
err_msg: str | None = None,
|
||||
) -> None:
|
||||
"""Checks that the recipients of a secrets file matches expectations.
|
||||
|
||||
@@ -59,7 +59,7 @@ def test_machine_delete(
|
||||
) -> None:
|
||||
flake = flake_with_sops
|
||||
|
||||
admin_key, machine_key, machine2_key = sops_setup.keys
|
||||
admin_key, machine_key, machine2_key, *xs = sops_setup.keys
|
||||
|
||||
# create a couple machines with their keys
|
||||
for name, key in (("my-machine", machine_key), ("my-machine2", machine2_key)):
|
||||
|
||||
@@ -159,6 +159,86 @@ def test_users(
|
||||
) -> None:
|
||||
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
|
||||
|
||||
# some additional user-specific tests
|
||||
|
||||
admin_key = age_keys[2]
|
||||
sops_folder = test_flake.path / "sops"
|
||||
|
||||
user_keys = {
|
||||
"bob": [age_keys[0], age_keys[1]],
|
||||
"alice": [age_keys[2]],
|
||||
"charlie": [age_keys[3], age_keys[4]],
|
||||
}
|
||||
|
||||
for user, keys in user_keys.items():
|
||||
key_args = [f"--age-key={key.pubkey}" for key in keys]
|
||||
|
||||
# add the user keys
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"users",
|
||||
"add",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
user,
|
||||
*key_args,
|
||||
]
|
||||
)
|
||||
assert (sops_folder / "users" / user / "key.json").exists()
|
||||
|
||||
# check they are returned in get
|
||||
with capture_output as output:
|
||||
cli.run(["secrets", "users", "get", "--flake", str(test_flake.path), user])
|
||||
|
||||
for key in keys:
|
||||
assert key.pubkey in output.out
|
||||
|
||||
# set a secret
|
||||
secret_name = f"{user}_secret"
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"set",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"--user",
|
||||
user,
|
||||
secret_name,
|
||||
]
|
||||
)
|
||||
|
||||
# check the secret has each of our user's keys as a recipient
|
||||
# in addition the admin key should be there
|
||||
assert_secrets_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[admin_key, *keys],
|
||||
)
|
||||
|
||||
if len(keys) == 1:
|
||||
continue
|
||||
|
||||
# remove one of the keys
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"users",
|
||||
"remove-key",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
user,
|
||||
keys[0].pubkey,
|
||||
]
|
||||
)
|
||||
|
||||
# check the secret has been updated
|
||||
assert_secrets_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[admin_key, *keys[1:]],
|
||||
)
|
||||
|
||||
|
||||
def test_machines(
|
||||
test_flake: FlakeForTest,
|
||||
@@ -786,7 +866,10 @@ def test_secrets_key_generate_gpg(
|
||||
"testuser",
|
||||
]
|
||||
)
|
||||
key = json.loads(output.out)
|
||||
keys = json.loads(output.out)
|
||||
assert len(keys) == 1
|
||||
|
||||
key = keys[0]
|
||||
assert key["type"] == "pgp"
|
||||
assert key["publickey"] == gpg_key.fingerprint
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import os
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.flake import Flake
|
||||
from clan_cli.secrets.key import generate_key
|
||||
from clan_cli.secrets.sops import KeyType, maybe_get_admin_public_key
|
||||
from clan_cli.secrets.sops import maybe_get_admin_public_key
|
||||
from clan_cli.secrets.users import add_user
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -24,8 +24,7 @@ def keygen(user: str | None, flake: Flake, force: bool) -> None:
|
||||
add_user(
|
||||
flake_dir=flake.path,
|
||||
name=user,
|
||||
key=pub_key.pubkey,
|
||||
key_type=KeyType.AGE,
|
||||
keys=[pub_key],
|
||||
force=force,
|
||||
)
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ class SecretStore(StoreBase):
|
||||
secret_path = self.secret_path(generator, secret_name)
|
||||
recipient = sops.SopsKey.load_dir(key_dir)
|
||||
recipients = sops.get_recipients(secret_path)
|
||||
return recipient in recipients
|
||||
return len(recipient.intersection(recipients)) > 0
|
||||
|
||||
def secret_path(self, generator: Generator, secret_name: str) -> Path:
|
||||
return self.directory(generator, secret_name)
|
||||
@@ -258,10 +258,7 @@ class SecretStore(StoreBase):
|
||||
)
|
||||
)
|
||||
|
||||
return {
|
||||
sops.SopsKey(pubkey=key, username="", key_type=key_type)
|
||||
for (key, key_type) in keys
|
||||
}
|
||||
return keys
|
||||
|
||||
# }
|
||||
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
|
||||
|
||||
Reference in New Issue
Block a user