Revert "Merge pull request 'Revert "Merge pull request 'clan-cli: secrets: Add support for PGP keys with sops-nix' (#2186) from lopter/clan-core:lo-sops-nix-pgp-support into main"' (#2202) from revert into main"

This reverts commit a5496e8db0, reversing
changes made to 9bb1aef245.
This commit is contained in:
Jörg Thalheim
2024-10-04 18:24:59 +02:00
committed by Mic92
parent 3d3324155b
commit 1666513e91
11 changed files with 361 additions and 131 deletions

View File

@@ -1,63 +1,49 @@
import argparse
import json
import logging
from pathlib import Path
import sys
from clan_cli.errors import ClanError
from clan_cli.git import commit_files
from . import sops
from .secrets import update_secrets
from .sops import default_admin_key_path, generate_private_key, get_public_key
from .sops import (
default_admin_key_path,
generate_private_key,
maybe_get_admin_public_key,
)
log = logging.getLogger(__name__)
def extract_public_key(filepath: Path) -> str:
"""
Extracts the public key from a given text file.
"""
try:
with filepath.open() as file:
for line in file:
# Check if the line contains the public key
if line.startswith("# public key:"):
# Extract and return the public key part after the prefix
return line.strip().split(": ")[1]
except FileNotFoundError as e:
msg = f"The file at {filepath} was not found."
raise ClanError(msg) from e
except OSError as e:
msg = f"An error occurred while extracting the public key: {e}"
raise ClanError(msg) from e
def generate_key() -> sops.SopsKey:
key = maybe_get_admin_public_key()
if key is not None:
print(f"{key.key_type.name} key {key.pubkey} is already set")
return key
msg = f"Could not find the public key in the file at {filepath}."
raise ClanError(msg)
def generate_key() -> str:
path = default_admin_key_path()
if path.exists():
log.info(f"Key already exists at {path}")
return extract_public_key(path)
priv_key, pub_key = generate_private_key(out_file=path)
log.info(
f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets."
_, pub_key = generate_private_key(out_file=path)
print(
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
)
return pub_key
def show_key() -> str:
return get_public_key(default_admin_key_path().read_text())
return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE)
def generate_command(args: argparse.Namespace) -> None:
pub_key = generate_key()
log.info(
f"Also add your age public key to the repository with: \nclan secrets users add <username> {pub_key}"
)
key = generate_key()
print("Also add your age public key to the repository with:")
key_type = key.key_type.name.lower()
print(f"clan secrets users add --{key_type}-key <username>")
def show_command(args: argparse.Namespace) -> None:
print(show_key())
key = sops.maybe_get_admin_public_key()
if not key:
msg = "No public key found"
raise ClanError(msg)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
def update_command(args: argparse.Namespace) -> None:
@@ -73,10 +59,16 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
required=True,
)
parser_generate = subparser.add_parser("generate", help="generate age key")
parser_generate = subparser.add_parser(
"generate",
description=(
"Generate an age key for the Clan, "
"to use PGP set `SOPS_PGP_FP` in your environment."
),
)
parser_generate.set_defaults(func=generate_command)
parser_show = subparser.add_parser("show", help="show age public key")
parser_show = subparser.add_parser("show", help="show public key")
parser_show.set_defaults(func=show_command)
parser_update = subparser.add_parser(

View File

@@ -10,7 +10,7 @@ from clan_cli.errors import ClanError
from clan_cli.git import commit_files
from clan_cli.machines.types import machine_name_type, validate_hostname
from . import secrets
from . import secrets, sops
from .folders import (
list_objects,
remove_object,
@@ -24,7 +24,7 @@ from .types import public_or_private_age_key_type, secret_name_type
def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None:
machine_path = sops_machines_folder(flake_dir) / machine
write_key(machine_path, pubkey, force)
write_key(machine_path, pubkey, sops.KeyType.AGE, overwrite=force)
paths = [machine_path]
def filter_machine_secrets(secret: Path) -> bool:
@@ -48,7 +48,8 @@ def remove_machine(flake_dir: Path, name: str) -> None:
def get_machine(flake_dir: Path, name: str) -> str:
return read_key(sops_machines_folder(flake_dir) / name)
key, _ = read_key(sops_machines_folder(flake_dir) / name)
return key
def has_machine(flake_dir: Path, name: str) -> bool:
@@ -168,7 +169,7 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
add_dynamic_completer(add_machine_action, complete_machines)
add_parser.add_argument(
"key",
help="public key or private key of the user",
help="public or private age key of the machine",
type=public_or_private_age_key_type,
)
add_parser.set_defaults(func=add_command)

View File

@@ -1,5 +1,7 @@
import argparse
import functools
import getpass
import operator
import os
import shutil
import sys
@@ -20,6 +22,7 @@ from clan_cli.completions import (
from clan_cli.errors import ClanError
from clan_cli.git import commit_files
from . import sops
from .folders import (
list_objects,
sops_groups_folder,
@@ -42,13 +45,13 @@ def update_secrets(
changed_files.extend(
update_keys(
secret_path,
sorted(collect_keys_for_path(secret_path)),
sorted_keys(collect_keys_for_path(secret_path)),
)
)
return changed_files
def collect_keys_for_type(folder: Path) -> set[str]:
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
if not folder.exists():
return set()
keys = set()
@@ -68,7 +71,7 @@ def collect_keys_for_type(folder: Path) -> set[str]:
return keys
def collect_keys_for_path(path: Path) -> set[str]:
def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
keys = set()
keys.update(collect_keys_for_type(path / "machines"))
keys.update(collect_keys_for_type(path / "users"))
@@ -132,8 +135,8 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path)
if key.pubkey not in recipient_keys:
recipient_keys.add(key.pubkey)
if (key.pubkey, key.key_type) not in recipient_keys:
recipient_keys.add((key.pubkey, key.key_type))
files_to_commit.extend(
allow_member(
users_folder(secret_path),
@@ -144,7 +147,7 @@ def encrypt_secret(
)
secret_path = secret_path / "secret"
encrypt_file(secret_path, value, sorted(recipient_keys))
encrypt_file(secret_path, value, sorted_keys(recipient_keys))
files_to_commit.append(secret_path)
if git_commit:
commit_files(
@@ -228,7 +231,7 @@ def allow_member(
changed.extend(
update_keys(
group_folder.parent,
sorted(collect_keys_for_path(group_folder.parent)),
sorted_keys(collect_keys_for_path(group_folder.parent)),
)
)
return changed
@@ -255,10 +258,13 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
group_folder.parent.rmdir()
return update_keys(
target.parent.parent, sorted(collect_keys_for_path(group_folder.parent))
target.parent.parent, sorted_keys(collect_keys_for_path(group_folder.parent))
)
sorted_keys = functools.partial(sorted, key=operator.itemgetter(0))
def has_secret(secret_path: Path) -> bool:
return (secret_path / "secret").exists()

View File

@@ -1,3 +1,4 @@
import enum
import io
import json
import os
@@ -19,13 +20,32 @@ from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder
@dataclass
class KeyType(enum.Enum):
AGE = enum.auto()
PGP = enum.auto()
@classmethod
def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102
if value:
return cls.__members__.get(value.upper())
return None
@dataclass(frozen=True, eq=False)
class SopsKey:
pubkey: str
username: str
key_type: KeyType
def as_dict(self) -> dict[str, str]:
return {
"publickey": self.pubkey,
"username": self.username,
"type": self.key_type.name.lower(),
}
def get_public_key(privkey: str) -> str:
def get_public_age_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try:
res = subprocess.run(
@@ -78,8 +98,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
key = SopsKey(pub_key, username="")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -87,20 +106,20 @@ def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
if read_key(user) == pub_key:
key.username = user.name
return key
this_pub_key, this_key_type = read_key(user)
if key.pubkey == this_pub_key and key.key_type == this_key_type:
return SopsKey(key.pubkey, user.name, key.key_type)
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
key = maybe_get_user_or_machine(flake_dir, pub_key)
if not key:
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
raise ClanError(msg)
return key
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
maybe_key = maybe_get_user_or_machine(flake_dir, key)
if maybe_key:
return maybe_key
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
def default_admin_key_path() -> Path:
@@ -111,43 +130,59 @@ def default_admin_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> str | None:
key = os.environ.get("SOPS_AGE_KEY")
if key:
return get_public_key(key)
def maybe_get_admin_public_key() -> None | SopsKey:
age_key = os.environ.get("SOPS_AGE_KEY")
pgp_key = os.environ.get("SOPS_PGP_FP")
if age_key and pgp_key:
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
raise ClanError(msg)
if age_key:
return SopsKey(
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
)
if pgp_key:
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
path = default_admin_key_path()
if path.exists():
return get_public_key(path.read_text())
return SopsKey(
pubkey=get_public_age_key(path.read_text()),
key_type=KeyType.AGE,
username="",
)
return None
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
pub_key = maybe_get_admin_public_key()
if pub_key:
return maybe_get_user_or_machine(flake_dir, pub_key)
return None
key = maybe_get_admin_public_key()
if not key:
return None
return maybe_get_user_or_machine(flake_dir, key)
def ensure_admin_key(flake_dir: Path) -> SopsKey:
pub_key = maybe_get_admin_public_key()
if not pub_key:
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
return ensure_user_or_machine(flake_dir, pub_key)
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
@contextmanager
def sops_manifest(keys: list[str]) -> Iterator[Path]:
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
all_keys: dict[str, list[str]] = {
key_type.lower(): [] for key_type in KeyType.__members__
}
for key, key_type in keys:
all_keys[key_type.name.lower()].append(key)
with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump(
{"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
)
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
manifest.flush()
yield Path(manifest.name)
def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]:
with sops_manifest(keys) as manifest:
secret_path = secret_path / "secret"
time_before = secret_path.stat().st_mtime
@@ -171,7 +206,7 @@ def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
def encrypt_file(
secret_path: Path,
content: IO[str] | str | bytes | None,
pubkeys: list[str],
pubkeys: list[tuple[str, KeyType]],
) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
@@ -237,7 +272,7 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f)
def write_key(path: Path, publickey: str, overwrite: bool) -> None:
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
@@ -248,21 +283,23 @@ def write_key(path: Path, publickey: str, overwrite: bool) -> None:
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e
with os.fdopen(fd, "w") as f:
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
contents = {"publickey": publickey, "type": key_type.name.lower()}
json.dump(contents, f, indent=2)
def read_key(path: Path) -> str:
def read_key(path: Path) -> tuple[str, KeyType]:
with Path(path / "key.json").open() as f:
try:
key = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
if key["type"] != "age":
msg = f"{path.name} is not an age key but {key['type']}. This is not supported"
key_type = KeyType.validate(key.get("type"))
if key_type is None:
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
raise ClanError(msg)
publickey = key.get("publickey")
if not publickey:
msg = f"{path.name} does not contain a public key"
raise ClanError(msg)
return publickey
return publickey, key_type

View File

@@ -5,7 +5,7 @@ from pathlib import Path
from clan_cli.errors import ClanError
from .sops import get_public_key
from .sops import get_public_age_key
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
@@ -24,7 +24,7 @@ def public_or_private_age_key_type(arg_value: str) -> str:
if arg_value.startswith("age1"):
return arg_value.strip()
if arg_value.startswith("AGE-SECRET-KEY-"):
return get_public_key(arg_value)
return get_public_age_key(arg_value)
if not arg_value.startswith("age1"):
msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
raise ClanError(msg)

View File

@@ -1,11 +1,13 @@
import argparse
import json
import sys
from pathlib import Path
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
from clan_cli.errors import ClanError
from clan_cli.git import commit_files
from . import secrets
from . import secrets, sops
from .folders import list_objects, remove_object, sops_secrets_folder, sops_users_folder
from .secrets import update_secrets
from .sops import read_key, write_key
@@ -17,13 +19,19 @@ from .types import (
)
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None:
def add_user(
flake_dir: Path,
name: str,
key: str,
key_type: sops.KeyType,
force: bool,
) -> None:
path = sops_users_folder(flake_dir) / name
def filter_user_secrets(secret: Path) -> bool:
return secret.joinpath("users", name).exists()
write_key(path, key, force)
write_key(path, key, key_type, overwrite=force)
paths = [path]
paths.extend(update_secrets(flake_dir, filter_secrets=filter_user_secrets))
commit_files(
@@ -42,8 +50,9 @@ def remove_user(flake_dir: Path, name: str) -> None:
)
def get_user(flake_dir: Path, name: str) -> str:
return read_key(sops_users_folder(flake_dir) / name)
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
key, key_type = read_key(sops_users_folder(flake_dir) / name)
return sops.SopsKey(key, name, key_type)
def list_users(flake_dir: Path) -> list[str]:
@@ -95,14 +104,24 @@ def add_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
add_user(args.flake.path, args.user, args.key, args.force)
if args.age_key or args.agekey:
key_type = sops.KeyType.AGE
elif args.pgp_key:
key_type = sops.KeyType.PGP
else:
msg = "BUG!: key type not set"
raise ValueError(msg)
key = args.agekey or args.age_key or args.pgp_key
assert key is not None, "key is None"
add_user(args.flake.path, args.user, key, key_type, args.force)
def get_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
print(get_user(args.flake.path, args.user))
key = get_user(args.flake.path, args.user)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
def remove_command(args: argparse.Namespace) -> None:
@@ -141,12 +160,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
"-f", "--force", help="overwrite existing user", action="store_true"
)
add_parser.add_argument("user", help="the name of the user", type=user_name_type)
add_parser.add_argument(
"key",
help="public key or private key of the user."
"Execute 'clan secrets key --help' on how to retrieve a key."
key_type = add_parser.add_mutually_exclusive_group(required=True)
key_type.add_argument(
"agekey",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
nargs="?",
)
key_type.add_argument(
"--age-key",
help="public or private age key of the user. "
"Execute 'clan secrets key --help' on how to retrieve a key. "
"To fetch an age key from an SSH host key: ssh-keyscan <domain_name> | nix shell nixpkgs#ssh-to-age -c ssh-to-age",
type=public_or_private_age_key_type,
)
key_type.add_argument(
"--pgp-key",
help=(
"public PGP encryption key of the user. "
# Use --fingerprint --fingerprint to get fingerprints for subkeys:
"Execute `gpg -k --fingerprint --fingerprint` and remove spaces to get it."
),
)
add_parser.set_defaults(func=add_command)