Added new type FlakeName
This commit is contained in:
@@ -5,14 +5,15 @@ from typing import Callable
|
||||
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..errors import ClanError
|
||||
from ..flakes.types import FlakeName
|
||||
|
||||
|
||||
def get_sops_folder(flake_name: str) -> Path:
|
||||
def get_sops_folder(flake_name: FlakeName) -> Path:
|
||||
return specific_flake_dir(flake_name) / "sops"
|
||||
|
||||
|
||||
def gen_sops_subfolder(subdir: str) -> Callable[[str], Path]:
|
||||
def folder(flake_name: str) -> Path:
|
||||
def gen_sops_subfolder(subdir: str) -> Callable[[FlakeName], Path]:
|
||||
def folder(flake_name: FlakeName) -> Path:
|
||||
return specific_flake_dir(flake_name) / "sops" / subdir
|
||||
|
||||
return folder
|
||||
|
||||
@@ -3,6 +3,7 @@ import os
|
||||
from pathlib import Path
|
||||
|
||||
from ..errors import ClanError
|
||||
from ..flakes.types import FlakeName
|
||||
from ..machines.types import machine_name_type, validate_hostname
|
||||
from . import secrets
|
||||
from .folders import (
|
||||
@@ -20,17 +21,17 @@ from .types import (
|
||||
)
|
||||
|
||||
|
||||
def machines_folder(flake_name: str, group: str) -> Path:
|
||||
def machines_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_groups_folder(flake_name) / group / "machines"
|
||||
|
||||
|
||||
def users_folder(flake_name: str, group: str) -> Path:
|
||||
def users_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_groups_folder(flake_name) / group / "users"
|
||||
|
||||
|
||||
class Group:
|
||||
def __init__(
|
||||
self, flake_name: str, name: str, machines: list[str], users: list[str]
|
||||
self, flake_name: FlakeName, name: str, machines: list[str], users: list[str]
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.machines = machines
|
||||
@@ -38,7 +39,7 @@ class Group:
|
||||
self.flake_name = flake_name
|
||||
|
||||
|
||||
def list_groups(flake_name: str) -> list[Group]:
|
||||
def list_groups(flake_name: FlakeName) -> list[Group]:
|
||||
groups: list[Group] = []
|
||||
folder = sops_groups_folder(flake_name)
|
||||
if not folder.exists():
|
||||
@@ -87,7 +88,7 @@ def list_directory(directory: Path) -> str:
|
||||
return msg
|
||||
|
||||
|
||||
def update_group_keys(flake_name: str, group: str) -> None:
|
||||
def update_group_keys(flake_name: FlakeName, group: str) -> None:
|
||||
for secret_ in secrets.list_secrets(flake_name):
|
||||
secret = sops_secrets_folder(flake_name) / secret_
|
||||
if (secret / "groups" / group).is_symlink():
|
||||
@@ -98,7 +99,7 @@ def update_group_keys(flake_name: str, group: str) -> None:
|
||||
|
||||
|
||||
def add_member(
|
||||
flake_name: str, group_folder: Path, source_folder: Path, name: str
|
||||
flake_name: FlakeName, group_folder: Path, source_folder: Path, name: str
|
||||
) -> None:
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
@@ -117,7 +118,7 @@ def add_member(
|
||||
update_group_keys(flake_name, group_folder.parent.name)
|
||||
|
||||
|
||||
def remove_member(flake_name: str, group_folder: Path, name: str) -> None:
|
||||
def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
|
||||
target = group_folder / name
|
||||
if not target.exists():
|
||||
msg = f"{name} does not exist in group in {group_folder}: "
|
||||
@@ -135,7 +136,7 @@ def remove_member(flake_name: str, group_folder: Path, name: str) -> None:
|
||||
os.rmdir(group_folder.parent)
|
||||
|
||||
|
||||
def add_user(flake_name: str, group: str, name: str) -> None:
|
||||
def add_user(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
add_member(
|
||||
flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name
|
||||
)
|
||||
@@ -145,7 +146,7 @@ def add_user_command(args: argparse.Namespace) -> None:
|
||||
add_user(args.flake, args.group, args.user)
|
||||
|
||||
|
||||
def remove_user(flake_name: str, group: str, name: str) -> None:
|
||||
def remove_user(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
remove_member(flake_name, users_folder(flake_name, group), name)
|
||||
|
||||
|
||||
@@ -153,7 +154,7 @@ def remove_user_command(args: argparse.Namespace) -> None:
|
||||
remove_user(args.flake, args.group, args.user)
|
||||
|
||||
|
||||
def add_machine(flake_name: str, group: str, name: str) -> None:
|
||||
def add_machine(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
add_member(
|
||||
flake_name,
|
||||
machines_folder(flake_name, group),
|
||||
@@ -166,7 +167,7 @@ def add_machine_command(args: argparse.Namespace) -> None:
|
||||
add_machine(args.flake, args.group, args.machine)
|
||||
|
||||
|
||||
def remove_machine(flake_name: str, group: str, name: str) -> None:
|
||||
def remove_machine(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
remove_member(flake_name, machines_folder(flake_name, group), name)
|
||||
|
||||
|
||||
@@ -178,7 +179,7 @@ def add_group_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("group", help="the name of the secret", type=group_name_type)
|
||||
|
||||
|
||||
def add_secret(flake_name: str, group: str, name: str) -> None:
|
||||
def add_secret(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group
|
||||
)
|
||||
@@ -188,7 +189,7 @@ def add_secret_command(args: argparse.Namespace) -> None:
|
||||
add_secret(args.flake, args.group, args.secret)
|
||||
|
||||
|
||||
def remove_secret(flake_name: str, group: str, name: str) -> None:
|
||||
def remove_secret(flake_name: FlakeName, group: str, name: str) -> None:
|
||||
secrets.disallow_member(secrets.groups_folder(flake_name, name), group)
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
|
||||
from ..flakes.types import FlakeName
|
||||
from ..machines.types import machine_name_type, validate_hostname
|
||||
from . import secrets
|
||||
from .folders import list_objects, remove_object, sops_machines_folder
|
||||
@@ -7,23 +8,23 @@ from .sops import read_key, write_key
|
||||
from .types import public_or_private_age_key_type, secret_name_type
|
||||
|
||||
|
||||
def add_machine(flake_name: str, name: str, key: str, force: bool) -> None:
|
||||
def add_machine(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_machines_folder(flake_name) / name, key, force)
|
||||
|
||||
|
||||
def remove_machine(flake_name: str, name: str) -> None:
|
||||
def remove_machine(flake_name: FlakeName, name: str) -> None:
|
||||
remove_object(sops_machines_folder(flake_name), name)
|
||||
|
||||
|
||||
def get_machine(flake_name: str, name: str) -> str:
|
||||
def get_machine(flake_name: FlakeName, name: str) -> str:
|
||||
return read_key(sops_machines_folder(flake_name) / name)
|
||||
|
||||
|
||||
def has_machine(flake_name: str, name: str) -> bool:
|
||||
def has_machine(flake_name: FlakeName, name: str) -> bool:
|
||||
return (sops_machines_folder(flake_name) / name / "key.json").exists()
|
||||
|
||||
|
||||
def list_machines(flake_name: str) -> list[str]:
|
||||
def list_machines(flake_name: FlakeName) -> list[str]:
|
||||
path = sops_machines_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
@@ -32,7 +33,7 @@ def list_machines(flake_name: str) -> list[str]:
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def add_secret(flake_name: str, machine: str, secret: str) -> None:
|
||||
def add_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.machines_folder(flake_name, secret),
|
||||
sops_machines_folder(flake_name),
|
||||
@@ -40,7 +41,7 @@ def add_secret(flake_name: str, machine: str, secret: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def remove_secret(flake_name: str, machine: str, secret: str) -> None:
|
||||
def remove_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine)
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import IO
|
||||
|
||||
from .. import tty
|
||||
from ..errors import ClanError
|
||||
from ..flakes.types import FlakeName
|
||||
from .folders import (
|
||||
list_objects,
|
||||
sops_groups_folder,
|
||||
@@ -53,7 +54,7 @@ def collect_keys_for_path(path: Path) -> set[str]:
|
||||
|
||||
|
||||
def encrypt_secret(
|
||||
flake_name: str,
|
||||
flake_name: FlakeName,
|
||||
secret: Path,
|
||||
value: IO[str] | str | None,
|
||||
add_users: list[str] = [],
|
||||
@@ -101,7 +102,7 @@ def encrypt_secret(
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
|
||||
def remove_secret(flake_name: str, secret: str) -> None:
|
||||
def remove_secret(flake_name: FlakeName, secret: str) -> None:
|
||||
path = sops_secrets_folder(flake_name) / secret
|
||||
if not path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
@@ -116,15 +117,15 @@ def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
||||
|
||||
|
||||
def machines_folder(flake_name: str, group: str) -> Path:
|
||||
def machines_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "machines"
|
||||
|
||||
|
||||
def users_folder(flake_name: str, group: str) -> Path:
|
||||
def users_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "users"
|
||||
|
||||
|
||||
def groups_folder(flake_name: str, group: str) -> Path:
|
||||
def groups_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "groups"
|
||||
|
||||
|
||||
@@ -188,11 +189,11 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def has_secret(flake_name: str, secret: str) -> bool:
|
||||
def has_secret(flake_name: FlakeName, secret: str) -> bool:
|
||||
return (sops_secrets_folder(flake_name) / secret / "secret").exists()
|
||||
|
||||
|
||||
def list_secrets(flake_name: str) -> list[str]:
|
||||
def list_secrets(flake_name: FlakeName) -> list[str]:
|
||||
path = sops_secrets_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
@@ -209,7 +210,7 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def decrypt_secret(flake_name: str, secret: str) -> str:
|
||||
def decrypt_secret(flake_name: FlakeName, secret: str) -> str:
|
||||
ensure_sops_key(flake_name)
|
||||
secret_path = sops_secrets_folder(flake_name) / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
|
||||
@@ -9,6 +9,7 @@ from typing import IO, Iterator
|
||||
|
||||
from ..dirs import user_config_dir
|
||||
from ..errors import ClanError
|
||||
from ..flakes.types import FlakeName
|
||||
from ..nix import nix_shell
|
||||
from .folders import sops_machines_folder, sops_users_folder
|
||||
|
||||
@@ -51,7 +52,7 @@ def generate_private_key() -> tuple[str, str]:
|
||||
raise ClanError("Failed to generate private sops key") from e
|
||||
|
||||
|
||||
def get_user_name(flake_name: str, user: str) -> str:
|
||||
def get_user_name(flake_name: FlakeName, user: str) -> str:
|
||||
"""Ask the user for their name until a unique one is provided."""
|
||||
while True:
|
||||
name = input(
|
||||
@@ -64,7 +65,7 @@ def get_user_name(flake_name: str, user: str) -> str:
|
||||
print(f"{sops_users_folder(flake_name) / user} already exists")
|
||||
|
||||
|
||||
def ensure_user_or_machine(flake_name: str, pub_key: str) -> SopsKey:
|
||||
def ensure_user_or_machine(flake_name: FlakeName, pub_key: str) -> SopsKey:
|
||||
key = SopsKey(pub_key, username="")
|
||||
folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)]
|
||||
for folder in folders:
|
||||
@@ -90,7 +91,7 @@ def default_sops_key_path() -> Path:
|
||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||
|
||||
|
||||
def ensure_sops_key(flake_name: str) -> SopsKey:
|
||||
def ensure_sops_key(flake_name: FlakeName) -> SopsKey:
|
||||
key = os.environ.get("SOPS_AGE_KEY")
|
||||
if key:
|
||||
return ensure_user_or_machine(flake_name, get_public_key(key))
|
||||
|
||||
@@ -11,13 +11,14 @@ from clan_cli.nix import nix_shell
|
||||
|
||||
from ..dirs import specific_flake_dir
|
||||
from ..errors import ClanError
|
||||
from ..flakes.types import FlakeName
|
||||
from .folders import sops_secrets_folder
|
||||
from .machines import add_machine, has_machine
|
||||
from .secrets import decrypt_secret, encrypt_secret, has_secret
|
||||
from .sops import generate_private_key
|
||||
|
||||
|
||||
def generate_host_key(flake_name: str, machine_name: str) -> None:
|
||||
def generate_host_key(flake_name: FlakeName, machine_name: str) -> None:
|
||||
if has_machine(flake_name, machine_name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
@@ -30,7 +31,7 @@ def generate_host_key(flake_name: str, machine_name: str) -> None:
|
||||
|
||||
|
||||
def generate_secrets_group(
|
||||
flake_name: str,
|
||||
flake_name: FlakeName,
|
||||
secret_group: str,
|
||||
machine_name: str,
|
||||
tempdir: Path,
|
||||
@@ -88,7 +89,7 @@ export secrets={shlex.quote(str(secrets_dir))}
|
||||
|
||||
# this is called by the sops.nix clan core module
|
||||
def generate_secrets_from_nix(
|
||||
flake_name: str,
|
||||
flake_name: FlakeName,
|
||||
machine_name: str,
|
||||
secret_submodules: dict[str, Any],
|
||||
) -> None:
|
||||
@@ -112,7 +113,7 @@ def generate_secrets_from_nix(
|
||||
|
||||
# this is called by the sops.nix clan core module
|
||||
def upload_age_key_from_nix(
|
||||
flake_name: str,
|
||||
flake_name: FlakeName,
|
||||
machine_name: str,
|
||||
) -> None:
|
||||
secret_name = f"{machine_name}-age.key"
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
|
||||
from ..flakes.types import FlakeName
|
||||
from . import secrets
|
||||
from .folders import list_objects, remove_object, sops_users_folder
|
||||
from .sops import read_key, write_key
|
||||
@@ -11,19 +12,19 @@ from .types import (
|
||||
)
|
||||
|
||||
|
||||
def add_user(flake_name: str, name: str, key: str, force: bool) -> None:
|
||||
def add_user(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
|
||||
write_key(sops_users_folder(flake_name) / name, key, force)
|
||||
|
||||
|
||||
def remove_user(flake_name: str, name: str) -> None:
|
||||
def remove_user(flake_name: FlakeName, name: str) -> None:
|
||||
remove_object(sops_users_folder(flake_name), name)
|
||||
|
||||
|
||||
def get_user(flake_name: str, name: str) -> str:
|
||||
def get_user(flake_name: FlakeName, name: str) -> str:
|
||||
return read_key(sops_users_folder(flake_name) / name)
|
||||
|
||||
|
||||
def list_users(flake_name: str) -> list[str]:
|
||||
def list_users(flake_name: FlakeName) -> list[str]:
|
||||
path = sops_users_folder(flake_name)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
@@ -35,13 +36,13 @@ def list_users(flake_name: str) -> list[str]:
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def add_secret(flake_name: str, user: str, secret: str) -> None:
|
||||
def add_secret(flake_name: FlakeName, user: str, secret: str) -> None:
|
||||
secrets.allow_member(
|
||||
secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user
|
||||
)
|
||||
|
||||
|
||||
def remove_secret(flake_name: str, user: str, secret: str) -> None:
|
||||
def remove_secret(flake_name: FlakeName, user: str, secret: str) -> None:
|
||||
secrets.disallow_member(secrets.users_folder(flake_name, secret), user)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user