Merge pull request 'clan-cli: secrets: Add support for PGP keys with sops-nix' (#2186) from lopter/clan-core:lo-sops-nix-pgp-support into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2186
This commit is contained in:
Mic92
2024-10-04 15:36:30 +00:00
11 changed files with 361 additions and 131 deletions

View File

@@ -32,6 +32,7 @@ Note: This module assumes the presence of other modules and classes such as `Cla
import dataclasses import dataclasses
import json import json
from dataclasses import dataclass, fields, is_dataclass from dataclasses import dataclass, fields, is_dataclass
from enum import Enum
from pathlib import Path from pathlib import Path
from types import UnionType from types import UnionType
from typing import ( from typing import (
@@ -179,25 +180,31 @@ def construct_value(
# Nested types # Nested types
# list # list
# dict # dict
if get_origin(t) is list: origin = get_origin(t)
if origin is list:
if not isinstance(field_value, list): if not isinstance(field_value, list):
msg = f"Expected list, got {field_value}" msg = f"Expected list, got {field_value}"
raise ClanError(msg, location=f"{loc}") raise ClanError(msg, location=f"{loc}")
return [construct_value(get_args(t)[0], item) for item in field_value] return [construct_value(get_args(t)[0], item) for item in field_value]
if get_origin(t) is dict and isinstance(field_value, dict): if origin is dict and isinstance(field_value, dict):
return { return {
key: construct_value(get_args(t)[1], value) key: construct_value(get_args(t)[1], value)
for key, value in field_value.items() for key, value in field_value.items()
} }
if get_origin(t) is Literal: if origin is Literal:
valid_values = get_args(t) valid_values = get_args(t)
if field_value not in valid_values: if field_value not in valid_values:
msg = f"Expected one of {valid_values}, got {field_value}" msg = f"Expected one of {', '.join(valid_values)}, got {field_value}"
raise ClanError(msg, location=f"{loc}") raise ClanError(msg, location=f"{loc}")
return field_value return field_value
if get_origin(t) is Annotated: if origin is Enum:
if field_value not in origin.__members__:
msg = f"Expected one of {', '.join(origin.__members__)}, got {field_value}"
raise ClanError(msg, location=f"{loc}")
if origin is Annotated:
(base_type,) = get_args(t) (base_type,) = get_args(t)
return construct_value(base_type, field_value) return construct_value(base_type, field_value)

View File

@@ -2,6 +2,7 @@ import copy
import dataclasses import dataclasses
import pathlib import pathlib
from dataclasses import MISSING from dataclasses import MISSING
from enum import EnumType
from types import NoneType, UnionType from types import NoneType, UnionType
from typing import ( from typing import (
Annotated, Annotated,
@@ -77,13 +78,16 @@ def type_to_dict(
if dataclasses.is_dataclass(t): if dataclasses.is_dataclass(t):
fields = dataclasses.fields(t) fields = dataclasses.fields(t)
properties = { properties = {}
f.metadata.get("alias", f.name): type_to_dict( for f in fields:
if f.name.startswith("_"):
continue
assert not isinstance(
f.type, str
), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type, f"{scope} {t.__name__}.{f.name}", type_map f.type, f"{scope} {t.__name__}.{f.name}", type_map
) )
for f in fields
if not f.name.startswith("_")
}
required = set() required = set()
for pn, pv in properties.items(): for pn, pv in properties.items():
@@ -192,6 +196,11 @@ def type_to_dict(
return {"type": "boolean"} return {"type": "boolean"}
if t is object: if t is object:
return {"type": "object"} return {"type": "object"}
if type(t) is EnumType:
return {
"type": "string",
"enum": list(t.__members__),
}
if t is Any: if t is Any:
msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}" msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}"
raise JSchemaTypeError(msg) raise JSchemaTypeError(msg)
@@ -208,7 +217,7 @@ def type_to_dict(
if t is NoneType: if t is NoneType:
return {"type": "null"} return {"type": "null"}
msg = f"{scope} - Error primitive type not supported {t!s}" msg = f"{scope} - Basic type '{t!s}' is not supported"
raise JSchemaTypeError(msg) raise JSchemaTypeError(msg)
msg = f"{scope} - Error type not supported {t!s}" msg = f"{scope} - Type '{t!s}' is not supported"
raise JSchemaTypeError(msg) raise JSchemaTypeError(msg)

View File

@@ -1,63 +1,49 @@
import argparse import argparse
import json
import logging import logging
from pathlib import Path import sys
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_files from clan_cli.git import commit_files
from . import sops
from .secrets import update_secrets from .secrets import update_secrets
from .sops import default_admin_key_path, generate_private_key, get_public_key from .sops import (
default_admin_key_path,
generate_private_key,
maybe_get_admin_public_key,
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def extract_public_key(filepath: Path) -> str: def generate_key() -> sops.SopsKey:
""" key = maybe_get_admin_public_key()
Extracts the public key from a given text file. if key is not None:
""" print(f"{key.key_type.name} key {key.pubkey} is already set")
try: return key
with filepath.open() as file:
for line in file:
# Check if the line contains the public key
if line.startswith("# public key:"):
# Extract and return the public key part after the prefix
return line.strip().split(": ")[1]
except FileNotFoundError as e:
msg = f"The file at {filepath} was not found."
raise ClanError(msg) from e
except OSError as e:
msg = f"An error occurred while extracting the public key: {e}"
raise ClanError(msg) from e
msg = f"Could not find the public key in the file at {filepath}."
raise ClanError(msg)
def generate_key() -> str:
path = default_admin_key_path() path = default_admin_key_path()
if path.exists(): _, pub_key = generate_private_key(out_file=path)
log.info(f"Key already exists at {path}") print(
return extract_public_key(path) f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
priv_key, pub_key = generate_private_key(out_file=path)
log.info(
f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets."
) )
return pub_key return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE)
def show_key() -> str:
return get_public_key(default_admin_key_path().read_text())
def generate_command(args: argparse.Namespace) -> None: def generate_command(args: argparse.Namespace) -> None:
pub_key = generate_key() key = generate_key()
log.info( print("Also add your age public key to the repository with:")
f"Also add your age public key to the repository with: \nclan secrets users add <username> {pub_key}" key_type = key.key_type.name.lower()
) print(f"clan secrets users add --{key_type}-key <username>")
def show_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None:
print(show_key()) key = sops.maybe_get_admin_public_key()
if not key:
msg = "No public key found"
raise ClanError(msg)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
def update_command(args: argparse.Namespace) -> None: def update_command(args: argparse.Namespace) -> None:
@@ -73,10 +59,16 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
required=True, required=True,
) )
parser_generate = subparser.add_parser("generate", help="generate age key") parser_generate = subparser.add_parser(
"generate",
description=(
"Generate an age key for the Clan, "
"to use PGP set `SOPS_PGP_FP` in your environment."
),
)
parser_generate.set_defaults(func=generate_command) parser_generate.set_defaults(func=generate_command)
parser_show = subparser.add_parser("show", help="show age public key") parser_show = subparser.add_parser("show", help="show public key")
parser_show.set_defaults(func=show_command) parser_show.set_defaults(func=show_command)
parser_update = subparser.add_parser( parser_update = subparser.add_parser(

View File

@@ -10,7 +10,7 @@ from clan_cli.errors import ClanError
from clan_cli.git import commit_files from clan_cli.git import commit_files
from clan_cli.machines.types import machine_name_type, validate_hostname from clan_cli.machines.types import machine_name_type, validate_hostname
from . import secrets from . import secrets, sops
from .folders import ( from .folders import (
list_objects, list_objects,
remove_object, remove_object,
@@ -24,7 +24,7 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None: def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None:
machine_path = sops_machines_folder(flake_dir) / machine machine_path = sops_machines_folder(flake_dir) / machine
write_key(machine_path, pubkey, force) write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
paths = [machine_path] paths = [machine_path]
def filter_machine_secrets(secret: Path) -> bool: def filter_machine_secrets(secret: Path) -> bool:
@@ -48,7 +48,8 @@ def remove_machine(flake_dir: Path, name: str) -> None:
def get_machine(flake_dir: Path, name: str) -> str: def get_machine(flake_dir: Path, name: str) -> str:
return read_key(sops_machines_folder(flake_dir) / name) key, _ = read_key(sops_machines_folder(flake_dir) / name)
return key
def has_machine(flake_dir: Path, name: str) -> bool: def has_machine(flake_dir: Path, name: str) -> bool:
@@ -168,7 +169,7 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
add_dynamic_completer(add_machine_action, complete_machines) add_dynamic_completer(add_machine_action, complete_machines)
add_parser.add_argument( add_parser.add_argument(
"key", "key",
help="public key or private key of the user", help="public or private age key of the machine",
type=public_or_private_age_key_type, type=public_or_private_age_key_type,
) )
add_parser.set_defaults(func=add_command) add_parser.set_defaults(func=add_command)

View File

@@ -1,5 +1,7 @@
import argparse import argparse
import functools
import getpass import getpass
import operator
import os import os
import shutil import shutil
import sys import sys
@@ -20,6 +22,7 @@ from clan_cli.completions import (
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_files from clan_cli.git import commit_files
from . import sops
from .folders import ( from .folders import (
list_objects, list_objects,
sops_groups_folder, sops_groups_folder,
@@ -42,13 +45,13 @@ def update_secrets(
changed_files.extend( changed_files.extend(
update_keys( update_keys(
secret_path, secret_path,
sorted(collect_keys_for_path(secret_path)), sorted_keys(collect_keys_for_path(secret_path)),
) )
) )
return changed_files return changed_files
def collect_keys_for_type(folder: Path) -> set[str]: def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
if not folder.exists(): if not folder.exists():
return set() return set()
keys = set() keys = set()
@@ -68,7 +71,7 @@ def collect_keys_for_type(folder: Path) -> set[str]:
return keys return keys
def collect_keys_for_path(path: Path) -> set[str]: def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
keys = set() keys = set()
keys.update(collect_keys_for_type(path / "machines")) keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users")) keys.update(collect_keys_for_type(path / "users"))
@@ -132,8 +135,8 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path) recipient_keys = collect_keys_for_path(secret_path)
if key.pubkey not in recipient_keys: if (key.pubkey, key.key_type) not in recipient_keys:
recipient_keys.add(key.pubkey) recipient_keys.add((key.pubkey, key.key_type))
files_to_commit.extend( files_to_commit.extend(
allow_member( allow_member(
users_folder(secret_path), users_folder(secret_path),
@@ -144,7 +147,7 @@ def encrypt_secret(
) )
secret_path = secret_path / "secret" secret_path = secret_path / "secret"
encrypt_file(secret_path, value, sorted(recipient_keys)) encrypt_file(secret_path, value, sorted_keys(recipient_keys))
files_to_commit.append(secret_path) files_to_commit.append(secret_path)
if git_commit: if git_commit:
commit_files( commit_files(
@@ -228,7 +231,7 @@ def allow_member(
changed.extend( changed.extend(
update_keys( update_keys(
group_folder.parent, group_folder.parent,
sorted(collect_keys_for_path(group_folder.parent)), sorted_keys(collect_keys_for_path(group_folder.parent)),
) )
) )
return changed return changed
@@ -255,10 +258,13 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
group_folder.parent.rmdir() group_folder.parent.rmdir()
return update_keys( return update_keys(
target.parent.parent, sorted(collect_keys_for_path(group_folder.parent)) target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent))
) )
sorted_keys = functools.partial(sorted, key=operator.itemgetter(0))
def has_secret(secret_path: Path) -> bool: def has_secret(secret_path: Path) -> bool:
return (secret_path / "secret").exists() return (secret_path / "secret").exists()

View File

@@ -1,3 +1,4 @@
import enum
import io import io
import json import json
import os import os
@@ -19,13 +20,32 @@ from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder from .folders import sops_machines_folder, sops_users_folder
@dataclass class KeyType(enum.Enum):
AGE = enum.auto()
PGP = enum.auto()
@classmethod
def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102
if value:
return cls.__members__.get(value.upper())
return None
@dataclass(frozen=True, eq=False)
class SopsKey: class SopsKey:
pubkey: str pubkey: str
username: str username: str
key_type: KeyType
def as_dict(self) -> dict[str, str]:
return {
"publickey": self.pubkey,
"username": self.username,
"type": self.key_type.name.lower(),
}
def get_public_key(privkey: str) -> str: def get_public_age_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try: try:
res = subprocess.run( res = subprocess.run(
@@ -78,8 +98,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists") print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None: def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
key = SopsKey(pub_key, username="")
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)] folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders: for folder in folders:
@@ -87,20 +106,20 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
for user in folder.iterdir(): for user in folder.iterdir():
if not (user / "key.json").exists(): if not (user / "key.json").exists():
continue continue
if read_key(user) == pub_key: this_pub_key, this_key_type = read_key(user)
key.username = user.name if key.pubkey == this_pub_key and key.key_type == this_key_type:
return key return SopsKey(key.pubkey, user.name, key.key_type)
return None return None
@API.register @API.register
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey: def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
key = maybe_get_user_or_machine(flake_dir, pub_key) maybe_key = maybe_get_user_or_machine(flake_dir, key)
if not key: if maybe_key:
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)" return maybe_key
raise ClanError(msg) msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
return key raise ClanError(msg)
def default_admin_key_path() -> Path: def default_admin_key_path() -> Path:
@@ -111,43 +130,59 @@ def default_admin_key_path() -> Path:
@API.register @API.register
def maybe_get_admin_public_key() -> str | None: def maybe_get_admin_public_key() -> None | SopsKey:
key = os.environ.get("SOPS_AGE_KEY") age_key = os.environ.get("SOPS_AGE_KEY")
if key: pgp_key = os.environ.get("SOPS_PGP_FP")
return get_public_key(key) if age_key and pgp_key:
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
raise ClanError(msg)
if age_key:
return SopsKey(
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
)
if pgp_key:
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
path = default_admin_key_path() path = default_admin_key_path()
if path.exists(): if path.exists():
return get_public_key(path.read_text()) return SopsKey(
pubkey=get_public_age_key(path.read_text()),
key_type=KeyType.AGE,
username="",
)
return None return None
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None: def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
pub_key = maybe_get_admin_public_key() key = maybe_get_admin_public_key()
if pub_key: if not key:
return maybe_get_user_or_machine(flake_dir, pub_key) return None
return None return maybe_get_user_or_machine(flake_dir, key)
def ensure_admin_key(flake_dir: Path) -> SopsKey: def ensure_admin_key(flake_dir: Path) -> SopsKey:
pub_key = maybe_get_admin_public_key() key = maybe_get_admin_public_key()
if not pub_key: if key:
msg = "No sops key found. Please generate one with 'clan secrets key generate'." return ensure_user_or_machine(flake_dir, key)
raise ClanError(msg) msg = "No sops key found. Please generate one with 'clan secrets key generate'."
return ensure_user_or_machine(flake_dir, pub_key) raise ClanError(msg)
@contextmanager @contextmanager
def sops_manifest(keys: list[str]) -> Iterator[Path]: def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
all_keys: dict[str, list[str]] = {
key_type.lower(): [] for key_type in KeyType.__members__
}
for key, key_type in keys:
all_keys[key_type.name.lower()].append(key)
with NamedTemporaryFile(delete=False, mode="w") as manifest: with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump( json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
{"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
)
manifest.flush() manifest.flush()
yield Path(manifest.name) yield Path(manifest.name)
def update_keys(secret_path: Path, keys: list[str]) -> list[Path]: def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]:
with sops_manifest(keys) as manifest: with sops_manifest(keys) as manifest:
secret_path = secret_path / "secret" secret_path = secret_path / "secret"
time_before = secret_path.stat().st_mtime time_before = secret_path.stat().st_mtime
@@ -171,7 +206,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
def encrypt_file( def encrypt_file(
secret_path: Path, secret_path: Path,
content: IO[str] | str | bytes | None, content: IO[str] | str | bytes | None,
pubkeys: list[str], pubkeys: list[tuple[str, KeyType]],
) -> None: ) -> None:
folder = secret_path.parent folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True) folder.mkdir(parents=True, exist_ok=True)
@@ -237,7 +272,7 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f) return json.load(f)
def write_key(path: Path, publickey: str, overwrite: bool) -> None: def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True) path.mkdir(parents=True, exist_ok=True)
try: try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
@@ -248,21 +283,23 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
msg = f"{path.name} already exists in {path}. Use --force to overwrite." msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e raise ClanError(msg) from e
with os.fdopen(fd, "w") as f: with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2) contents = {"publickey": publickey, "type": key_type.name.lower()}
json.dump(contents, f, indent=2)
def read_key(path: Path) -> str: def read_key(path: Path) -> tuple[str, KeyType]:
with Path(path / "key.json").open() as f: with Path(path / "key.json").open() as f:
try: try:
key = json.load(f) key = json.load(f)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}" msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e raise ClanError(msg) from e
if key["type"] != "age": key_type = KeyType.validate(key.get("type"))
msg = f"{path.name} is not an age key but {key['type']}. This is not supported" if key_type is None:
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
raise ClanError(msg) raise ClanError(msg)
publickey = key.get("publickey") publickey = key.get("publickey")
if not publickey: if not publickey:
msg = f"{path.name} does not contain a public key" msg = f"{path.name} does not contain a public key"
raise ClanError(msg) raise ClanError(msg)
return publickey return publickey, key_type

View File

@@ -5,7 +5,7 @@ from pathlib import Path
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from .sops import get_public_key from .sops import get_public_age_key
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$") VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$") VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
@@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str:
if arg_value.startswith("age1"): if arg_value.startswith("age1"):
return arg_value.strip() return arg_value.strip()
if arg_value.startswith("AGE-SECRET-KEY-"): if arg_value.startswith("AGE-SECRET-KEY-"):
return get_public_key(arg_value) return get_public_age_key(arg_value)
if not arg_value.startswith("age1"): if not arg_value.startswith("age1"):
msg = f"Please provide an age key starting with age1, got: '{arg_value}'" msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
raise ClanError(msg) raise ClanError(msg)

View File

@@ -1,11 +1,13 @@
import argparse import argparse
import json
import sys
from pathlib import Path from pathlib import Path
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_files from clan_cli.git import commit_files
from . import secrets from . import secrets, sops
from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder
from .secrets import update_secrets from .secrets import update_secrets
from .sops import read_key, write_key from .sops import read_key, write_key
@@ -17,13 +19,19 @@ from .types import (
) )
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None: def add_user(
flake_dir: Path,
name: str,
key: str,
key_type: sops.KeyType,
force: bool,
) -> None:
path = sops_users_folder(flake_dir) / name path = sops_users_folder(flake_dir) / name
def filter_user_secrets(secret: Path) -> bool: def filter_user_secrets(secret: Path) -> bool:
return secret.joinpath("users", name).exists() return secret.joinpath("users", name).exists()
write_key(path, key, force) write_key(path, key, key_type, overwrite=force)
paths = [path] paths = [path]
paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets)) paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets))
commit_files( commit_files(
@@ -42,8 +50,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
) )
def get_user(flake_dir: Path, name: str) -> str: def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
return read_key(sops_users_folder(flake_dir) / name) key, key_type = read_key(sops_users_folder(flake_dir) / name)
return sops.SopsKey(key, name, key_type)
def list_users(flake_dir: Path) -> list[str]: def list_users(flake_dir: Path) -> list[str]:
@@ -95,14 +104,24 @@ def add_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
msg = "Could not find clan flake toplevel directory" msg = "Could not find clan flake toplevel directory"
raise ClanError(msg) raise ClanError(msg)
add_user(args.flake.path, args.user, args.key, args.force) if args.age_key or args.agekey:
key_type = sops.KeyType.AGE
elif args.pgp_key:
key_type = sops.KeyType.PGP
else:
msg = "BUG!: key type not set"
raise ValueError(msg)
key = args.agekey or args.age_key or args.pgp_key
assert key is not None, "key is None"
add_user(args.flake.path, args.user, key, key_type, args.force)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
msg = "Could not find clan flake toplevel directory" msg = "Could not find clan flake toplevel directory"
raise ClanError(msg) raise ClanError(msg)
print(get_user(args.flake.path, args.user)) key = get_user(args.flake.path, args.user)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
@@ -141,12 +160,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
"-f", "--force", help="overwrite existing user", action="store_true" "-f", "--force", help="overwrite existing user", action="store_true"
) )
add_parser.add_argument("user", help="the name of the user", type=user_name_type) add_parser.add_argument("user", help="the name of the user", type=user_name_type)
add_parser.add_argument( key_type = add_parser.add_mutually_exclusive_group(required=True)
"key", key_type.add_argument(
help="public key or private key of the user." "agekey",
"Execute 'clan secrets key --help' on how to retrieve a key." help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age", "To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type, type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key of the user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
) )
add_parser.set_defaults(func=add_command) add_parser.set_defaults(func=add_command)

View File

@@ -229,7 +229,7 @@ class SecretStore(SecretStoreBase):
) -> bool: ) -> bool:
secret_path = self.secret_path(generator_name, secret_name, shared) secret_path = self.secret_path(generator_name, secret_name, shared)
secret = json.loads((secret_path / "secret").read_text()) secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in secret["sops"]["age"]] recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
machines_folder_path = sops_machines_folder(self.machine.flake_dir) machines_folder_path = sops_machines_folder(self.machine.flake_dir)
machine_pubkey = json.loads( machine_pubkey = json.loads(
(machines_folder_path / self.machine.name / "key.json").read_text() (machines_folder_path / self.machine.name / "key.json").read_text()

View File

@@ -129,6 +129,9 @@ def test_all_dataclasses() -> None:
try: try:
API.reset() API.reset()
dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent)) dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent))
if dclass is None:
msg = f"Could not load dataclass {dataclass} from {file}"
raise ClanError(msg)
type_to_dict(dclass) type_to_dict(dclass)
except JSchemaTypeError as e: except JSchemaTypeError as e:
print(f"Error loading dataclass {dataclass} from {file}: {e}") print(f"Error loading dataclass {dataclass} from {file}: {e}")

View File

@@ -1,7 +1,12 @@
import functools
import json
import logging import logging
import os import os
import re
import subprocess
from collections.abc import Iterator from collections.abc import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
@@ -227,7 +232,7 @@ def test_groups(
@contextmanager @contextmanager
def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key = os.environ["SOPS_AGE_KEY_FILE"] old_key = os.environ["SOPS_AGE_KEY_FILE"]
monkeypatch.delenv("SOPS_AGE_KEY_FILE") monkeypatch.delenv("SOPS_AGE_KEY_FILE")
monkeypatch.setenv("SOPS_AGE_KEY", key) monkeypatch.setenv("SOPS_AGE_KEY", key)
@@ -238,29 +243,95 @@ def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key) monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key)
@contextmanager
def use_gpg_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key_file = os.environ.get("SOPS_AGE_KEY_FILE")
old_key = os.environ.get("SOPS_AGE_KEY")
monkeypatch.delenv("SOPS_AGE_KEY_FILE", raising=False)
monkeypatch.delenv("SOPS_AGE_KEY", raising=False)
monkeypatch.setenv("SOPS_PGP_FP", key)
try:
yield
finally:
monkeypatch.delenv("SOPS_PGP_FP")
if old_key_file is not None:
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key_file)
if old_key is not None:
monkeypatch.setenv("SOPS_AGE_KEY", old_key)
@pytest.fixture
def gpg_key(
tmp_path: Path,
monkeypatch: pytest.MonkeyPatch,
) -> str:
gpg_home = tmp_path / "gnupghome"
gpg_home.mkdir(mode=0o700)
gpg_environ = os.environ.copy()
gpg_environ["GNUPGHOME"] = str(gpg_home)
run = functools.partial(
subprocess.run,
encoding="utf-8",
check=True,
env=gpg_environ,
)
key_parameters = "\n".join(
(
"%no-protection",
"%transient-key",
"Key-Type: rsa",
"Key-Usage: cert encrypt",
"Name-Real: Foo Bar",
"Name-Comment: Test user",
"Name-Email: test@clan.lol",
"%commit",
)
)
run(["gpg", "--batch", "--quiet", "--generate-key"], input=key_parameters)
details = run(["gpg", "--list-keys", "--with-colons"], capture_output=True)
fingerprint = None
for line in details.stdout.strip().split(os.linesep):
if not line.startswith("fpr"):
continue
fingerprint = line.split(":")[9]
break
assert fingerprint is not None, "Could not generate test GPG key"
log.info(f"Created GPG key under {gpg_home}")
monkeypatch.setenv("GNUPGHOME", str(gpg_home))
return fingerprint
def test_secrets( def test_secrets(
test_flake: FlakeForTest, test_flake: FlakeForTest,
capture_output: CaptureOutput, capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
gpg_key: str,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
) -> None: ) -> None:
with capture_output as output: with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)]) cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == "" assert output.out == ""
monkeypatch.setenv("SOPS_NIX_SECRET", "foo") # Generate a new key for the clan
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key")) monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)]) with capture_output as output:
cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)])
assert "age private key" in output.out
# Read the key that was generated
with capture_output as output: with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)]) cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = output.out key = json.loads(output.out)["publickey"]
assert key.startswith("age1") assert key.startswith("age1")
# Add testuser with the key that was generated for the clan
cli.run( cli.run(
["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key] ["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key]
) )
with pytest.raises(ClanError): # does not exist yet with pytest.raises(ClanError): # does not exist yet
cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"])
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"]) cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"])
with capture_output as output: with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"])
@@ -289,6 +360,8 @@ def test_secrets(
cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"])
assert output.out == "key\n" assert output.out == "key\n"
# using the `age_keys` KeyPair, add a machine and rotate its key
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -315,7 +388,7 @@ def test_secrets(
cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)]) cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)])
assert output.out == "machine1\n" assert output.out == "machine1\n"
with use_key(age_keys[1].privkey, monkeypatch): with use_age_key(age_keys[1].privkey, monkeypatch):
with capture_output as output: with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo" assert output.out == "foo"
@@ -335,7 +408,7 @@ def test_secrets(
) )
# should also rotate the encrypted secret # should also rotate the encrypted secret
with use_key(age_keys[0].privkey, monkeypatch): with use_age_key(age_keys[0].privkey, monkeypatch):
with capture_output as output: with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo" assert output.out == "foo"
@@ -374,7 +447,7 @@ def test_secrets(
"key", "key",
] ]
) )
with capture_output as output, use_key(age_keys[1].privkey, monkeypatch): with capture_output as output, use_age_key(age_keys[1].privkey, monkeypatch):
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo" assert output.out == "foo"
cli.run( cli.run(
@@ -447,12 +520,12 @@ def test_secrets(
] ]
) )
with use_key(age_keys[1].privkey, monkeypatch): with use_age_key(age_keys[1].privkey, monkeypatch):
with capture_output as output: with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo" assert output.out == "foo"
# extend group will update secrets # Add an user with a GPG key
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -460,10 +533,13 @@ def test_secrets(
"add", "add",
"--flake", "--flake",
str(test_flake.path), str(test_flake.path),
"--pgp-key",
gpg_key,
"user2", "user2",
age_keys[2].pubkey,
] ]
) )
# Extend group will update secrets
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -476,7 +552,7 @@ def test_secrets(
] ]
) )
with use_key(age_keys[2].privkey, monkeypatch): # user2 with use_gpg_key(gpg_key, monkeypatch): # user2
with capture_output as output: with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"]) cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo" assert output.out == "foo"
@@ -494,7 +570,7 @@ def test_secrets(
) )
with ( with (
pytest.raises(ClanError), pytest.raises(ClanError),
use_key(age_keys[2].privkey, monkeypatch), use_gpg_key(gpg_key, monkeypatch),
capture_output as output, capture_output as output,
): ):
# user2 is not in the group anymore # user2 is not in the group anymore
@@ -519,3 +595,66 @@ def test_secrets(
with capture_output as output: with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)]) cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == "" assert output.out == ""
def test_secrets_key_generate_gpg(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch,
gpg_key: str,
) -> None:
with use_gpg_key(gpg_key, monkeypatch):
# Make sure clan secrets key generate recognizes
# the PGP key and does nothing:
with capture_output as output:
cli.run(
[
"secrets",
"key",
"generate",
"--flake",
str(test_flake.path),
]
)
assert "age private key" not in output.out
assert re.match(r"PGP key.+is already set", output.out) is not None
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = json.loads(output.out)
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key
# Add testuser with the key that was (not) generated for the clan:
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"--pgp-key",
gpg_key,
"testuser",
]
)
with capture_output as output:
cli.run(
[
"secrets",
"users",
"get",
"--flake",
str(test_flake.path),
"testuser",
]
)
key = json.loads(output.out)
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key
monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"])
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"])
assert output.out == "secret-value"