Added flake_name:str argument everywhere, nix fmt doesn't complain anymore
This commit is contained in:
@@ -3,17 +3,17 @@ import shutil
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
from ..dirs import get_clan_flake_toplevel
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..errors import ClanError
|
||||
|
||||
|
||||
def get_sops_folder() -> Path:
|
||||
return get_clan_flake_toplevel() / "sops"
|
||||
def get_sops_folder(flake_name: str) -> Path:
|
||||
return specific_flake_dir(flake_name) / "sops"
|
||||
|
||||
|
||||
def gen_sops_subfolder(subdir: str) -> Callable[[], Path]:
|
||||
def folder() -> Path:
|
||||
return get_clan_flake_toplevel() / "sops" / subdir
|
||||
def gen_sops_subfolder(subdir: str) -> Callable[[str], Path]:
|
||||
def folder(flake_name: str) -> Path:
|
||||
return specific_flake_dir(flake_name) / "sops" / subdir
|
||||
|
||||
return folder
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ import sys
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..machines.machines import Machine
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -29,7 +30,7 @@ def generate_secrets(machine: Machine) -> None:
|
||||
|
||||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(args.machine)
|
||||
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
|
||||
generate_secrets(machine)
|
||||
|
||||
|
||||
@@ -38,4 +39,9 @@ def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"machine",
|
||||
help="The machine to generate secrets for",
|
||||
)
|
||||
parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser.set_defaults(func=generate_command)
|
||||
|
||||
@@ -20,24 +20,27 @@ from .types import (
|
||||
)
|
||||
|
||||
|
||||
def machines_folder(group: str) -> Path:
|
||||
return sops_groups_folder() / group / "machines"
|
||||
def machines_folder(flake_name: str, group: str) -> Path:
|
||||
return sops_groups_folder(flake_name) / group / "machines"
|
||||
|
||||
|
||||
def users_folder(group: str) -> Path:
|
||||
return sops_groups_folder() / group / "users"
|
||||
def users_folder(flake_name: str, group: str) -> Path:
|
||||
return sops_groups_folder(flake_name) / group / "users"
|
||||
|
||||
|
||||
class Group:
|
||||
def __init__(self, name: str, machines: list[str], users: list[str]) -> None:
|
||||
def __init__(
|
||||
self, flake_name: str, name: str, machines: list[str], users: list[str]
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.machines = machines
|
||||
self.users = users
|
||||
self.flake_name = flake_name
|
||||
|
||||
|
||||
def list_groups() -> list[Group]:
|
||||
def list_groups(flake_name: str) -> list[Group]:
|
||||
groups: list[Group] = []
|
||||
folder = sops_groups_folder()
|
||||
folder = sops_groups_folder(flake_name)
|
||||
if not folder.exists():
|
||||
return groups
|
||||
|
||||
@@ -45,24 +48,24 @@ def list_groups() -> list[Group]:
|
||||
group_folder = folder / name
|
||||
if not group_folder.is_dir():
|
||||
continue
|
||||
machines_path = machines_folder(name)
|
||||
machines_path = machines_folder(flake_name, name)
|
||||
machines = []
|
||||
if machines_path.is_dir():
|
||||
for f in machines_path.iterdir():
|
||||
if validate_hostname(f.name):
|
||||
machines.append(f.name)
|
||||
users_path = users_folder(name)
|
||||
users_path = users_folder(flake_name, name)
|
||||
users = []
|
||||
if users_path.is_dir():
|
||||
for f in users_path.iterdir():
|
||||
if VALID_USER_NAME.match(f.name):
|
||||
users.append(f.name)
|
||||
groups.append(Group(name, machines, users))
|
||||
groups.append(Group(flake_name, name, machines, users))
|
||||
return groups
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
for group in list_groups():
|
||||
for group in list_groups(args.flake):
|
||||
print(group.name)
|
||||
if group.machines:
|
||||
print("machines:")
|
||||
@@ -84,9 +87,9 @@ def list_directory(directory: Path) -> str:
|
||||
return msg
|
||||
|
||||
|
||||
def update_group_keys(group: str) -> None:
|
||||
for secret_ in secrets.list_secrets():
|
||||
secret = sops_secrets_folder() / secret_
|
||||
def update_group_keys(flake_name: str, group: str) -> None:
|
||||
for secret_ in secrets.list_secrets(flake_name):
|
||||
secret = sops_secrets_folder(flake_name) / secret_
|
||||
if (secret / "groups" / group).is_symlink():
|
||||
update_keys(
|
||||
secret,
|
||||
@@ -94,7 +97,9 @@ def update_group_keys(group: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
def add_member(
|
||||
flake_name: str, group_folder: Path, source_folder: Path, name: str
|
||||
) -> None:
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
msg = f"{name} does not exist in {source_folder}: "
|
||||
@@ -109,10 +114,10 @@ def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
)
|
||||
os.remove(user_target)
|
||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||
update_group_keys(group_folder.parent.name)
|
||||
update_group_keys(flake_name, group_folder.parent.name)
|
||||
|
||||
|
||||
def remove_member(group_folder: Path, name: str) -> None:
|
||||
def remove_member(flake_name: str, group_folder: Path, name: str) -> None:
|
||||
target = group_folder / name
|
||||
if not target.exists():
|
||||
msg = f"{name} does not exist in group in {group_folder}: "
|
||||
@@ -121,7 +126,7 @@ def remove_member(group_folder: Path, name: str) -> None:
|
||||
os.remove(target)
|
||||
|
||||
if len(os.listdir(group_folder)) > 0:
|
||||
update_group_keys(group_folder.parent.name)
|
||||
update_group_keys(flake_name, group_folder.parent.name)
|
||||
|
||||
if len(os.listdir(group_folder)) == 0:
|
||||
os.rmdir(group_folder)
|
||||
@@ -130,56 +135,65 @@ def remove_member(group_folder: Path, name: str) -> None:
|
||||
os.rmdir(group_folder.parent)
|
||||
|
||||
|
||||
def add_user(group: str, name: str) -> None:
|
||||
add_member(users_folder(group), sops_users_folder(), name)
|
||||
def add_user(flake_name: str, group: str, name: str) -> None:
|
||||
add_member(
|
||||
flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name
|
||||
)
|
||||
|
||||
|
||||
def add_user_command(args: argparse.Namespace) -> None:
|
||||
add_user(args.group, args.user)
|
||||
add_user(args.flake, args.group, args.user)
|
||||
|
||||
|
||||
def remove_user(group: str, name: str) -> None:
|
||||
remove_member(users_folder(group), name)
|
||||
def remove_user(flake_name: str, group: str, name: str) -> None:
|
||||
remove_member(flake_name, users_folder(flake_name, group), name)
|
||||
|
||||
|
||||
def remove_user_command(args: argparse.Namespace) -> None:
|
||||
remove_user(args.group, args.user)
|
||||
remove_user(args.flake, args.group, args.user)
|
||||
|
||||
|
||||
def add_machine(group: str, name: str) -> None:
|
||||
add_member(machines_folder(group), sops_machines_folder(), name)
|
||||
def add_machine(flake_name: str, group: str, name: str) -> None:
|
||||
add_member(
|
||||
flake_name,
|
||||
machines_folder(flake_name, group),
|
||||
sops_machines_folder(flake_name),
|
||||
name,
|
||||
)
|
||||
|
||||
|
||||
def add_machine_command(args: argparse.Namespace) -> None:
|
||||
add_machine(args.group, args.machine)
|
||||
add_machine(args.flake, args.group, args.machine)
|
||||
|
||||
|
||||
def remove_machine(group: str, name: str) -> None:
|
||||
remove_member(machines_folder(group), name)
|
||||
def remove_machine(flake_name: str, group: str, name: str) -> None:
|
||||
remove_member(flake_name, machines_folder(flake_name, group), name)
|
||||
|
||||
|
||||
def remove_machine_command(args: argparse.Namespace) -> None:
|
||||
remove_machine(args.group, args.machine)
|
||||
remove_machine(args.flake, args.group, args.machine)
|
||||
|
||||
|
||||
def add_group_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("group", help="the name of the secret", type=group_name_type)
|
||||
|
||||
|
||||
def add_secret(group: str, name: str) -> None:
|
||||
secrets.allow_member(secrets.groups_folder(name), sops_groups_folder(), group)
|
||||
def add_secret(flake_name: str, group: str, name: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group
|
||||
)
|
||||
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
add_secret(args.group, args.secret)
|
||||
add_secret(args.flake, args.group, args.secret)
|
||||
|
||||
|
||||
def remove_secret(group: str, name: str) -> None:
|
||||
secrets.disallow_member(secrets.groups_folder(name), group)
|
||||
def remove_secret(flake_name: str, group: str, name: str) -> None:
|
||||
secrets.disallow_member(secrets.groups_folder(flake_name, name), group)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.group, args.secret)
|
||||
remove_secret(args.flake, args.group, args.secret)
|
||||
|
||||
|
||||
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
||||
@@ -36,14 +36,15 @@ def import_sops(args: argparse.Namespace) -> None:
|
||||
file=sys.stderr,
|
||||
)
|
||||
continue
|
||||
if (sops_secrets_folder() / k / "secret").exists():
|
||||
if (sops_secrets_folder(args.flake) / k / "secret").exists():
|
||||
print(
|
||||
f"WARNING: {k} already exists, skipping",
|
||||
file=sys.stderr,
|
||||
)
|
||||
continue
|
||||
encrypt_secret(
|
||||
sops_secrets_folder() / k,
|
||||
args.flake,
|
||||
sops_secrets_folder(args.flake) / k,
|
||||
v,
|
||||
add_groups=args.group,
|
||||
add_machines=args.machine,
|
||||
|
||||
@@ -7,65 +7,67 @@ from .sops import read_key, write_key
|
||||
from .types import public_or_private_age_key_type, secret_name_type
|
||||
|
||||
|
||||
def add_machine(name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_machines_folder() / name, key, force)
|
||||
def add_machine(flake_name: str, name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_machines_folder(flake_name) / name, key, force)
|
||||
|
||||
|
||||
def remove_machine(name: str) -> None:
|
||||
remove_object(sops_machines_folder(), name)
|
||||
def remove_machine(flake_name: str, name: str) -> None:
|
||||
remove_object(sops_machines_folder(flake_name), name)
|
||||
|
||||
|
||||
def get_machine(name: str) -> str:
|
||||
return read_key(sops_machines_folder() / name)
|
||||
def get_machine(flake_name: str, name: str) -> str:
|
||||
return read_key(sops_machines_folder(flake_name) / name)
|
||||
|
||||
|
||||
def has_machine(name: str) -> bool:
|
||||
return (sops_machines_folder() / name / "key.json").exists()
|
||||
def has_machine(flake_name: str, name: str) -> bool:
|
||||
return (sops_machines_folder(flake_name) / name / "key.json").exists()
|
||||
|
||||
|
||||
def list_machines() -> list[str]:
|
||||
path = sops_machines_folder()
|
||||
def list_machines(flake_name: str) -> list[str]:
|
||||
path = sops_machines_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
return validate_hostname(name) and has_machine(name)
|
||||
return validate_hostname(name) and has_machine(flake_name, name)
|
||||
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def add_secret(machine: str, secret: str) -> None:
|
||||
def add_secret(flake_name: str, machine: str, secret: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.machines_folder(secret), sops_machines_folder(), machine
|
||||
secrets.machines_folder(flake_name, secret),
|
||||
sops_machines_folder(flake_name),
|
||||
machine,
|
||||
)
|
||||
|
||||
|
||||
def remove_secret(machine: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.machines_folder(secret), machine)
|
||||
def remove_secret(flake_name: str, machine: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
lst = list_machines()
|
||||
lst = list_machines(args.flake)
|
||||
if len(lst) > 0:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def add_command(args: argparse.Namespace) -> None:
|
||||
add_machine(args.machine, args.key, args.force)
|
||||
add_machine(args.flake, args.machine, args.key, args.force)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
print(get_machine(args.machine))
|
||||
print(get_machine(args.flake, args.machine))
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
remove_machine(args.machine)
|
||||
remove_machine(args.flake, args.machine)
|
||||
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
add_secret(args.machine, args.secret)
|
||||
add_secret(args.flake, args.machine, args.secret)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.machine, args.secret)
|
||||
remove_secret(args.flake, args.machine, args.secret)
|
||||
|
||||
|
||||
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
@@ -75,9 +77,16 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
help="the command to run",
|
||||
required=True,
|
||||
)
|
||||
# Parser
|
||||
list_parser = subparser.add_parser("list", help="list machines")
|
||||
list_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
list_parser.set_defaults(func=list_command)
|
||||
|
||||
# Parser
|
||||
add_parser = subparser.add_parser("add", help="add a machine")
|
||||
add_parser.add_argument(
|
||||
"-f",
|
||||
@@ -86,6 +95,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
add_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
@@ -96,21 +110,39 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
add_parser.set_defaults(func=add_command)
|
||||
|
||||
# Parser
|
||||
get_parser = subparser.add_parser("get", help="get a machine public key")
|
||||
get_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
get_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
get_parser.set_defaults(func=get_command)
|
||||
|
||||
# Parser
|
||||
remove_parser = subparser.add_parser("remove", help="remove a machine")
|
||||
remove_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
remove_parser.set_defaults(func=remove_command)
|
||||
|
||||
# Parser
|
||||
add_secret_parser = subparser.add_parser(
|
||||
"add-secret", help="allow a machine to access a secret"
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
@@ -119,9 +151,15 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
# Parser
|
||||
remove_secret_parser = subparser.add_parser(
|
||||
"remove-secret", help="remove a group's access to a secret"
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"machine", help="the name of the group", type=machine_name_type
|
||||
)
|
||||
|
||||
@@ -53,62 +53,79 @@ def collect_keys_for_path(path: Path) -> set[str]:
|
||||
|
||||
|
||||
def encrypt_secret(
|
||||
flake_name: str,
|
||||
secret: Path,
|
||||
value: IO[str] | str | None,
|
||||
add_users: list[str] = [],
|
||||
add_machines: list[str] = [],
|
||||
add_groups: list[str] = [],
|
||||
) -> None:
|
||||
key = ensure_sops_key()
|
||||
key = ensure_sops_key(flake_name)
|
||||
keys = set([])
|
||||
|
||||
for user in add_users:
|
||||
allow_member(users_folder(secret.name), sops_users_folder(), user, False)
|
||||
allow_member(
|
||||
users_folder(flake_name, secret.name),
|
||||
sops_users_folder(flake_name),
|
||||
user,
|
||||
False,
|
||||
)
|
||||
|
||||
for machine in add_machines:
|
||||
allow_member(
|
||||
machines_folder(secret.name), sops_machines_folder(), machine, False
|
||||
machines_folder(flake_name, secret.name),
|
||||
sops_machines_folder(flake_name),
|
||||
machine,
|
||||
False,
|
||||
)
|
||||
|
||||
for group in add_groups:
|
||||
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False)
|
||||
allow_member(
|
||||
groups_folder(flake_name, secret.name),
|
||||
sops_groups_folder(flake_name),
|
||||
group,
|
||||
False,
|
||||
)
|
||||
|
||||
keys = collect_keys_for_path(secret)
|
||||
|
||||
if key.pubkey not in keys:
|
||||
keys.add(key.pubkey)
|
||||
allow_member(
|
||||
users_folder(secret.name), sops_users_folder(), key.username, False
|
||||
users_folder(flake_name, secret.name),
|
||||
sops_users_folder(flake_name),
|
||||
key.username,
|
||||
False,
|
||||
)
|
||||
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
|
||||
def remove_secret(secret: str) -> None:
|
||||
path = sops_secrets_folder() / secret
|
||||
def remove_secret(flake_name: str, secret: str) -> None:
|
||||
path = sops_secrets_folder(flake_name) / secret
|
||||
if not path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
shutil.rmtree(path)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.secret)
|
||||
remove_secret(args.flake, args.secret)
|
||||
|
||||
|
||||
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
||||
|
||||
|
||||
def machines_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "machines"
|
||||
def machines_folder(flake_name: str, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "machines"
|
||||
|
||||
|
||||
def users_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "users"
|
||||
def users_folder(flake_name: str, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "users"
|
||||
|
||||
|
||||
def groups_folder(group: str) -> Path:
|
||||
return sops_secrets_folder() / group / "groups"
|
||||
def groups_folder(flake_name: str, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "groups"
|
||||
|
||||
|
||||
def list_directory(directory: Path) -> str:
|
||||
@@ -171,35 +188,37 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def has_secret(secret: str) -> bool:
|
||||
return (sops_secrets_folder() / secret / "secret").exists()
|
||||
def has_secret(flake_name: str, secret: str) -> bool:
|
||||
return (sops_secrets_folder(flake_name) / secret / "secret").exists()
|
||||
|
||||
|
||||
def list_secrets() -> list[str]:
|
||||
path = sops_secrets_folder()
|
||||
def list_secrets(flake_name: str) -> list[str]:
|
||||
path = sops_secrets_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
return VALID_SECRET_NAME.match(name) is not None and has_secret(name)
|
||||
return VALID_SECRET_NAME.match(name) is not None and has_secret(
|
||||
flake_name, name
|
||||
)
|
||||
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
lst = list_secrets()
|
||||
lst = list_secrets(args.flake)
|
||||
if len(lst) > 0:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def decrypt_secret(secret: str) -> str:
|
||||
ensure_sops_key()
|
||||
secret_path = sops_secrets_folder() / secret / "secret"
|
||||
def decrypt_secret(flake_name: str, secret: str) -> str:
|
||||
ensure_sops_key(flake_name)
|
||||
secret_path = sops_secrets_folder(flake_name) / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
return decrypt_file(secret_path)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
print(decrypt_secret(args.secret), end="")
|
||||
print(decrypt_secret(args.flake, args.secret), end="")
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
@@ -212,7 +231,8 @@ def set_command(args: argparse.Namespace) -> None:
|
||||
elif tty.is_interactive():
|
||||
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
||||
encrypt_secret(
|
||||
sops_secrets_folder() / args.secret,
|
||||
args.flake,
|
||||
sops_secrets_folder(args.flake) / args.secret,
|
||||
secret_value,
|
||||
args.user,
|
||||
args.machine,
|
||||
@@ -221,8 +241,8 @@ def set_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def rename_command(args: argparse.Namespace) -> None:
|
||||
old_path = sops_secrets_folder() / args.secret
|
||||
new_path = sops_secrets_folder() / args.new_name
|
||||
old_path = sops_secrets_folder(args.flake) / args.secret
|
||||
new_path = sops_secrets_folder(args.flake) / args.new_name
|
||||
if not old_path.exists():
|
||||
raise ClanError(f"Secret '{args.secret}' does not exist")
|
||||
if new_path.exists():
|
||||
@@ -237,9 +257,19 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
parser_get = subparser.add_parser("get", help="get a secret")
|
||||
add_secret_argument(parser_get)
|
||||
parser_get.set_defaults(func=get_command)
|
||||
parser_get.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
|
||||
parser_set = subparser.add_parser("set", help="set a secret")
|
||||
add_secret_argument(parser_set)
|
||||
parser_set.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_set.add_argument(
|
||||
"--group",
|
||||
type=str,
|
||||
|
||||
@@ -51,7 +51,7 @@ def generate_private_key() -> tuple[str, str]:
|
||||
raise ClanError("Failed to generate private sops key") from e
|
||||
|
||||
|
||||
def get_user_name(user: str) -> str:
|
||||
def get_user_name(flake_name: str, user: str) -> str:
|
||||
"""Ask the user for their name until a unique one is provided."""
|
||||
while True:
|
||||
name = input(
|
||||
@@ -59,14 +59,14 @@ def get_user_name(user: str) -> str:
|
||||
)
|
||||
if name:
|
||||
user = name
|
||||
if not (sops_users_folder() / user).exists():
|
||||
if not (sops_users_folder(flake_name) / user).exists():
|
||||
return user
|
||||
print(f"{sops_users_folder() / user} already exists")
|
||||
print(f"{sops_users_folder(flake_name) / user} already exists")
|
||||
|
||||
|
||||
def ensure_user_or_machine(pub_key: str) -> SopsKey:
|
||||
def ensure_user_or_machine(flake_name: str, pub_key: str) -> SopsKey:
|
||||
key = SopsKey(pub_key, username="")
|
||||
folders = [sops_users_folder(), sops_machines_folder()]
|
||||
folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)]
|
||||
for folder in folders:
|
||||
if folder.exists():
|
||||
for user in folder.iterdir():
|
||||
@@ -90,13 +90,13 @@ def default_sops_key_path() -> Path:
|
||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||
|
||||
|
||||
def ensure_sops_key() -> SopsKey:
|
||||
def ensure_sops_key(flake_name: str) -> SopsKey:
|
||||
key = os.environ.get("SOPS_AGE_KEY")
|
||||
if key:
|
||||
return ensure_user_or_machine(get_public_key(key))
|
||||
return ensure_user_or_machine(flake_name, get_public_key(key))
|
||||
path = default_sops_key_path()
|
||||
if path.exists():
|
||||
return ensure_user_or_machine(get_public_key(path.read_text()))
|
||||
return ensure_user_or_machine(flake_name, get_public_key(path.read_text()))
|
||||
else:
|
||||
raise ClanError(
|
||||
"No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
|
||||
@@ -9,7 +9,7 @@ from typing import Any
|
||||
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
from ..dirs import get_clan_flake_toplevel
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..errors import ClanError
|
||||
from .folders import sops_secrets_folder
|
||||
from .machines import add_machine, has_machine
|
||||
@@ -17,21 +17,29 @@ from .secrets import decrypt_secret, encrypt_secret, has_secret
|
||||
from .sops import generate_private_key
|
||||
|
||||
|
||||
def generate_host_key(machine_name: str) -> None:
|
||||
if has_machine(machine_name):
|
||||
def generate_host_key(flake_name: str, machine_name: str) -> None:
|
||||
if has_machine(flake_name, machine_name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
encrypt_secret(sops_secrets_folder() / f"{machine_name}-age.key", priv_key)
|
||||
add_machine(machine_name, pub_key, False)
|
||||
encrypt_secret(
|
||||
flake_name,
|
||||
sops_secrets_folder(flake_name) / f"{machine_name}-age.key",
|
||||
priv_key,
|
||||
)
|
||||
add_machine(flake_name, machine_name, pub_key, False)
|
||||
|
||||
|
||||
def generate_secrets_group(
|
||||
secret_group: str, machine_name: str, tempdir: Path, secret_options: dict[str, Any]
|
||||
flake_name: str,
|
||||
secret_group: str,
|
||||
machine_name: str,
|
||||
tempdir: Path,
|
||||
secret_options: dict[str, Any],
|
||||
) -> None:
|
||||
clan_dir = get_clan_flake_toplevel()
|
||||
clan_dir = specific_flake_dir(flake_name)
|
||||
secrets = secret_options["secrets"]
|
||||
needs_regeneration = any(
|
||||
not has_secret(f"{machine_name}-{secret['name']}")
|
||||
not has_secret(flake_name, f"{machine_name}-{secret['name']}")
|
||||
for secret in secrets.values()
|
||||
)
|
||||
generator = secret_options["generator"]
|
||||
@@ -62,7 +70,8 @@ export secrets={shlex.quote(str(secrets_dir))}
|
||||
msg += text
|
||||
raise ClanError(msg)
|
||||
encrypt_secret(
|
||||
sops_secrets_folder() / f"{machine_name}-{secret['name']}",
|
||||
flake_name,
|
||||
sops_secrets_folder(flake_name) / f"{machine_name}-{secret['name']}",
|
||||
secret_file.read_text(),
|
||||
add_machines=[machine_name],
|
||||
)
|
||||
@@ -79,17 +88,18 @@ export secrets={shlex.quote(str(secrets_dir))}
|
||||
|
||||
# this is called by the sops.nix clan core module
|
||||
def generate_secrets_from_nix(
|
||||
flake_name: str,
|
||||
machine_name: str,
|
||||
secret_submodules: dict[str, Any],
|
||||
) -> None:
|
||||
generate_host_key(machine_name)
|
||||
generate_host_key(flake_name, machine_name)
|
||||
errors = {}
|
||||
with TemporaryDirectory() as d:
|
||||
# if any of the secrets are missing, we regenerate all connected facts/secrets
|
||||
for secret_group, secret_options in secret_submodules.items():
|
||||
try:
|
||||
generate_secrets_group(
|
||||
secret_group, machine_name, Path(d), secret_options
|
||||
flake_name, secret_group, machine_name, Path(d), secret_options
|
||||
)
|
||||
except ClanError as e:
|
||||
errors[secret_group] = e
|
||||
@@ -102,12 +112,15 @@ def generate_secrets_from_nix(
|
||||
|
||||
# this is called by the sops.nix clan core module
|
||||
def upload_age_key_from_nix(
|
||||
flake_name: str,
|
||||
machine_name: str,
|
||||
) -> None:
|
||||
secret_name = f"{machine_name}-age.key"
|
||||
if not has_secret(secret_name): # skip uploading the secret, not managed by us
|
||||
if not has_secret(
|
||||
flake_name, secret_name
|
||||
): # skip uploading the secret, not managed by us
|
||||
return
|
||||
secret = decrypt_secret(secret_name)
|
||||
secret = decrypt_secret(flake_name, secret_name)
|
||||
|
||||
secrets_dir = Path(os.environ["SECRETS_DIR"])
|
||||
(secrets_dir / "key.txt").write_text(secret)
|
||||
|
||||
@@ -4,6 +4,7 @@ import subprocess
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..machines.machines import Machine
|
||||
from ..nix import nix_shell
|
||||
|
||||
@@ -37,7 +38,7 @@ def upload_secrets(machine: Machine) -> None:
|
||||
|
||||
|
||||
def upload_command(args: argparse.Namespace) -> None:
|
||||
machine = Machine(args.machine)
|
||||
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
|
||||
upload_secrets(machine)
|
||||
|
||||
|
||||
@@ -46,4 +47,9 @@ def register_upload_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"machine",
|
||||
help="The machine to upload secrets to",
|
||||
)
|
||||
parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser.set_defaults(func=upload_command)
|
||||
|
||||
@@ -11,20 +11,20 @@ from .types import (
|
||||
)
|
||||
|
||||
|
||||
def add_user(name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_users_folder() / name, key, force)
|
||||
def add_user(flake_name: str, name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_users_folder(flake_name) / name, key, force)
|
||||
|
||||
|
||||
def remove_user(name: str) -> None:
|
||||
remove_object(sops_users_folder(), name)
|
||||
def remove_user(flake_name: str, name: str) -> None:
|
||||
remove_object(sops_users_folder(flake_name), name)
|
||||
|
||||
|
||||
def get_user(name: str) -> str:
|
||||
return read_key(sops_users_folder() / name)
|
||||
def get_user(flake_name: str, name: str) -> str:
|
||||
return read_key(sops_users_folder(flake_name) / name)
|
||||
|
||||
|
||||
def list_users() -> list[str]:
|
||||
path = sops_users_folder()
|
||||
def list_users(flake_name: str) -> list[str]:
|
||||
path = sops_users_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
return (
|
||||
@@ -35,38 +35,40 @@ def list_users() -> list[str]:
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def add_secret(user: str, secret: str) -> None:
|
||||
secrets.allow_member(secrets.users_folder(secret), sops_users_folder(), user)
|
||||
def add_secret(flake_name: str, user: str, secret: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user
|
||||
)
|
||||
|
||||
|
||||
def remove_secret(user: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.users_folder(secret), user)
|
||||
def remove_secret(flake_name: str, user: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.users_folder(flake_name, secret), user)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
lst = list_users()
|
||||
lst = list_users(args.flake)
|
||||
if len(lst) > 0:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def add_command(args: argparse.Namespace) -> None:
|
||||
add_user(args.user, args.key, args.force)
|
||||
add_user(args.flake, args.user, args.key, args.force)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
print(get_user(args.user))
|
||||
print(get_user(args.flake, args.user))
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
remove_user(args.user)
|
||||
remove_user(args.flake, args.user)
|
||||
|
||||
|
||||
def add_secret_command(args: argparse.Namespace) -> None:
|
||||
add_secret(args.user, args.secret)
|
||||
add_secret(args.flake, args.user, args.secret)
|
||||
|
||||
|
||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.user, args.secret)
|
||||
remove_secret(args.flake, args.user, args.secret)
|
||||
|
||||
|
||||
def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
@@ -77,6 +79,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
required=True,
|
||||
)
|
||||
list_parser = subparser.add_parser("list", help="list users")
|
||||
list_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
list_parser.set_defaults(func=list_command)
|
||||
|
||||
add_parser = subparser.add_parser("add", help="add a user")
|
||||
@@ -90,14 +97,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
type=public_or_private_age_key_type,
|
||||
)
|
||||
add_parser.set_defaults(func=add_command)
|
||||
add_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
|
||||
get_parser = subparser.add_parser("get", help="get a user public key")
|
||||
get_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||
get_parser.set_defaults(func=get_command)
|
||||
get_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
|
||||
remove_parser = subparser.add_parser("remove", help="remove a user")
|
||||
remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||
remove_parser.set_defaults(func=remove_command)
|
||||
remove_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
|
||||
add_secret_parser = subparser.add_parser(
|
||||
"add-secret", help="allow a user to access a secret"
|
||||
|
||||
Reference in New Issue
Block a user