diff --git a/flake-parts/formatting.nix b/flake-parts/formatting.nix index d4263a035..99b30085d 100644 --- a/flake-parts/formatting.nix +++ b/flake-parts/formatting.nix @@ -38,6 +38,8 @@ includes = [ "*.py" ]; }; + # we already run treefmt in ci + pre-commit.check.enable = false; # activated in devShells via inputsFrom = [config.pre-commit.devShell]; pre-commit.settings.hooks.format-all = { name = "format-all"; diff --git a/pkgs/clan-cli/clan_cli/__init__.py b/pkgs/clan-cli/clan_cli/__init__.py index ab4db80b3..dfc11c9fd 100644 --- a/pkgs/clan-cli/clan_cli/__init__.py +++ b/pkgs/clan-cli/clan_cli/__init__.py @@ -2,7 +2,7 @@ import argparse import sys -from . import admin, ssh +from . import admin, secrets, ssh from .errors import ClanError has_argcomplete = True @@ -23,6 +23,9 @@ def main() -> None: parser_ssh = subparsers.add_parser("ssh", help="ssh to a remote machine") ssh.register_parser(parser_ssh) + parser_secrets = subparsers.add_parser("secrets", help="manage secrets") + secrets.register_parser(parser_secrets) + if has_argcomplete: argcomplete.autocomplete(parser) diff --git a/pkgs/clan-cli/clan_cli/secrets/__init__.py b/pkgs/clan-cli/clan_cli/secrets/__init__.py new file mode 100644 index 000000000..ff76ee00f --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/__init__.py @@ -0,0 +1,28 @@ +# !/usr/bin/env python3 +import argparse + +from .groups import register_groups_parser +from .machines import register_machines_parser +from .secrets import register_secrets_parser +from .users import register_users_parser + + +# takes a (sub)parser and configures it +def register_parser(parser: argparse.ArgumentParser) -> None: + subparser = parser.add_subparsers( + title="command", + description="the command to run", + help="the command to run", + required=True, + ) + + groups_parser = subparser.add_parser("groups", help="manage groups") + register_groups_parser(groups_parser) + + users_parser = subparser.add_parser("users", help="manage users") + register_users_parser(users_parser) + + machines_parser = subparser.add_parser("machines", help="manage machines") + register_machines_parser(machines_parser) + + register_secrets_parser(subparser) diff --git a/pkgs/clan-cli/clan_cli/secrets/folders.py b/pkgs/clan-cli/clan_cli/secrets/folders.py new file mode 100644 index 000000000..aa7ac26a0 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/folders.py @@ -0,0 +1,71 @@ +import json +import os +import shutil +from pathlib import Path +from typing import Callable + +from ..dirs import get_clan_flake_toplevel +from ..errors import ClanError + + +def get_sops_folder() -> Path: + return get_clan_flake_toplevel() / "sops" + + +def gen_sops_subfolder(subdir: str) -> Callable[[], Path]: + def folder() -> Path: + return get_clan_flake_toplevel() / "sops" / subdir + + return folder + + +sops_secrets_folder = gen_sops_subfolder("secrets") +sops_users_folder = gen_sops_subfolder("users") +sops_machines_folder = gen_sops_subfolder("machines") +sops_groups_folder = gen_sops_subfolder("groups") + + +def list_objects(path: Path, is_valid: Callable[[str], bool]) -> None: + if not path.exists(): + return + for f in os.listdir(path): + if is_valid(f): + print(f) + + +def remove_object(path: Path, name: str) -> None: + try: + shutil.rmtree(path / name) + except FileNotFoundError: + raise ClanError(f"{name} not found in {path}") + if not os.listdir(path): + os.rmdir(path) + + +def add_key(path: Path, publickey: str, overwrite: bool) -> None: + path.mkdir(parents=True, exist_ok=True) + try: + flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC + if not overwrite: + flags |= os.O_EXCL + fd = os.open(path / "key.json", flags) + except FileExistsError: + raise ClanError(f"{path.name} already exists in {path}") + with os.fdopen(fd, "w") as f: + json.dump({"publickey": publickey, "type": "age"}, f, indent=2) + + +def read_key(path: Path) -> str: + with open(path / "key.json") as f: + try: + key = json.load(f) + except json.JSONDecodeError as e: + raise ClanError(f"Failed to decode {path.name}: {e}") + if key["type"] != "age": + raise ClanError( + f"{path.name} is not an age key but {key['type']}. This is not supported" + ) + publickey = key.get("publickey") + if not publickey: + raise ClanError(f"{path.name} does not contain a public key") + return publickey diff --git a/pkgs/clan-cli/clan_cli/secrets/groups.py b/pkgs/clan-cli/clan_cli/secrets/groups.py new file mode 100644 index 000000000..264c43b7d --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/groups.py @@ -0,0 +1,177 @@ +import argparse +import os +from pathlib import Path + +from ..errors import ClanError +from . import secrets +from .folders import sops_groups_folder, sops_machines_folder, sops_users_folder +from .types import ( + VALID_USER_NAME, + group_name_type, + machine_name_type, + secret_name_type, + user_name_type, + validate_hostname, +) + + +def machines_folder(group: str) -> Path: + return sops_groups_folder() / group / "machines" + + +def users_folder(group: str) -> Path: + return sops_groups_folder() / group / "users" + + +# TODO: make this a tree +def list_command(args: argparse.Namespace) -> None: + folder = sops_groups_folder() + if not folder.exists(): + return + + for group in os.listdir(folder): + group_folder = folder / group + if not group_folder.is_dir(): + continue + print(group) + machines = machines_folder(group) + if machines.is_dir(): + print("machines:") + for f in machines.iterdir(): + if validate_hostname(f.name): + print(f.name) + users = users_folder(group) + if users.is_dir(): + print("users:") + for f in users.iterdir(): + if VALID_USER_NAME.match(f.name): + print(f) + + +def add_member(group_folder: Path, source_folder: Path, name: str) -> None: + source = source_folder / name + if not source.exists(): + raise ClanError(f"{name} does not exist in {source_folder}") + group_folder.mkdir(parents=True, exist_ok=True) + user_target = group_folder / name + if user_target.exists(): + if not user_target.is_symlink(): + raise ClanError( + f"Cannot add user {name}. {user_target} exists but is not a symlink" + ) + os.remove(user_target) + user_target.symlink_to(source) + + +def remove_member(group_folder: Path, name: str) -> None: + target = group_folder / name + if not target.exists(): + raise ClanError(f"{name} does not exist in group in {group_folder}") + os.remove(target) + + if len(os.listdir(group_folder)) == 0: + os.rmdir(group_folder) + + if len(os.listdir(group_folder.parent)) == 0: + os.rmdir(group_folder.parent) + + +def add_user_command(args: argparse.Namespace) -> None: + add_member(users_folder(args.group), sops_users_folder(), args.user) + + +def remove_user_command(args: argparse.Namespace) -> None: + remove_member(users_folder(args.group), args.user) + + +def add_machine_command(args: argparse.Namespace) -> None: + add_member( + machines_folder(args.group), + sops_machines_folder(), + args.machine, + ) + + +def remove_machine_command(args: argparse.Namespace) -> None: + remove_member(machines_folder(args.group), args.machine) + + +def add_group_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument("group", help="the name of the secret", type=group_name_type) + + +def add_secret_command(args: argparse.Namespace) -> None: + secrets.allow_member( + secrets.groups_folder(args.group), sops_machines_folder(), args.group + ) + + +def remove_secret_command(args: argparse.Namespace) -> None: + secrets.disallow_member(secrets.groups_folder(args.group), args.group) + + +def register_groups_parser(parser: argparse.ArgumentParser) -> None: + subparser = parser.add_subparsers( + title="command", + description="the command to run", + help="the command to run", + required=True, + ) + list_parser = subparser.add_parser("list", help="list groups") + list_parser.set_defaults(func=list_command) + + add_machine_parser = subparser.add_parser( + "add-machine", help="add a machine to group" + ) + add_group_argument(add_machine_parser) + add_machine_parser.add_argument( + "machine", help="the name of the machines to add", type=machine_name_type + ) + add_machine_parser.set_defaults(func=add_machine_command) + + remove_machine_parser = subparser.add_parser( + "remove-machine", help="remove a machine from group" + ) + add_group_argument(remove_machine_parser) + remove_machine_parser.add_argument( + "machine", help="the name of the machines to remove", type=machine_name_type + ) + remove_machine_parser.set_defaults(func=remove_machine_command) + + add_user_parser = subparser.add_parser("add-user", help="add a user to group") + add_group_argument(add_user_parser) + add_user_parser.add_argument( + "user", help="the name of the user to add", type=user_name_type + ) + add_user_parser.set_defaults(func=add_user_command) + + remove_user_parser = subparser.add_parser( + "remove-user", help="remove a user from group" + ) + add_group_argument(remove_user_parser) + remove_user_parser.add_argument( + "user", help="the name of the user to remove", type=user_name_type + ) + remove_user_parser.set_defaults(func=remove_user_command) + + add_secret_parser = subparser.add_parser( + "add-secret", help="allow a user to access a secret" + ) + add_secret_parser.add_argument( + "group", help="the name of the user", type=group_name_type + ) + add_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + add_secret_parser.set_defaults(func=add_secret_command) + + remove_secret_parser = subparser.add_parser( + "remove-secret", help="remove a group's access to a secret" + ) + remove_secret_parser.add_argument( + "group", help="the name of the group", type=group_name_type + ) + remove_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + remove_secret_parser.set_defaults(func=remove_secret_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/machines.py b/pkgs/clan-cli/clan_cli/secrets/machines.py new file mode 100644 index 000000000..b08804e02 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/machines.py @@ -0,0 +1,89 @@ +import argparse + +from . import secrets +from .folders import add_key, list_objects, remove_object, sops_machines_folder +from .types import ( + machine_name_type, + public_or_private_age_key_type, + secret_name_type, + validate_hostname, +) + + +def list_command(args: argparse.Namespace) -> None: + list_objects(sops_machines_folder(), lambda x: validate_hostname(x)) + + +def add_command(args: argparse.Namespace) -> None: + add_key(sops_machines_folder() / args.machine, args.key, args.force) + + +def remove_command(args: argparse.Namespace) -> None: + remove_object(sops_machines_folder(), args.machine) + + +def add_secret_command(args: argparse.Namespace) -> None: + secrets.allow_member( + secrets.machines_folder(args.group), sops_machines_folder(), args.machine + ) + + +def remove_secret_command(args: argparse.Namespace) -> None: + secrets.disallow_member(secrets.machines_folder(args.group), args.machine) + + +def register_machines_parser(parser: argparse.ArgumentParser) -> None: + subparser = parser.add_subparsers( + title="command", + description="the command to run", + help="the command to run", + required=True, + ) + list_parser = subparser.add_parser("list", help="list machines") + list_parser.set_defaults(func=list_command) + + add_parser = subparser.add_parser("add", help="add a machine") + add_parser.add_argument( + "-f", + "--force", + help="overwrite existing machine", + action="store_true", + default=False, + ) + add_parser.add_argument( + "machine", help="the name of the machine", type=machine_name_type + ) + add_parser.add_argument( + "key", + help="public key or private key of the user", + type=public_or_private_age_key_type, + ) + add_parser.set_defaults(func=add_command) + + remove_parser = subparser.add_parser("remove", help="remove a machine") + remove_parser.add_argument( + "machine", help="the name of the machine", type=machine_name_type + ) + remove_parser.set_defaults(func=remove_command) + + add_secret_parser = subparser.add_parser( + "add-secret", help="allow a machine to access a secret" + ) + add_secret_parser.add_argument( + "machine", help="the name of the machine", type=machine_name_type + ) + add_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + add_secret_parser.set_defaults(func=add_secret_command) + + remove_secret_parser = subparser.add_parser( + "remove-secret", help="remove a group's access to a secret" + ) + remove_secret_parser.add_argument( + "machine", help="the name of the group", type=machine_name_type + ) + remove_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + remove_secret_parser.set_defaults(func=remove_secret_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/secrets.py b/pkgs/clan-cli/clan_cli/secrets/secrets.py new file mode 100644 index 000000000..eda8657f0 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/secrets.py @@ -0,0 +1,124 @@ +import argparse +import getpass +import os +import subprocess +import sys +from io import StringIO +from pathlib import Path +from typing import IO + +from .. import tty +from ..errors import ClanError +from ..nix import nix_shell +from .folders import list_objects, sops_secrets_folder +from .sops import SopsKey, encrypt_file, ensure_sops_key, read_key +from .types import VALID_SECRET_NAME, secret_name_type + + +def list_command(args: argparse.Namespace) -> None: + list_objects( + sops_secrets_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None + ) + + +def get_command(args: argparse.Namespace) -> None: + secret: str = args.secret + ensure_sops_key() + secret_path = sops_secrets_folder() / secret / "secret" + if not secret_path.exists(): + raise ClanError(f"Secret '{secret}' does not exist") + cmd = nix_shell(["sops"], ["sops", "--decrypt", str(secret_path)]) + res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True) + print(res.stdout, end="") + + +def encrypt_secret(key: SopsKey, secret: Path, value: IO[str]) -> None: + keys = set([key.pubkey]) + for kind in ["users", "machines", "groups"]: + if not (sops_secrets_folder() / kind).is_dir(): + continue + k = read_key(sops_secrets_folder() / kind) + keys.add(k) + encrypt_file(secret / "secret", value, list(sorted(keys))) + + +def set_command(args: argparse.Namespace) -> None: + secret: str = args.secret + key = ensure_sops_key() + secret_value = os.environ.get("SOPS_NIX_SECRET") + if secret_value: + encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret_value)) + elif tty.is_interactive(): + secret = getpass.getpass(prompt="Paste your secret: ") + encrypt_secret(key, sops_secrets_folder() / secret, StringIO(secret)) + else: + encrypt_secret(key, sops_secrets_folder() / secret, sys.stdin) + + +def remove_command(args: argparse.Namespace) -> None: + secret: str = args.secret + path = sops_secrets_folder() / secret + if not path.exists(): + raise ClanError(f"Secret '{secret}' does not exist") + path.unlink() + + +def add_secret_argument(parser: argparse.ArgumentParser) -> None: + parser.add_argument("secret", help="the name of the secret", type=secret_name_type) + + +def allow_member(group_folder: Path, source_folder: Path, name: str) -> None: + source = source_folder / name + if not source.exists(): + raise ClanError(f"{name} does not exist in {source_folder}") + group_folder.mkdir(parents=True, exist_ok=True) + user_target = group_folder / name + if user_target.exists(): + if not user_target.is_symlink(): + raise ClanError( + f"Cannot add user {name}. {user_target} exists but is not a symlink" + ) + os.remove(user_target) + user_target.symlink_to(source) + + +def disallow_member(group_folder: Path, name: str) -> None: + target = group_folder / name + if not target.exists(): + raise ClanError(f"{name} does not exist in group in {group_folder}") + os.remove(target) + + if len(os.listdir(group_folder)) == 0: + os.rmdir(group_folder) + + if len(os.listdir(group_folder.parent)) == 0: + os.rmdir(group_folder.parent) + + +def machines_folder(group: str) -> Path: + return sops_secrets_folder() / group / "machines" + + +def users_folder(group: str) -> Path: + return sops_secrets_folder() / group / "users" + + +def groups_folder(group: str) -> Path: + return sops_secrets_folder() / group / "groups" + + +def register_secrets_parser(subparser: argparse._SubParsersAction) -> None: + parser_list = subparser.add_parser("list", help="list secrets") + parser_list.set_defaults(func=list_command) + + parser_get = subparser.add_parser("get", help="get a secret") + add_secret_argument(parser_get) + parser_get.set_defaults(func=get_command) + + parser_set = subparser.add_parser("set", help="set a secret") + add_secret_argument(parser_set) + parser_set.set_defaults(func=set_command) + + parser_delete = subparser.add_parser("remove", help="remove a secret") + add_secret_argument(parser_delete) + parser_delete.set_defaults(func=remove_command) diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py new file mode 100644 index 000000000..a995f46f4 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -0,0 +1,124 @@ +import os +import shutil +import subprocess +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import IO + +from .. import tty +from ..dirs import user_config_dir +from ..nix import nix_shell +from .folders import add_key, read_key, sops_users_folder + + +class SopsKey: + def __init__(self, pubkey: str) -> None: + self.pubkey = pubkey + + +def get_public_key(privkey: str) -> str: + cmd = nix_shell(["age"], ["age-keygen", "-y"]) + res = subprocess.run( + cmd, input=privkey, check=True, stdout=subprocess.PIPE, text=True + ) + return res.stdout.strip() + + +def get_unique_user(users_folder: Path, user: str) -> str: + """Return a unique path in the users_folder for the given user.""" + i = 0 + path = users_folder / user + while path.exists(): + i += 1 + user = user + str(i) + path = users_folder / user + return user + + +def get_user_name(user: str) -> str: + """Ask the user for their name until a unique one is provided.""" + while True: + name = input( + f"Enter your user name for which the key will be stored as [{user}]: " + ) + if name: + user = name + if not (sops_users_folder() / user).exists(): + return user + print(f"{sops_users_folder() / user} already exists") + + +def ensure_user(pub_key: str) -> SopsKey: + key = SopsKey(pub_key) + users_folder = sops_users_folder() + + # Check if the public key already exists for any user + if users_folder.exists(): + for user in users_folder.iterdir(): + if not user.is_dir(): + continue + if read_key(user) == pub_key: + return key + + # Find a unique user name if the public key is not found + try: + loginname = os.getlogin() + except OSError: + loginname = os.environ.get("USER", "nobody") + username = get_unique_user(users_folder, loginname) + + if tty.is_interactive(): + # Ask the user for their name until a unique one is provided + username = get_user_name(username) + + # Add the public key for the user + add_key(users_folder / username, pub_key, False) + + return key + + +def ensure_sops_key() -> SopsKey: + key = os.environ.get("SOPS_AGE_KEY") + if key: + return ensure_user(get_public_key(key)) + raw_path = os.environ.get("SOPS_AGE_KEY_FILE") + if raw_path: + path = Path(raw_path) + else: + path = user_config_dir() / "sops" / "age" / "keys.txt" + if path.exists(): + return ensure_user(get_public_key(path.read_text())) + path.parent.mkdir(parents=True, exist_ok=True) + cmd = nix_shell(["age"], ["age-keygen", "-o", str(path)]) + subprocess.run(cmd, check=True) + + tty.info( + f"Generated age key at '{path}'. Please back it up on a secure location or you will lose access to your secrets." + ) + return ensure_user(get_public_key(path.read_text())) + + +def encrypt_file(secret_path: Path, content: IO[str], keys: list[str]) -> None: + folder = secret_path.parent + folder.mkdir(parents=True, exist_ok=True) + + # hopefully /tmp is written to an in-memory file to avoid leaking secrets + with NamedTemporaryFile(delete=False) as f: + try: + with open(f.name, "w") as fd: + shutil.copyfileobj(content, fd) + args = ["sops"] + for key in keys: + args.extend(["--age", key]) + args.extend(["-i", "--encrypt", str(f.name)]) + cmd = nix_shell(["sops"], args) + subprocess.run(cmd, check=True) + # atomic copy of the encrypted file + with NamedTemporaryFile(dir=folder, delete=False) as f2: + shutil.copyfile(f.name, f2.name) + os.rename(f2.name, secret_path) + finally: + try: + os.remove(f.name) + except OSError: + pass diff --git a/pkgs/clan-cli/clan_cli/secrets/types.py b/pkgs/clan-cli/clan_cli/secrets/types.py new file mode 100644 index 000000000..000537657 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/types.py @@ -0,0 +1,71 @@ +import argparse +import os +import re +from pathlib import Path +from typing import Callable + +from ..errors import ClanError +from .sops import get_public_key + +VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$") +VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$") +VALID_HOSTNAME = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$", re.IGNORECASE) + + +def validate_hostname(hostname: str) -> bool: + if len(hostname) > 63: + return False + return VALID_HOSTNAME.match(hostname) is not None + + +def secret_name_type(arg_value: str) -> str: + if not VALID_SECRET_NAME.match(arg_value): + raise argparse.ArgumentTypeError( + "Invalid character in secret name. Allowed characters are a-z, A-Z, 0-9, ., -, and _" + ) + return arg_value + + +def machine_name_type(arg_value: str) -> str: + if len(arg_value) > 63: + raise argparse.ArgumentTypeError( + "Machine name must be less than 63 characters long" + ) + if not VALID_SECRET_NAME.match(arg_value): + raise argparse.ArgumentTypeError( + "Invalid character in machine name. Allowed characters are a-z, 0-9, ., -, and _. Must not start with a number" + ) + return arg_value + + +def public_or_private_age_key_type(arg_value: str) -> str: + if os.path.isfile(arg_value): + arg_value = Path(arg_value).read_text().strip() + if arg_value.startswith("age1"): + return arg_value.strip() + if arg_value.startswith("AGE-SECRET-KEY-"): + return get_public_key(arg_value) + if not arg_value.startswith("age1"): + raise ClanError( + f"Please provide an age key starting with age1, got: '{arg_value}'" + ) + return arg_value + + +def group_or_user_name_type(what: str) -> Callable[[str], str]: + def name_type(arg_value: str) -> str: + if len(arg_value) > 32: + raise argparse.ArgumentTypeError( + f"{what.capitalize()} name must be less than 32 characters long" + ) + if not VALID_USER_NAME.match(arg_value): + raise argparse.ArgumentTypeError( + f"Invalid character in {what} name. Allowed characters are a-z, 0-9, -, and _. Must start with a letter or _" + ) + return arg_value + + return name_type + + +user_name_type = group_or_user_name_type("user") +group_name_type = group_or_user_name_type("group") diff --git a/pkgs/clan-cli/clan_cli/secrets/users.py b/pkgs/clan-cli/clan_cli/secrets/users.py new file mode 100644 index 000000000..f9151e6af --- /dev/null +++ b/pkgs/clan-cli/clan_cli/secrets/users.py @@ -0,0 +1,96 @@ +import argparse + +from . import secrets +from .folders import add_key, list_objects, remove_object, sops_users_folder +from .types import ( + VALID_SECRET_NAME, + public_or_private_age_key_type, + secret_name_type, + user_name_type, +) + + +def add_user(name: str, key: str, force: bool) -> None: + add_key(sops_users_folder() / name, key, force) + + +def list_command(args: argparse.Namespace) -> None: + list_objects(sops_users_folder(), lambda n: VALID_SECRET_NAME.match(n) is not None) + + +def add_command(args: argparse.Namespace) -> None: + add_user(args.user, args.key, args.force) + + +def remove_command(args: argparse.Namespace) -> None: + remove_object(sops_users_folder(), args.user) + + +def add_secret_command(args: argparse.Namespace) -> None: + secrets.allow_member( + secrets.groups_folder(args.group), sops_users_folder(), args.group + ) + + +def remove_secret_command(args: argparse.Namespace) -> None: + secrets.disallow_member(secrets.groups_folder(args.group), args.group) + + +def register_users_parser(parser: argparse.ArgumentParser) -> None: + subparser = parser.add_subparsers( + title="command", + description="the command to run", + help="the command to run", + required=True, + ) + list_parser = subparser.add_parser("list", help="list users") + list_parser.set_defaults(func=list_command) + + add_parser = subparser.add_parser("add", help="add a user") + add_parser.add_argument( + "-f", "--force", help="overwrite existing user", action="store_true" + ) + add_parser.add_argument("user", help="the name of the user", type=user_name_type) + add_parser.add_argument( + "key", + help="public key or private key of the user", + type=public_or_private_age_key_type, + ) + add_parser.set_defaults(func=add_command) + + remove_parser = subparser.add_parser("remove", help="remove a user") + remove_parser.add_argument("user", help="the name of the user", type=user_name_type) + remove_parser.set_defaults(func=remove_command) + + add_secret_parser = subparser.add_parser( + "add-secret", help="allow a user to access a secret" + ) + add_secret_parser.add_argument( + "user", help="the name of the group", type=user_name_type + ) + add_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + add_secret_parser.set_defaults(func=add_secret_command) + + add_secret_parser = subparser.add_parser( + "add-secret", help="allow a machine to access a secret" + ) + add_secret_parser.add_argument( + "user", help="the name of the group", type=user_name_type + ) + add_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + add_secret_parser.set_defaults(func=add_secret_command) + + remove_secret_parser = subparser.add_parser( + "remove-secret", help="remove a user's access to a secret" + ) + add_secret_parser.add_argument( + "user", help="the name of the group", type=user_name_type + ) + remove_secret_parser.add_argument( + "secret", help="the name of the secret", type=secret_name_type + ) + remove_secret_parser.set_defaults(func=remove_secret_command) diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index 8b8eebc94..db177b796 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -6,6 +6,8 @@ , installShellFiles , zerotierone , bubblewrap +, sops +, age , self }: let @@ -71,7 +73,7 @@ let clan-pytest = runCommand "${name}-tests" { - nativeBuildInputs = [ zerotierone bubblewrap ]; + nativeBuildInputs = [ zerotierone bubblewrap sops age ]; } '' cp -r ${src} ./src chmod +w -R ./src diff --git a/pkgs/clan-cli/flake-module.nix b/pkgs/clan-cli/flake-module.nix index ade014fc1..a617bf145 100644 --- a/pkgs/clan-cli/flake-module.nix +++ b/pkgs/clan-cli/flake-module.nix @@ -18,7 +18,9 @@ openssh sshpass zbar - tor; + tor + sops + age; # Override license so that we can build zerotierone without # having to re-import nixpkgs. zerotierone = pkgs.zerotierone.overrideAttrs (_old: { meta = { }; }); diff --git a/pkgs/clan-cli/tests/test_secrets.py b/pkgs/clan-cli/tests/test_secrets.py new file mode 100644 index 000000000..59e3d0601 --- /dev/null +++ b/pkgs/clan-cli/tests/test_secrets.py @@ -0,0 +1,120 @@ +import argparse +import os +from pathlib import Path + +import pytest +from environment import mock_env + +from clan_cli.errors import ClanError +from clan_cli.secrets import register_parser + + +class SecretCli: + def __init__(self) -> None: + self.parser = argparse.ArgumentParser() + register_parser(self.parser) + + def run(self, args: list[str]) -> argparse.Namespace: + parsed = self.parser.parse_args(args) + parsed.func(parsed) + return parsed + + +PUBKEY = "age1dhwqzkah943xzc34tc3dlmfayyevcmdmxzjezdgdy33euxwf59vsp3vk3c" +PRIVKEY = "AGE-SECRET-KEY-1KF8E3SR3TTGL6M476SKF7EEMR4H9NF7ZWYSLJUAK8JX276JC7KUSSURKFK" + + +def _test_identities( + what: str, clan_flake: Path, capsys: pytest.CaptureFixture +) -> None: + cli = SecretCli() + sops_folder = clan_flake / "sops" + + cli.run([what, "add", "foo", PUBKEY]) + assert (sops_folder / what / "foo" / "key.json").exists() + with pytest.raises(ClanError): + cli.run([what, "add", "foo", PUBKEY]) + + cli.run( + [ + what, + "add", + "-f", + "foo", + PRIVKEY, + ] + ) + capsys.readouterr() # empty the buffer + + cli.run([what, "list"]) + out = capsys.readouterr() # empty the buffer + assert "foo" in out.out + + cli.run([what, "remove", "foo"]) + assert not (sops_folder / what / "foo" / "key.json").exists() + + with pytest.raises(ClanError): # already removed + cli.run([what, "remove", "foo"]) + + capsys.readouterr() + cli.run([what, "list"]) + out = capsys.readouterr() + assert "foo" not in out.out + + +def test_users(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: + _test_identities("users", clan_flake, capsys) + + +def test_machines(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: + _test_identities("machines", clan_flake, capsys) + + +def test_groups(clan_flake: Path, capsys: pytest.CaptureFixture) -> None: + cli = SecretCli() + capsys.readouterr() # empty the buffer + cli.run(["groups", "list"]) + assert capsys.readouterr().out == "" + + with pytest.raises(ClanError): # machine does not exist yet + cli.run(["groups", "add-machine", "group1", "machine1"]) + with pytest.raises(ClanError): # user does not exist yet + cli.run(["groups", "add-user", "groupb1", "user1"]) + cli.run(["machines", "add", "machine1", PUBKEY]) + cli.run(["groups", "add-machine", "group1", "machine1"]) + + # Should this fail? + cli.run(["groups", "add-machine", "group1", "machine1"]) + + cli.run(["users", "add", "user1", PUBKEY]) + cli.run(["groups", "add-user", "group1", "user1"]) + + capsys.readouterr() # empty the buffer + cli.run(["groups", "list"]) + out = capsys.readouterr().out + assert "user1" in out + assert "machine1" in out + + cli.run(["groups", "remove-user", "group1", "user1"]) + cli.run(["groups", "remove-machine", "group1", "machine1"]) + groups = os.listdir(clan_flake / "sops" / "groups") + assert len(groups) == 0 + + +def test_secrets( + clan_flake: Path, capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch +) -> None: + cli = SecretCli() + capsys.readouterr() # empty the buffer + cli.run(["list"]) + assert capsys.readouterr().out == "" + + with pytest.raises(ClanError): # does not exist yet + cli.run(["get", "nonexisting"]) + with mock_env( + SOPS_NIX_SECRET="foo", SOPS_AGE_KEY_FILE=str(clan_flake / ".." / "age.key") + ): + cli.run(["set", "nonexisting"]) + capsys.readouterr() + cli.run(["get", "nonexisting"]) + assert capsys.readouterr().out == "foo"