Draft: clan-cli: secrets: Add support for PGP keys with sops-nix
To use a PGP key instead of an age key you can set `SOPS_PGP_FP`. (You
can use `gpg -k --fingerprint --fingerprint` to get your PGP encryption
key fingerprint, remove spaces from it).
The internal manifest file already supported a type field, and so I built
from there.
With those changes, I was able to add my PGP key, and update all my
secrets with it, instead of the age key originally generated:
```
% clan secrets key show | jq
{
"key": "ADB6276965590A096004F6D1E114CBAE8FA29165",
"type": "pgp"
}
% clan secrets key update
% for s in $(clan secrets list) ; do clan secrets users add-secret kal-pgp-from-2022-12-to-2024-12 "$s"; done
% for s in $(clan secrets list) ; do clan secrets users remove-secret --debug kal "$s" ; done
```
This commit is contained in:
@@ -5,8 +5,9 @@ from pathlib import Path
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
|
||||
from . import sops
|
||||
from .secrets import update_secrets
|
||||
from .sops import default_admin_key_path, generate_private_key, get_public_key
|
||||
from .sops import default_admin_key_path, generate_private_key
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -45,10 +46,6 @@ def generate_key() -> str:
|
||||
return pub_key
|
||||
|
||||
|
||||
def show_key() -> str:
|
||||
return get_public_key(default_admin_key_path().read_text())
|
||||
|
||||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
pub_key = generate_key()
|
||||
log.info(
|
||||
@@ -57,7 +54,9 @@ def generate_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def show_command(args: argparse.Namespace) -> None:
|
||||
print(show_key())
|
||||
key, type = sops.maybe_get_public_key()
|
||||
type_or_null = f'"{type.name.lower()}"' if type else "null"
|
||||
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
||||
|
||||
|
||||
def update_command(args: argparse.Namespace) -> None:
|
||||
@@ -76,7 +75,7 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser_generate = subparser.add_parser("generate", help="generate age key")
|
||||
parser_generate.set_defaults(func=generate_command)
|
||||
|
||||
parser_show = subparser.add_parser("show", help="show age public key")
|
||||
parser_show = subparser.add_parser("show", help="show public key")
|
||||
parser_show.set_defaults(func=show_command)
|
||||
|
||||
parser_update = subparser.add_parser(
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import argparse
|
||||
import functools
|
||||
import getpass
|
||||
import operator
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
@@ -20,6 +22,7 @@ from clan_cli.completions import (
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
|
||||
from . import sops
|
||||
from .folders import (
|
||||
list_objects,
|
||||
sops_groups_folder,
|
||||
@@ -42,13 +45,13 @@ def update_secrets(
|
||||
changed_files.extend(
|
||||
update_keys(
|
||||
secret_path,
|
||||
sorted(collect_keys_for_path(secret_path)),
|
||||
sorted_keys(collect_keys_for_path(secret_path)),
|
||||
)
|
||||
)
|
||||
return changed_files
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> set[str]:
|
||||
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
if not folder.exists():
|
||||
return set()
|
||||
keys = set()
|
||||
@@ -68,7 +71,7 @@ def collect_keys_for_type(folder: Path) -> set[str]:
|
||||
return keys
|
||||
|
||||
|
||||
def collect_keys_for_path(path: Path) -> set[str]:
|
||||
def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
keys = set()
|
||||
keys.update(collect_keys_for_type(path / "machines"))
|
||||
keys.update(collect_keys_for_type(path / "users"))
|
||||
@@ -132,8 +135,8 @@ def encrypt_secret(
|
||||
|
||||
recipient_keys = collect_keys_for_path(secret_path)
|
||||
|
||||
if key.pubkey not in recipient_keys:
|
||||
recipient_keys.add(key.pubkey)
|
||||
if (key.pubkey, key.type) not in recipient_keys:
|
||||
recipient_keys.add((key.pubkey, key.type))
|
||||
files_to_commit.extend(
|
||||
allow_member(
|
||||
users_folder(secret_path),
|
||||
@@ -144,7 +147,7 @@ def encrypt_secret(
|
||||
)
|
||||
|
||||
secret_path = secret_path / "secret"
|
||||
encrypt_file(secret_path, value, sorted(recipient_keys))
|
||||
encrypt_file(secret_path, value, sorted_keys(recipient_keys))
|
||||
files_to_commit.append(secret_path)
|
||||
if git_commit:
|
||||
commit_files(
|
||||
@@ -228,7 +231,7 @@ def allow_member(
|
||||
changed.extend(
|
||||
update_keys(
|
||||
group_folder.parent,
|
||||
sorted(collect_keys_for_path(group_folder.parent)),
|
||||
sorted_keys(collect_keys_for_path(group_folder.parent)),
|
||||
)
|
||||
)
|
||||
return changed
|
||||
@@ -255,10 +258,13 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
|
||||
group_folder.parent.rmdir()
|
||||
|
||||
return update_keys(
|
||||
target.parent.parent, sorted(collect_keys_for_path(group_folder.parent))
|
||||
target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent))
|
||||
)
|
||||
|
||||
|
||||
sorted_keys = functools.partial(sorted, key=operator.itemgetter(0))
|
||||
|
||||
|
||||
def has_secret(secret_path: Path) -> bool:
|
||||
return (secret_path / "secret").exists()
|
||||
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
@@ -19,13 +22,25 @@ from clan_cli.nix import nix_shell
|
||||
from .folders import sops_machines_folder, sops_users_folder
|
||||
|
||||
|
||||
class KeyType(enum.Enum):
|
||||
AGE = enum.auto()
|
||||
PGP = enum.auto()
|
||||
|
||||
@classmethod
|
||||
def validate(cls, value: str | None) -> KeyType | None: # noqa: ANN102
|
||||
if value:
|
||||
return cls.__members__.get(value.upper())
|
||||
return None
|
||||
|
||||
|
||||
@dataclass
|
||||
class SopsKey:
|
||||
pubkey: str
|
||||
username: str
|
||||
type: KeyType
|
||||
|
||||
|
||||
def get_public_key(privkey: str) -> str:
|
||||
def get_public_age_key(privkey: str) -> str:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
||||
try:
|
||||
res = subprocess.run(
|
||||
@@ -78,8 +93,8 @@ def get_user_name(flake_dir: Path, user: str) -> str:
|
||||
print(f"{flake_dir / user} already exists")
|
||||
|
||||
|
||||
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
|
||||
key = SopsKey(pub_key, username="")
|
||||
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str, type: KeyType) -> SopsKey | None:
|
||||
key = SopsKey(pub_key, username="", type=type)
|
||||
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
|
||||
|
||||
for folder in folders:
|
||||
@@ -87,7 +102,7 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
|
||||
for user in folder.iterdir():
|
||||
if not (user / "key.json").exists():
|
||||
continue
|
||||
if read_key(user) == pub_key:
|
||||
if read_key(user) == (pub_key, type):
|
||||
key.username = user.name
|
||||
return key
|
||||
|
||||
@@ -95,8 +110,8 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
|
||||
|
||||
|
||||
@API.register
|
||||
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
|
||||
key = maybe_get_user_or_machine(flake_dir, pub_key)
|
||||
def ensure_user_or_machine(flake_dir: Path, pub_key: str, key_type: KeyType) -> SopsKey:
|
||||
key = maybe_get_user_or_machine(flake_dir, pub_key, key_type)
|
||||
if not key:
|
||||
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||
raise ClanError(msg)
|
||||
@@ -111,43 +126,51 @@ def default_admin_key_path() -> Path:
|
||||
|
||||
|
||||
@API.register
|
||||
def maybe_get_admin_public_key() -> str | None:
|
||||
key = os.environ.get("SOPS_AGE_KEY")
|
||||
if key:
|
||||
return get_public_key(key)
|
||||
def maybe_get_admin_public_key() -> tuple[str, KeyType | None]:
|
||||
age_key = os.environ.get("SOPS_AGE_KEY")
|
||||
pgp_key = os.environ.get("SOPS_PGP_FP")
|
||||
if age_key and pgp_key:
|
||||
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
|
||||
raise ClanError(msg)
|
||||
if age_key:
|
||||
return get_public_age_key(age_key), KeyType.AGE
|
||||
if pgp_key:
|
||||
return pgp_key, KeyType.PGP
|
||||
|
||||
path = default_admin_key_path()
|
||||
if path.exists():
|
||||
return get_public_key(path.read_text())
|
||||
return get_public_age_key(path.read_text()), KeyType.AGE
|
||||
|
||||
return None
|
||||
return "", None
|
||||
|
||||
|
||||
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
|
||||
pub_key = maybe_get_admin_public_key()
|
||||
if pub_key:
|
||||
return maybe_get_user_or_machine(flake_dir, pub_key)
|
||||
pub_key, key_type = maybe_get_admin_public_key()
|
||||
if key_type:
|
||||
return maybe_get_user_or_machine(flake_dir, pub_key, key_type)
|
||||
return None
|
||||
|
||||
|
||||
def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
||||
pub_key = maybe_get_admin_public_key()
|
||||
if not pub_key:
|
||||
pub_key, key_type = maybe_get_admin_public_key()
|
||||
if not key_type:
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
return ensure_user_or_machine(flake_dir, pub_key)
|
||||
return ensure_user_or_machine(flake_dir, pub_key, key_type)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def sops_manifest(keys: list[str]) -> Iterator[Path]:
|
||||
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
|
||||
all_keys = {type.lower(): [] for type in KeyType.__members__.keys()}
|
||||
for key, type in keys:
|
||||
all_keys[type.name.lower()].append(key)
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
json.dump(
|
||||
{"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
|
||||
)
|
||||
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
|
||||
manifest.flush()
|
||||
yield Path(manifest.name)
|
||||
|
||||
|
||||
def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
|
||||
def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]:
|
||||
with sops_manifest(keys) as manifest:
|
||||
secret_path = secret_path / "secret"
|
||||
time_before = secret_path.stat().st_mtime
|
||||
@@ -171,7 +194,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
|
||||
def encrypt_file(
|
||||
secret_path: Path,
|
||||
content: IO[str] | str | bytes | None,
|
||||
pubkeys: list[str],
|
||||
pubkeys: list[tuple[str, KeyType]],
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
@@ -237,7 +260,7 @@ def get_meta(secret_path: Path) -> dict:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def write_key(path: Path, publickey: str, overwrite: bool) -> None:
|
||||
def write_key(path: Path, publickey: str, type: KeyType, overwrite: bool) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
try:
|
||||
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
||||
@@ -248,21 +271,23 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
|
||||
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
|
||||
raise ClanError(msg) from e
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
|
||||
contents = {"publickey": publickey, "type": type.name.lower()}
|
||||
json.dump(contents, f, indent=2)
|
||||
|
||||
|
||||
def read_key(path: Path) -> str:
|
||||
def read_key(path: Path) -> tuple[str, KeyType]:
|
||||
with Path(path / "key.json").open() as f:
|
||||
try:
|
||||
key = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to decode {path.name}: {e}"
|
||||
raise ClanError(msg) from e
|
||||
if key["type"] != "age":
|
||||
msg = f"{path.name} is not an age key but {key['type']}. This is not supported"
|
||||
type = KeyType.validate(key.get("type"))
|
||||
if type is None:
|
||||
msg = f"Invalid key type in {path.name}: \"{type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
|
||||
raise ClanError(msg)
|
||||
publickey = key.get("publickey")
|
||||
if not publickey:
|
||||
msg = f"{path.name} does not contain a public key"
|
||||
raise ClanError(msg)
|
||||
return publickey
|
||||
return publickey, type
|
||||
|
||||
@@ -5,7 +5,7 @@ from pathlib import Path
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
from .sops import get_public_key
|
||||
from .sops import get_public_age_key
|
||||
|
||||
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
|
||||
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
|
||||
@@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str:
|
||||
if arg_value.startswith("age1"):
|
||||
return arg_value.strip()
|
||||
if arg_value.startswith("AGE-SECRET-KEY-"):
|
||||
return get_public_key(arg_value)
|
||||
return get_public_age_key(arg_value)
|
||||
if not arg_value.startswith("age1"):
|
||||
msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import argparse
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
|
||||
from . import secrets
|
||||
from . import secrets, sops
|
||||
from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder
|
||||
from .secrets import update_secrets
|
||||
from .sops import read_key, write_key
|
||||
@@ -17,13 +18,19 @@ from .types import (
|
||||
)
|
||||
|
||||
|
||||
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None:
|
||||
def add_user(
|
||||
flake_dir: Path,
|
||||
name: str,
|
||||
key: str,
|
||||
key_type: sops.KeyType,
|
||||
force: bool,
|
||||
) -> None:
|
||||
path = sops_users_folder(flake_dir) / name
|
||||
|
||||
def filter_user_secrets(secret: Path) -> bool:
|
||||
return secret.joinpath("users", name).exists()
|
||||
|
||||
write_key(path, key, force)
|
||||
write_key(path, key, key_type, overwrite=force)
|
||||
paths = [path]
|
||||
paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets))
|
||||
commit_files(
|
||||
@@ -42,7 +49,7 @@ def remove_user(flake_dir: Path, name: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def get_user(flake_dir: Path, name: str) -> str:
|
||||
def get_user(flake_dir: Path, name: str) -> tuple[str, sops.KeyType]:
|
||||
return read_key(sops_users_folder(flake_dir) / name)
|
||||
|
||||
|
||||
@@ -95,14 +102,18 @@ def add_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
add_user(args.flake.path, args.user, args.key, args.force)
|
||||
key_type = sops.KeyType.AGE if args.key_age else sops.KeyType.PGP
|
||||
key = args.key_age or args.key_pgp
|
||||
add_user(args.flake.path, args.user, key, key_type, args.force)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
print(get_user(args.flake.path, args.user))
|
||||
key, type = get_user(args.flake.path, args.user)
|
||||
type_or_null = f'"{type.name.lower()}"' if type else "null"
|
||||
print(f'{{"key": "{key}", "type": {type_or_null}}}')
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
@@ -141,13 +152,21 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"-f", "--force", help="overwrite existing user", action="store_true"
|
||||
)
|
||||
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||
add_parser.add_argument(
|
||||
"key",
|
||||
help="public key or private key of the user."
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key."
|
||||
key_type = add_parser.add_mutually_exclusive_group(required=True)
|
||||
key_type.add_argument(
|
||||
"--key-age",
|
||||
help="public or private age key of the user. "
|
||||
"Execute 'clan secrets key --help' on how to retrieve a key. "
|
||||
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
|
||||
type=public_or_private_age_key_type,
|
||||
)
|
||||
key_type.add_argument(
|
||||
"--key-pgp",
|
||||
help=(
|
||||
"public PGP encryption key of the user. "
|
||||
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
|
||||
),
|
||||
)
|
||||
add_parser.set_defaults(func=add_command)
|
||||
|
||||
get_parser = subparser.add_parser("get", help="get a user public key")
|
||||
|
||||
Reference in New Issue
Block a user