Added flake_name:str argument everywhere, nix fmt doesn't complain anymore

This commit is contained in:
Qubasa
2023-10-14 14:57:36 +02:00
parent fdcd7ad1d9
commit 32e60f5adc
28 changed files with 365 additions and 206 deletions

View File

@@ -9,7 +9,7 @@ import sys
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple, get_origin from typing import Any, Optional, Tuple, get_origin
from clan_cli.dirs import get_clan_flake_toplevel, machine_settings_file from clan_cli.dirs import machine_settings_file, specific_flake_dir
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_file from clan_cli.git import commit_file
from clan_cli.nix import nix_eval from clan_cli.nix import nix_eval
@@ -103,8 +103,10 @@ def cast(value: Any, type: Any, opt_description: str) -> Any:
) )
def options_for_machine(machine_name: str, show_trace: bool = False) -> dict: def options_for_machine(
clan_dir = get_clan_flake_toplevel() flake_name: str, machine_name: str, show_trace: bool = False
) -> dict:
clan_dir = specific_flake_dir(flake_name)
flags = [] flags = []
if show_trace: if show_trace:
flags.append("--show-trace") flags.append("--show-trace")
@@ -125,9 +127,9 @@ def options_for_machine(machine_name: str, show_trace: bool = False) -> dict:
def read_machine_option_value( def read_machine_option_value(
machine_name: str, option: str, show_trace: bool = False flake_name: str, machine_name: str, option: str, show_trace: bool = False
) -> str: ) -> str:
clan_dir = get_clan_flake_toplevel() clan_dir = specific_flake_dir(flake_name)
# use nix eval to read from .#nixosConfigurations.default.config.{option} # use nix eval to read from .#nixosConfigurations.default.config.{option}
# this will give us the evaluated config with the options attribute # this will give us the evaluated config with the options attribute
cmd = nix_eval( cmd = nix_eval(
@@ -160,19 +162,19 @@ def get_or_set_option(args: argparse.Namespace) -> None:
# load options # load options
if args.options_file is None: if args.options_file is None:
options = options_for_machine( options = options_for_machine(
machine_name=args.machine, show_trace=args.show_trace args.flake, machine_name=args.machine, show_trace=args.show_trace
) )
else: else:
with open(args.options_file) as f: with open(args.options_file) as f:
options = json.load(f) options = json.load(f)
# compute settings json file location # compute settings json file location
if args.settings_file is None: if args.settings_file is None:
get_clan_flake_toplevel()
settings_file = machine_settings_file(args.flake, args.machine) settings_file = machine_settings_file(args.flake, args.machine)
else: else:
settings_file = args.settings_file settings_file = args.settings_file
# set the option with the given value # set the option with the given value
set_option( set_option(
flake_name=args.flake,
option=args.option, option=args.option,
value=args.value, value=args.value,
options=options, options=options,
@@ -181,7 +183,7 @@ def get_or_set_option(args: argparse.Namespace) -> None:
show_trace=args.show_trace, show_trace=args.show_trace,
) )
if not args.quiet: if not args.quiet:
new_value = read_machine_option_value(args.machine, args.option) new_value = read_machine_option_value(args.flake, args.machine, args.option)
print(f"New Value for {args.option}:") print(f"New Value for {args.option}:")
print(new_value) print(new_value)
@@ -238,6 +240,7 @@ def find_option(
def set_option( def set_option(
flake_name: str,
option: str, option: str,
value: Any, value: Any,
options: dict, options: dict,
@@ -286,7 +289,7 @@ def set_option(
json.dump(new_config, f, indent=2) json.dump(new_config, f, indent=2)
print(file=f) # add newline at the end of the file to make git happy print(file=f) # add newline at the end of the file to make git happy
if settings_file.resolve().is_relative_to(get_clan_flake_toplevel()): if settings_file.resolve().is_relative_to(specific_flake_dir(flake_name)):
commit_file(settings_file, commit_message=f"Set option {option_description}") commit_file(settings_file, commit_message=f"Set option {option_description}")

View File

@@ -7,9 +7,9 @@ from pathlib import Path
from fastapi import HTTPException from fastapi import HTTPException
from clan_cli.dirs import ( from clan_cli.dirs import (
get_flake_path,
machine_settings_file, machine_settings_file,
nixpkgs_source, nixpkgs_source,
specific_flake_dir,
specific_machine_dir, specific_machine_dir,
) )
from clan_cli.git import commit_file, find_git_repo_root from clan_cli.git import commit_file, find_git_repo_root
@@ -84,7 +84,7 @@ def set_config_for_machine(flake_name: str, machine_name: str, config: dict) ->
def schema_for_machine(flake_name: str, machine_name: str) -> dict: def schema_for_machine(flake_name: str, machine_name: str) -> dict:
flake = get_flake_path(flake_name) flake = specific_flake_dir(flake_name)
# use nix eval to lib.evalModules .#nixosModules.machine-{machine_name} # use nix eval to lib.evalModules .#nixosModules.machine-{machine_name}
proc = subprocess.run( proc = subprocess.run(

View File

@@ -6,7 +6,7 @@ from typing import Optional
from .errors import ClanError from .errors import ClanError
def get_clan_flake_toplevel() -> Path: def _get_clan_flake_toplevel() -> Path:
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"]) return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
@@ -61,22 +61,22 @@ def clan_config_dir() -> Path:
return path.resolve() return path.resolve()
def clan_flake_dir() -> Path: def clan_flakes_dir() -> Path:
path = clan_data_dir() / "flake" path = clan_data_dir() / "flake"
if not path.exists(): if not path.exists():
path.mkdir() path.mkdir()
return path.resolve() return path.resolve()
def get_flake_path(name: str) -> Path: def specific_flake_dir(name: str) -> Path:
flake_dir = clan_flake_dir() / name flake_dir = clan_flakes_dir() / name
if not flake_dir.exists(): if not flake_dir.exists():
raise ClanError(f"Flake {name} does not exist") raise ClanError(f"Flake {name} does not exist")
return flake_dir return flake_dir
def machines_dir(flake_name: str) -> Path: def machines_dir(flake_name: str) -> Path:
return get_flake_path(flake_name) / "machines" return specific_flake_dir(flake_name) / "machines"
def specific_machine_dir(flake_name: str, machine: str) -> Path: def specific_machine_dir(flake_name: str, machine: str) -> Path:

View File

@@ -7,7 +7,7 @@ from pydantic import AnyUrl
from pydantic.tools import parse_obj_as from pydantic.tools import parse_obj_as
from ..async_cmd import CmdOut, run, runforcli from ..async_cmd import CmdOut, run, runforcli
from ..dirs import clan_flake_dir from ..dirs import clan_flakes_dir
from ..nix import nix_command, nix_shell from ..nix import nix_command, nix_shell
DEFAULT_URL: AnyUrl = parse_obj_as( DEFAULT_URL: AnyUrl = parse_obj_as(
@@ -54,7 +54,7 @@ async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]:
def create_flake_command(args: argparse.Namespace) -> None: def create_flake_command(args: argparse.Namespace) -> None:
flake_dir = clan_flake_dir() / args.name flake_dir = clan_flakes_dir() / args.name
runforcli(create_flake, flake_dir, DEFAULT_URL) runforcli(create_flake, flake_dir, DEFAULT_URL)

View File

@@ -2,13 +2,13 @@ import argparse
import logging import logging
import os import os
from ..dirs import clan_flake_dir from ..dirs import clan_flakes_dir
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def list_flakes() -> list[str]: def list_flakes() -> list[str]:
path = clan_flake_dir() path = clan_flakes_dir()
log.debug(f"Listing machines in {path}") log.debug(f"Listing machines in {path}")
if not path.exists(): if not path.exists():
return [] return []

View File

@@ -3,7 +3,7 @@ import logging
from typing import Dict from typing import Dict
from ..async_cmd import CmdOut, run, runforcli from ..async_cmd import CmdOut, run, runforcli
from ..dirs import get_flake_path, specific_machine_dir from ..dirs import specific_flake_dir, specific_machine_dir
from ..errors import ClanError from ..errors import ClanError
from ..nix import nix_shell from ..nix import nix_shell
@@ -35,7 +35,7 @@ async def create_machine(flake_name: str, machine_name: str) -> Dict[str, CmdOut
def create_command(args: argparse.Namespace) -> None: def create_command(args: argparse.Namespace) -> None:
try: try:
flake_dir = get_flake_path(args.flake) flake_dir = specific_flake_dir(args.flake)
runforcli(create_machine, flake_dir, args.machine) runforcli(create_machine, flake_dir, args.machine)
except ClanError as e: except ClanError as e:
print(e) print(e)

View File

@@ -3,7 +3,7 @@ import subprocess
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from ..dirs import get_flake_path from ..dirs import specific_flake_dir
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_shell from ..nix import nix_shell
from ..secrets.generate import generate_secrets from ..secrets.generate import generate_secrets
@@ -40,7 +40,7 @@ def install_nixos(machine: Machine) -> None:
def install_command(args: argparse.Namespace) -> None: def install_command(args: argparse.Namespace) -> None:
machine = Machine(args.machine, flake_dir=get_flake_path(args.flake)) machine = Machine(args.machine, flake_dir=specific_flake_dir(args.flake))
machine.deployment_address = args.target_host machine.deployment_address = args.target_host
install_nixos(machine) install_nixos(machine)

View File

@@ -5,7 +5,6 @@ import sys
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from ..dirs import get_clan_flake_toplevel
from ..nix import nix_build, nix_config, nix_eval from ..nix import nix_build, nix_config, nix_eval
from ..ssh import Host, parse_deployment_address from ..ssh import Host, parse_deployment_address
@@ -31,7 +30,7 @@ class Machine:
def __init__( def __init__(
self, self,
name: str, name: str,
flake_dir: Optional[Path] = None, flake_dir: Path,
machine_data: Optional[dict] = None, machine_data: Optional[dict] = None,
) -> None: ) -> None:
""" """
@@ -41,9 +40,6 @@ class Machine:
@machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data @machine_json: can be optionally used to skip evaluation of the machine, location of the json file with machine data
""" """
self.name = name self.name = name
if flake_dir is None:
self.flake_dir = get_clan_flake_toplevel()
else:
self.flake_dir = flake_dir self.flake_dir = flake_dir
if machine_data is None: if machine_data is None:

View File

@@ -4,7 +4,7 @@ import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from ..dirs import get_flake_path from ..dirs import specific_flake_dir
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_build, nix_command, nix_config from ..nix import nix_build, nix_command, nix_config
from ..secrets.generate import generate_secrets from ..secrets.generate import generate_secrets
@@ -95,7 +95,11 @@ def get_all_machines(clan_dir: Path) -> HostGroup:
host = parse_deployment_address( host = parse_deployment_address(
name, name,
machine_data["deploymentAddress"], machine_data["deploymentAddress"],
meta={"machine": Machine(name=name, machine_data=machine_data)}, meta={
"machine": Machine(
name=name, flake_dir=clan_dir, machine_data=machine_data
)
},
) )
hosts.append(host) hosts.append(host)
return HostGroup(hosts) return HostGroup(hosts)
@@ -111,7 +115,7 @@ def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGrou
# FIXME: we want some kind of inventory here. # FIXME: we want some kind of inventory here.
def update(args: argparse.Namespace) -> None: def update(args: argparse.Namespace) -> None:
flake_dir = get_flake_path(args.flake) flake_dir = specific_flake_dir(args.flake)
if len(args.machines) == 1 and args.target_host is not None: if len(args.machines) == 1 and args.target_host is not None:
machine = Machine(name=args.machines[0], flake_dir=flake_dir) machine = Machine(name=args.machines[0], flake_dir=flake_dir)
machine.deployment_address = args.target_host machine.deployment_address = args.target_host

View File

@@ -3,17 +3,17 @@ import shutil
from pathlib import Path from pathlib import Path
from typing import Callable from typing import Callable
from ..dirs import get_clan_flake_toplevel from ..dirs import specific_flake_dir
from ..errors import ClanError from ..errors import ClanError
def get_sops_folder() -> Path: def get_sops_folder(flake_name: str) -> Path:
return get_clan_flake_toplevel() / "sops" return specific_flake_dir(flake_name) / "sops"
def gen_sops_subfolder(subdir: str) -> Callable[[], Path]: def gen_sops_subfolder(subdir: str) -> Callable[[str], Path]:
def folder() -> Path: def folder(flake_name: str) -> Path:
return get_clan_flake_toplevel() / "sops" / subdir return specific_flake_dir(flake_name) / "sops" / subdir
return folder return folder

View File

@@ -6,6 +6,7 @@ import sys
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from ..dirs import specific_flake_dir
from ..machines.machines import Machine from ..machines.machines import Machine
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -29,7 +30,7 @@ def generate_secrets(machine: Machine) -> None:
def generate_command(args: argparse.Namespace) -> None: def generate_command(args: argparse.Namespace) -> None:
machine = Machine(args.machine) machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
generate_secrets(machine) generate_secrets(machine)
@@ -38,4 +39,9 @@ def register_generate_parser(parser: argparse.ArgumentParser) -> None:
"machine", "machine",
help="The machine to generate secrets for", help="The machine to generate secrets for",
) )
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=generate_command) parser.set_defaults(func=generate_command)

View File

@@ -20,24 +20,27 @@ from .types import (
) )
def machines_folder(group: str) -> Path: def machines_folder(flake_name: str, group: str) -> Path:
return sops_groups_folder() / group / "machines" return sops_groups_folder(flake_name) / group / "machines"
def users_folder(group: str) -> Path: def users_folder(flake_name: str, group: str) -> Path:
return sops_groups_folder() / group / "users" return sops_groups_folder(flake_name) / group / "users"
class Group: class Group:
def __init__(self, name: str, machines: list[str], users: list[str]) -> None: def __init__(
self, flake_name: str, name: str, machines: list[str], users: list[str]
) -> None:
self.name = name self.name = name
self.machines = machines self.machines = machines
self.users = users self.users = users
self.flake_name = flake_name
def list_groups() -> list[Group]: def list_groups(flake_name: str) -> list[Group]:
groups: list[Group] = [] groups: list[Group] = []
folder = sops_groups_folder() folder = sops_groups_folder(flake_name)
if not folder.exists(): if not folder.exists():
return groups return groups
@@ -45,24 +48,24 @@ def list_groups() -> list[Group]:
group_folder = folder / name group_folder = folder / name
if not group_folder.is_dir(): if not group_folder.is_dir():
continue continue
machines_path = machines_folder(name) machines_path = machines_folder(flake_name, name)
machines = [] machines = []
if machines_path.is_dir(): if machines_path.is_dir():
for f in machines_path.iterdir(): for f in machines_path.iterdir():
if validate_hostname(f.name): if validate_hostname(f.name):
machines.append(f.name) machines.append(f.name)
users_path = users_folder(name) users_path = users_folder(flake_name, name)
users = [] users = []
if users_path.is_dir(): if users_path.is_dir():
for f in users_path.iterdir(): for f in users_path.iterdir():
if VALID_USER_NAME.match(f.name): if VALID_USER_NAME.match(f.name):
users.append(f.name) users.append(f.name)
groups.append(Group(name, machines, users)) groups.append(Group(flake_name, name, machines, users))
return groups return groups
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
for group in list_groups(): for group in list_groups(args.flake):
print(group.name) print(group.name)
if group.machines: if group.machines:
print("machines:") print("machines:")
@@ -84,9 +87,9 @@ def list_directory(directory: Path) -> str:
return msg return msg
def update_group_keys(group: str) -> None: def update_group_keys(flake_name: str, group: str) -> None:
for secret_ in secrets.list_secrets(): for secret_ in secrets.list_secrets(flake_name):
secret = sops_secrets_folder() / secret_ secret = sops_secrets_folder(flake_name) / secret_
if (secret / "groups" / group).is_symlink(): if (secret / "groups" / group).is_symlink():
update_keys( update_keys(
secret, secret,
@@ -94,7 +97,9 @@ def update_group_keys(group: str) -> None:
) )
def add_member(group_folder: Path, source_folder: Path, name: str) -> None: def add_member(
flake_name: str, group_folder: Path, source_folder: Path, name: str
) -> None:
source = source_folder / name source = source_folder / name
if not source.exists(): if not source.exists():
msg = f"{name} does not exist in {source_folder}: " msg = f"{name} does not exist in {source_folder}: "
@@ -109,10 +114,10 @@ def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
) )
os.remove(user_target) os.remove(user_target)
user_target.symlink_to(os.path.relpath(source, user_target.parent)) user_target.symlink_to(os.path.relpath(source, user_target.parent))
update_group_keys(group_folder.parent.name) update_group_keys(flake_name, group_folder.parent.name)
def remove_member(group_folder: Path, name: str) -> None: def remove_member(flake_name: str, group_folder: Path, name: str) -> None:
target = group_folder / name target = group_folder / name
if not target.exists(): if not target.exists():
msg = f"{name} does not exist in group in {group_folder}: " msg = f"{name} does not exist in group in {group_folder}: "
@@ -121,7 +126,7 @@ def remove_member(group_folder: Path, name: str) -> None:
os.remove(target) os.remove(target)
if len(os.listdir(group_folder)) > 0: if len(os.listdir(group_folder)) > 0:
update_group_keys(group_folder.parent.name) update_group_keys(flake_name, group_folder.parent.name)
if len(os.listdir(group_folder)) == 0: if len(os.listdir(group_folder)) == 0:
os.rmdir(group_folder) os.rmdir(group_folder)
@@ -130,56 +135,65 @@ def remove_member(group_folder: Path, name: str) -> None:
os.rmdir(group_folder.parent) os.rmdir(group_folder.parent)
def add_user(group: str, name: str) -> None: def add_user(flake_name: str, group: str, name: str) -> None:
add_member(users_folder(group), sops_users_folder(), name) add_member(
flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name
)
def add_user_command(args: argparse.Namespace) -> None: def add_user_command(args: argparse.Namespace) -> None:
add_user(args.group, args.user) add_user(args.flake, args.group, args.user)
def remove_user(group: str, name: str) -> None: def remove_user(flake_name: str, group: str, name: str) -> None:
remove_member(users_folder(group), name) remove_member(flake_name, users_folder(flake_name, group), name)
def remove_user_command(args: argparse.Namespace) -> None: def remove_user_command(args: argparse.Namespace) -> None:
remove_user(args.group, args.user) remove_user(args.flake, args.group, args.user)
def add_machine(group: str, name: str) -> None: def add_machine(flake_name: str, group: str, name: str) -> None:
add_member(machines_folder(group), sops_machines_folder(), name) add_member(
flake_name,
machines_folder(flake_name, group),
sops_machines_folder(flake_name),
name,
)
def add_machine_command(args: argparse.Namespace) -> None: def add_machine_command(args: argparse.Namespace) -> None:
add_machine(args.group, args.machine) add_machine(args.flake, args.group, args.machine)
def remove_machine(group: str, name: str) -> None: def remove_machine(flake_name: str, group: str, name: str) -> None:
remove_member(machines_folder(group), name) remove_member(flake_name, machines_folder(flake_name, group), name)
def remove_machine_command(args: argparse.Namespace) -> None: def remove_machine_command(args: argparse.Namespace) -> None:
remove_machine(args.group, args.machine) remove_machine(args.flake, args.group, args.machine)
def add_group_argument(parser: argparse.ArgumentParser) -> None: def add_group_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("group", help="the name of the secret", type=group_name_type) parser.add_argument("group", help="the name of the secret", type=group_name_type)
def add_secret(group: str, name: str) -> None: def add_secret(flake_name: str, group: str, name: str) -> None:
secrets.allow_member(secrets.groups_folder(name), sops_groups_folder(), group) secrets.allow_member(
secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group
)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.group, args.secret) add_secret(args.flake, args.group, args.secret)
def remove_secret(group: str, name: str) -> None: def remove_secret(flake_name: str, group: str, name: str) -> None:
secrets.disallow_member(secrets.groups_folder(name), group) secrets.disallow_member(secrets.groups_folder(flake_name, name), group)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.group, args.secret) remove_secret(args.flake, args.group, args.secret)
def register_groups_parser(parser: argparse.ArgumentParser) -> None: def register_groups_parser(parser: argparse.ArgumentParser) -> None:

View File

@@ -36,14 +36,15 @@ def import_sops(args: argparse.Namespace) -> None:
file=sys.stderr, file=sys.stderr,
) )
continue continue
if (sops_secrets_folder() / k / "secret").exists(): if (sops_secrets_folder(args.flake) / k / "secret").exists():
print( print(
f"WARNING: {k} already exists, skipping", f"WARNING: {k} already exists, skipping",
file=sys.stderr, file=sys.stderr,
) )
continue continue
encrypt_secret( encrypt_secret(
sops_secrets_folder() / k, args.flake,
sops_secrets_folder(args.flake) / k,
v, v,
add_groups=args.group, add_groups=args.group,
add_machines=args.machine, add_machines=args.machine,

View File

@@ -7,65 +7,67 @@ from .sops import read_key, write_key
from .types import public_or_private_age_key_type, secret_name_type from .types import public_or_private_age_key_type, secret_name_type
def add_machine(name: str, key: str, force: bool) -> None: def add_machine(flake_name: str, name: str, key: str, force: bool) -> None:
write_key(sops_machines_folder() / name, key, force) write_key(sops_machines_folder(flake_name) / name, key, force)
def remove_machine(name: str) -> None: def remove_machine(flake_name: str, name: str) -> None:
remove_object(sops_machines_folder(), name) remove_object(sops_machines_folder(flake_name), name)
def get_machine(name: str) -> str: def get_machine(flake_name: str, name: str) -> str:
return read_key(sops_machines_folder() / name) return read_key(sops_machines_folder(flake_name) / name)
def has_machine(name: str) -> bool: def has_machine(flake_name: str, name: str) -> bool:
return (sops_machines_folder() / name / "key.json").exists() return (sops_machines_folder(flake_name) / name / "key.json").exists()
def list_machines() -> list[str]: def list_machines(flake_name: str) -> list[str]:
path = sops_machines_folder() path = sops_machines_folder(flake_name)
def validate(name: str) -> bool: def validate(name: str) -> bool:
return validate_hostname(name) and has_machine(name) return validate_hostname(name) and has_machine(flake_name, name)
return list_objects(path, validate) return list_objects(path, validate)
def add_secret(machine: str, secret: str) -> None: def add_secret(flake_name: str, machine: str, secret: str) -> None:
secrets.allow_member( secrets.allow_member(
secrets.machines_folder(secret), sops_machines_folder(), machine secrets.machines_folder(flake_name, secret),
sops_machines_folder(flake_name),
machine,
) )
def remove_secret(machine: str, secret: str) -> None: def remove_secret(flake_name: str, machine: str, secret: str) -> None:
secrets.disallow_member(secrets.machines_folder(secret), machine) secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine)
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
lst = list_machines() lst = list_machines(args.flake)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None: def add_command(args: argparse.Namespace) -> None:
add_machine(args.machine, args.key, args.force) add_machine(args.flake, args.machine, args.key, args.force)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
print(get_machine(args.machine)) print(get_machine(args.flake, args.machine))
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
remove_machine(args.machine) remove_machine(args.flake, args.machine)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.machine, args.secret) add_secret(args.flake, args.machine, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.machine, args.secret) remove_secret(args.flake, args.machine, args.secret)
def register_machines_parser(parser: argparse.ArgumentParser) -> None: def register_machines_parser(parser: argparse.ArgumentParser) -> None:
@@ -75,9 +77,16 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the command to run", help="the command to run",
required=True, required=True,
) )
# Parser
list_parser = subparser.add_parser("list", help="list machines") list_parser = subparser.add_parser("list", help="list machines")
list_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
list_parser.set_defaults(func=list_command) list_parser.set_defaults(func=list_command)
# Parser
add_parser = subparser.add_parser("add", help="add a machine") add_parser = subparser.add_parser("add", help="add a machine")
add_parser.add_argument( add_parser.add_argument(
"-f", "-f",
@@ -86,6 +95,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
action="store_true", action="store_true",
default=False, default=False,
) )
add_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_parser.add_argument( add_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type "machine", help="the name of the machine", type=machine_name_type
) )
@@ -96,21 +110,39 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
) )
add_parser.set_defaults(func=add_command) add_parser.set_defaults(func=add_command)
# Parser
get_parser = subparser.add_parser("get", help="get a machine public key") get_parser = subparser.add_parser("get", help="get a machine public key")
get_parser.add_argument( get_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type "machine", help="the name of the machine", type=machine_name_type
) )
get_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
get_parser.set_defaults(func=get_command) get_parser.set_defaults(func=get_command)
# Parser
remove_parser = subparser.add_parser("remove", help="remove a machine") remove_parser = subparser.add_parser("remove", help="remove a machine")
remove_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_parser.add_argument( remove_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type "machine", help="the name of the machine", type=machine_name_type
) )
remove_parser.set_defaults(func=remove_command) remove_parser.set_defaults(func=remove_command)
# Parser
add_secret_parser = subparser.add_parser( add_secret_parser = subparser.add_parser(
"add-secret", help="allow a machine to access a secret" "add-secret", help="allow a machine to access a secret"
) )
add_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser.add_argument( add_secret_parser.add_argument(
"machine", help="the name of the machine", type=machine_name_type "machine", help="the name of the machine", type=machine_name_type
) )
@@ -119,9 +151,15 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
) )
add_secret_parser.set_defaults(func=add_secret_command) add_secret_parser.set_defaults(func=add_secret_command)
# Parser
remove_secret_parser = subparser.add_parser( remove_secret_parser = subparser.add_parser(
"remove-secret", help="remove a group's access to a secret" "remove-secret", help="remove a group's access to a secret"
) )
remove_secret_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_secret_parser.add_argument( remove_secret_parser.add_argument(
"machine", help="the name of the group", type=machine_name_type "machine", help="the name of the group", type=machine_name_type
) )

View File

@@ -53,62 +53,79 @@ def collect_keys_for_path(path: Path) -> set[str]:
def encrypt_secret( def encrypt_secret(
flake_name: str,
secret: Path, secret: Path,
value: IO[str] | str | None, value: IO[str] | str | None,
add_users: list[str] = [], add_users: list[str] = [],
add_machines: list[str] = [], add_machines: list[str] = [],
add_groups: list[str] = [], add_groups: list[str] = [],
) -> None: ) -> None:
key = ensure_sops_key() key = ensure_sops_key(flake_name)
keys = set([]) keys = set([])
for user in add_users: for user in add_users:
allow_member(users_folder(secret.name), sops_users_folder(), user, False) allow_member(
users_folder(flake_name, secret.name),
sops_users_folder(flake_name),
user,
False,
)
for machine in add_machines: for machine in add_machines:
allow_member( allow_member(
machines_folder(secret.name), sops_machines_folder(), machine, False machines_folder(flake_name, secret.name),
sops_machines_folder(flake_name),
machine,
False,
) )
for group in add_groups: for group in add_groups:
allow_member(groups_folder(secret.name), sops_groups_folder(), group, False) allow_member(
groups_folder(flake_name, secret.name),
sops_groups_folder(flake_name),
group,
False,
)
keys = collect_keys_for_path(secret) keys = collect_keys_for_path(secret)
if key.pubkey not in keys: if key.pubkey not in keys:
keys.add(key.pubkey) keys.add(key.pubkey)
allow_member( allow_member(
users_folder(secret.name), sops_users_folder(), key.username, False users_folder(flake_name, secret.name),
sops_users_folder(flake_name),
key.username,
False,
) )
encrypt_file(secret / "secret", value, list(sorted(keys))) encrypt_file(secret / "secret", value, list(sorted(keys)))
def remove_secret(secret: str) -> None: def remove_secret(flake_name: str, secret: str) -> None:
path = sops_secrets_folder() / secret path = sops_secrets_folder(flake_name) / secret
if not path.exists(): if not path.exists():
raise ClanError(f"Secret '{secret}' does not exist") raise ClanError(f"Secret '{secret}' does not exist")
shutil.rmtree(path) shutil.rmtree(path)
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
remove_secret(args.secret) remove_secret(args.flake, args.secret)
def add_secret_argument(parser: argparse.ArgumentParser) -> None: def add_secret_argument(parser: argparse.ArgumentParser) -> None:
parser.add_argument("secret", help="the name of the secret", type=secret_name_type) parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
def machines_folder(group: str) -> Path: def machines_folder(flake_name: str, group: str) -> Path:
return sops_secrets_folder() / group / "machines" return sops_secrets_folder(flake_name) / group / "machines"
def users_folder(group: str) -> Path: def users_folder(flake_name: str, group: str) -> Path:
return sops_secrets_folder() / group / "users" return sops_secrets_folder(flake_name) / group / "users"
def groups_folder(group: str) -> Path: def groups_folder(flake_name: str, group: str) -> Path:
return sops_secrets_folder() / group / "groups" return sops_secrets_folder(flake_name) / group / "groups"
def list_directory(directory: Path) -> str: def list_directory(directory: Path) -> str:
@@ -171,35 +188,37 @@ def disallow_member(group_folder: Path, name: str) -> None:
) )
def has_secret(secret: str) -> bool: def has_secret(flake_name: str, secret: str) -> bool:
return (sops_secrets_folder() / secret / "secret").exists() return (sops_secrets_folder(flake_name) / secret / "secret").exists()
def list_secrets() -> list[str]: def list_secrets(flake_name: str) -> list[str]:
path = sops_secrets_folder() path = sops_secrets_folder(flake_name)
def validate(name: str) -> bool: def validate(name: str) -> bool:
return VALID_SECRET_NAME.match(name) is not None and has_secret(name) return VALID_SECRET_NAME.match(name) is not None and has_secret(
flake_name, name
)
return list_objects(path, validate) return list_objects(path, validate)
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
lst = list_secrets() lst = list_secrets(args.flake)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))
def decrypt_secret(secret: str) -> str: def decrypt_secret(flake_name: str, secret: str) -> str:
ensure_sops_key() ensure_sops_key(flake_name)
secret_path = sops_secrets_folder() / secret / "secret" secret_path = sops_secrets_folder(flake_name) / secret / "secret"
if not secret_path.exists(): if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist") raise ClanError(f"Secret '{secret}' does not exist")
return decrypt_file(secret_path) return decrypt_file(secret_path)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
print(decrypt_secret(args.secret), end="") print(decrypt_secret(args.flake, args.secret), end="")
def set_command(args: argparse.Namespace) -> None: def set_command(args: argparse.Namespace) -> None:
@@ -212,7 +231,8 @@ def set_command(args: argparse.Namespace) -> None:
elif tty.is_interactive(): elif tty.is_interactive():
secret_value = getpass.getpass(prompt="Paste your secret: ") secret_value = getpass.getpass(prompt="Paste your secret: ")
encrypt_secret( encrypt_secret(
sops_secrets_folder() / args.secret, args.flake,
sops_secrets_folder(args.flake) / args.secret,
secret_value, secret_value,
args.user, args.user,
args.machine, args.machine,
@@ -221,8 +241,8 @@ def set_command(args: argparse.Namespace) -> None:
def rename_command(args: argparse.Namespace) -> None: def rename_command(args: argparse.Namespace) -> None:
old_path = sops_secrets_folder() / args.secret old_path = sops_secrets_folder(args.flake) / args.secret
new_path = sops_secrets_folder() / args.new_name new_path = sops_secrets_folder(args.flake) / args.new_name
if not old_path.exists(): if not old_path.exists():
raise ClanError(f"Secret '{args.secret}' does not exist") raise ClanError(f"Secret '{args.secret}' does not exist")
if new_path.exists(): if new_path.exists():
@@ -237,9 +257,19 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
parser_get = subparser.add_parser("get", help="get a secret") parser_get = subparser.add_parser("get", help="get a secret")
add_secret_argument(parser_get) add_secret_argument(parser_get)
parser_get.set_defaults(func=get_command) parser_get.set_defaults(func=get_command)
parser_get.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_set = subparser.add_parser("set", help="set a secret") parser_set = subparser.add_parser("set", help="set a secret")
add_secret_argument(parser_set) add_secret_argument(parser_set)
parser_set.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser_set.add_argument( parser_set.add_argument(
"--group", "--group",
type=str, type=str,

View File

@@ -51,7 +51,7 @@ def generate_private_key() -> tuple[str, str]:
raise ClanError("Failed to generate private sops key") from e raise ClanError("Failed to generate private sops key") from e
def get_user_name(user: str) -> str: def get_user_name(flake_name: str, user: str) -> str:
"""Ask the user for their name until a unique one is provided.""" """Ask the user for their name until a unique one is provided."""
while True: while True:
name = input( name = input(
@@ -59,14 +59,14 @@ def get_user_name(user: str) -> str:
) )
if name: if name:
user = name user = name
if not (sops_users_folder() / user).exists(): if not (sops_users_folder(flake_name) / user).exists():
return user return user
print(f"{sops_users_folder() / user} already exists") print(f"{sops_users_folder(flake_name) / user} already exists")
def ensure_user_or_machine(pub_key: str) -> SopsKey: def ensure_user_or_machine(flake_name: str, pub_key: str) -> SopsKey:
key = SopsKey(pub_key, username="") key = SopsKey(pub_key, username="")
folders = [sops_users_folder(), sops_machines_folder()] folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)]
for folder in folders: for folder in folders:
if folder.exists(): if folder.exists():
for user in folder.iterdir(): for user in folder.iterdir():
@@ -90,13 +90,13 @@ def default_sops_key_path() -> Path:
return user_config_dir() / "sops" / "age" / "keys.txt" return user_config_dir() / "sops" / "age" / "keys.txt"
def ensure_sops_key() -> SopsKey: def ensure_sops_key(flake_name: str) -> SopsKey:
key = os.environ.get("SOPS_AGE_KEY") key = os.environ.get("SOPS_AGE_KEY")
if key: if key:
return ensure_user_or_machine(get_public_key(key)) return ensure_user_or_machine(flake_name, get_public_key(key))
path = default_sops_key_path() path = default_sops_key_path()
if path.exists(): if path.exists():
return ensure_user_or_machine(get_public_key(path.read_text())) return ensure_user_or_machine(flake_name, get_public_key(path.read_text()))
else: else:
raise ClanError( raise ClanError(
"No sops key found. Please generate one with 'clan secrets key generate'." "No sops key found. Please generate one with 'clan secrets key generate'."

View File

@@ -9,7 +9,7 @@ from typing import Any
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from ..dirs import get_clan_flake_toplevel from ..dirs import specific_flake_dir
from ..errors import ClanError from ..errors import ClanError
from .folders import sops_secrets_folder from .folders import sops_secrets_folder
from .machines import add_machine, has_machine from .machines import add_machine, has_machine
@@ -17,21 +17,29 @@ from .secrets import decrypt_secret, encrypt_secret, has_secret
from .sops import generate_private_key from .sops import generate_private_key
def generate_host_key(machine_name: str) -> None: def generate_host_key(flake_name: str, machine_name: str) -> None:
if has_machine(machine_name): if has_machine(flake_name, machine_name):
return return
priv_key, pub_key = generate_private_key() priv_key, pub_key = generate_private_key()
encrypt_secret(sops_secrets_folder() / f"{machine_name}-age.key", priv_key) encrypt_secret(
add_machine(machine_name, pub_key, False) flake_name,
sops_secrets_folder(flake_name) / f"{machine_name}-age.key",
priv_key,
)
add_machine(flake_name, machine_name, pub_key, False)
def generate_secrets_group( def generate_secrets_group(
secret_group: str, machine_name: str, tempdir: Path, secret_options: dict[str, Any] flake_name: str,
secret_group: str,
machine_name: str,
tempdir: Path,
secret_options: dict[str, Any],
) -> None: ) -> None:
clan_dir = get_clan_flake_toplevel() clan_dir = specific_flake_dir(flake_name)
secrets = secret_options["secrets"] secrets = secret_options["secrets"]
needs_regeneration = any( needs_regeneration = any(
not has_secret(f"{machine_name}-{secret['name']}") not has_secret(flake_name, f"{machine_name}-{secret['name']}")
for secret in secrets.values() for secret in secrets.values()
) )
generator = secret_options["generator"] generator = secret_options["generator"]
@@ -62,7 +70,8 @@ export secrets={shlex.quote(str(secrets_dir))}
msg += text msg += text
raise ClanError(msg) raise ClanError(msg)
encrypt_secret( encrypt_secret(
sops_secrets_folder() / f"{machine_name}-{secret['name']}", flake_name,
sops_secrets_folder(flake_name) / f"{machine_name}-{secret['name']}",
secret_file.read_text(), secret_file.read_text(),
add_machines=[machine_name], add_machines=[machine_name],
) )
@@ -79,17 +88,18 @@ export secrets={shlex.quote(str(secrets_dir))}
# this is called by the sops.nix clan core module # this is called by the sops.nix clan core module
def generate_secrets_from_nix( def generate_secrets_from_nix(
flake_name: str,
machine_name: str, machine_name: str,
secret_submodules: dict[str, Any], secret_submodules: dict[str, Any],
) -> None: ) -> None:
generate_host_key(machine_name) generate_host_key(flake_name, machine_name)
errors = {} errors = {}
with TemporaryDirectory() as d: with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets # if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items(): for secret_group, secret_options in secret_submodules.items():
try: try:
generate_secrets_group( generate_secrets_group(
secret_group, machine_name, Path(d), secret_options flake_name, secret_group, machine_name, Path(d), secret_options
) )
except ClanError as e: except ClanError as e:
errors[secret_group] = e errors[secret_group] = e
@@ -102,12 +112,15 @@ def generate_secrets_from_nix(
# this is called by the sops.nix clan core module # this is called by the sops.nix clan core module
def upload_age_key_from_nix( def upload_age_key_from_nix(
flake_name: str,
machine_name: str, machine_name: str,
) -> None: ) -> None:
secret_name = f"{machine_name}-age.key" secret_name = f"{machine_name}-age.key"
if not has_secret(secret_name): # skip uploading the secret, not managed by us if not has_secret(
flake_name, secret_name
): # skip uploading the secret, not managed by us
return return
secret = decrypt_secret(secret_name) secret = decrypt_secret(flake_name, secret_name)
secrets_dir = Path(os.environ["SECRETS_DIR"]) secrets_dir = Path(os.environ["SECRETS_DIR"])
(secrets_dir / "key.txt").write_text(secret) (secrets_dir / "key.txt").write_text(secret)

View File

@@ -4,6 +4,7 @@ import subprocess
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from ..dirs import specific_flake_dir
from ..machines.machines import Machine from ..machines.machines import Machine
from ..nix import nix_shell from ..nix import nix_shell
@@ -37,7 +38,7 @@ def upload_secrets(machine: Machine) -> None:
def upload_command(args: argparse.Namespace) -> None: def upload_command(args: argparse.Namespace) -> None:
machine = Machine(args.machine) machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
upload_secrets(machine) upload_secrets(machine)
@@ -46,4 +47,9 @@ def register_upload_parser(parser: argparse.ArgumentParser) -> None:
"machine", "machine",
help="The machine to upload secrets to", help="The machine to upload secrets to",
) )
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=upload_command) parser.set_defaults(func=upload_command)

View File

@@ -11,20 +11,20 @@ from .types import (
) )
def add_user(name: str, key: str, force: bool) -> None: def add_user(flake_name: str, name: str, key: str, force: bool) -> None:
write_key(sops_users_folder() / name, key, force) write_key(sops_users_folder(flake_name) / name, key, force)
def remove_user(name: str) -> None: def remove_user(flake_name: str, name: str) -> None:
remove_object(sops_users_folder(), name) remove_object(sops_users_folder(flake_name), name)
def get_user(name: str) -> str: def get_user(flake_name: str, name: str) -> str:
return read_key(sops_users_folder() / name) return read_key(sops_users_folder(flake_name) / name)
def list_users() -> list[str]: def list_users(flake_name: str) -> list[str]:
path = sops_users_folder() path = sops_users_folder(flake_name)
def validate(name: str) -> bool: def validate(name: str) -> bool:
return ( return (
@@ -35,38 +35,40 @@ def list_users() -> list[str]:
return list_objects(path, validate) return list_objects(path, validate)
def add_secret(user: str, secret: str) -> None: def add_secret(flake_name: str, user: str, secret: str) -> None:
secrets.allow_member(secrets.users_folder(secret), sops_users_folder(), user) secrets.allow_member(
secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user
)
def remove_secret(user: str, secret: str) -> None: def remove_secret(flake_name: str, user: str, secret: str) -> None:
secrets.disallow_member(secrets.users_folder(secret), user) secrets.disallow_member(secrets.users_folder(flake_name, secret), user)
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
lst = list_users() lst = list_users(args.flake)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))
def add_command(args: argparse.Namespace) -> None: def add_command(args: argparse.Namespace) -> None:
add_user(args.user, args.key, args.force) add_user(args.flake, args.user, args.key, args.force)
def get_command(args: argparse.Namespace) -> None: def get_command(args: argparse.Namespace) -> None:
print(get_user(args.user)) print(get_user(args.flake, args.user))
def remove_command(args: argparse.Namespace) -> None: def remove_command(args: argparse.Namespace) -> None:
remove_user(args.user) remove_user(args.flake, args.user)
def add_secret_command(args: argparse.Namespace) -> None: def add_secret_command(args: argparse.Namespace) -> None:
add_secret(args.user, args.secret) add_secret(args.flake, args.user, args.secret)
def remove_secret_command(args: argparse.Namespace) -> None: def remove_secret_command(args: argparse.Namespace) -> None:
remove_secret(args.user, args.secret) remove_secret(args.flake, args.user, args.secret)
def register_users_parser(parser: argparse.ArgumentParser) -> None: def register_users_parser(parser: argparse.ArgumentParser) -> None:
@@ -77,6 +79,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
required=True, required=True,
) )
list_parser = subparser.add_parser("list", help="list users") list_parser = subparser.add_parser("list", help="list users")
list_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
list_parser.set_defaults(func=list_command) list_parser.set_defaults(func=list_command)
add_parser = subparser.add_parser("add", help="add a user") add_parser = subparser.add_parser("add", help="add a user")
@@ -90,14 +97,29 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
type=public_or_private_age_key_type, type=public_or_private_age_key_type,
) )
add_parser.set_defaults(func=add_command) add_parser.set_defaults(func=add_command)
add_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
get_parser = subparser.add_parser("get", help="get a user public key") get_parser = subparser.add_parser("get", help="get a user public key")
get_parser.add_argument("user", help="the name of the user", type=user_name_type) get_parser.add_argument("user", help="the name of the user", type=user_name_type)
get_parser.set_defaults(func=get_command) get_parser.set_defaults(func=get_command)
get_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
remove_parser = subparser.add_parser("remove", help="remove a user") remove_parser = subparser.add_parser("remove", help="remove a user")
remove_parser.add_argument("user", help="the name of the user", type=user_name_type) remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
remove_parser.set_defaults(func=remove_command) remove_parser.set_defaults(func=remove_command)
remove_parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
add_secret_parser = subparser.add_parser( add_secret_parser = subparser.add_parser(
"add-secret", help="allow a user to access a secret" "add-secret", help="allow a user to access a secret"

View File

@@ -9,7 +9,7 @@ from pathlib import Path
from typing import Iterator from typing import Iterator
from uuid import UUID from uuid import UUID
from ..dirs import get_clan_flake_toplevel from ..dirs import specific_flake_dir
from ..nix import nix_build, nix_config, nix_shell from ..nix import nix_build, nix_config, nix_shell
from ..task_manager import BaseTask, Command, create_task from ..task_manager import BaseTask, Command, create_task
from .inspect import VmConfig, inspect_vm from .inspect import VmConfig, inspect_vm
@@ -147,7 +147,7 @@ def create_vm(vm: VmConfig) -> BuildVmTask:
def create_command(args: argparse.Namespace) -> None: def create_command(args: argparse.Namespace) -> None:
clan_dir = get_clan_flake_toplevel() clan_dir = specific_flake_dir(args.flake)
vm = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine)) vm = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine))
task = create_vm(vm) task = create_vm(vm)
@@ -157,4 +157,9 @@ def create_command(args: argparse.Namespace) -> None:
def register_create_parser(parser: argparse.ArgumentParser) -> None: def register_create_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("machine", type=str) parser.add_argument("machine", type=str)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=create_command) parser.set_defaults(func=create_command)

View File

@@ -6,7 +6,7 @@ from pathlib import Path
from pydantic import AnyUrl, BaseModel from pydantic import AnyUrl, BaseModel
from ..async_cmd import run from ..async_cmd import run
from ..dirs import get_clan_flake_toplevel from ..dirs import specific_flake_dir
from ..nix import nix_config, nix_eval from ..nix import nix_config, nix_eval
@@ -33,7 +33,7 @@ async def inspect_vm(flake_url: AnyUrl | Path, flake_attr: str) -> VmConfig:
def inspect_command(args: argparse.Namespace) -> None: def inspect_command(args: argparse.Namespace) -> None:
clan_dir = get_clan_flake_toplevel() clan_dir = specific_flake_dir(args.flake)
res = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine)) res = asyncio.run(inspect_vm(flake_url=clan_dir, flake_attr=args.machine))
print("Cores:", res.cores) print("Cores:", res.cores)
print("Memory size:", res.memory_size) print("Memory size:", res.memory_size)
@@ -42,4 +42,9 @@ def inspect_command(args: argparse.Namespace) -> None:
def register_inspect_parser(parser: argparse.ArgumentParser) -> None: def register_inspect_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("machine", type=str) parser.add_argument("machine", type=str)
parser.add_argument(
"flake",
type=str,
help="name of the flake to create machine for",
)
parser.set_defaults(func=inspect_command) parser.set_defaults(func=inspect_command)

View File

@@ -4,7 +4,7 @@ from typing import Any
from pydantic import AnyUrl, BaseModel, validator from pydantic import AnyUrl, BaseModel, validator
from ..dirs import clan_data_dir, clan_flake_dir from ..dirs import clan_data_dir, clan_flakes_dir
from ..flakes.create import DEFAULT_URL from ..flakes.create import DEFAULT_URL
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ class ClanFlakePath(BaseModel):
@validator("dest") @validator("dest")
def check_dest(cls: Any, v: Path) -> Path: # noqa def check_dest(cls: Any, v: Path) -> Path: # noqa
return validate_path(clan_flake_dir(), v) return validate_path(clan_flakes_dir(), v)
class FlakeCreateInput(ClanFlakePath): class FlakeCreateInput(ClanFlakePath):

View File

@@ -2,7 +2,7 @@ import fileinput
import shutil import shutil
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Iterator, NamedTuple
import pytest import pytest
from root import CLAN_CORE from root import CLAN_CORE
@@ -27,22 +27,27 @@ def substitute(
print(line, end="") print(line, end="")
class TestFlake(NamedTuple):
name: str
path: Path
def create_flake( def create_flake(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
name: str, flake_name: str,
clan_core_flake: Path | None = None, clan_core_flake: Path | None = None,
machines: list[str] = [], machines: list[str] = [],
remote: bool = False, remote: bool = False,
) -> Iterator[Path]: ) -> Iterator[TestFlake]:
""" """
Creates a flake with the given name and machines. Creates a flake with the given name and machines.
The machine names map to the machines in ./test_machines The machine names map to the machines in ./test_machines
""" """
template = Path(__file__).parent / name template = Path(__file__).parent / flake_name
# copy the template to a new temporary location # copy the template to a new temporary location
with tempfile.TemporaryDirectory() as tmpdir_: with tempfile.TemporaryDirectory() as tmpdir_:
home = Path(tmpdir_) home = Path(tmpdir_)
flake = home / name flake = home / flake_name
shutil.copytree(template, flake) shutil.copytree(template, flake)
# lookup the requested machines in ./test_machines and include them # lookup the requested machines in ./test_machines and include them
if machines: if machines:
@@ -60,20 +65,20 @@ def create_flake(
with tempfile.TemporaryDirectory() as workdir: with tempfile.TemporaryDirectory() as workdir:
monkeypatch.chdir(workdir) monkeypatch.chdir(workdir)
monkeypatch.setenv("HOME", str(home)) monkeypatch.setenv("HOME", str(home))
yield flake yield TestFlake(flake_name, flake)
else: else:
monkeypatch.chdir(flake) monkeypatch.chdir(flake)
monkeypatch.setenv("HOME", str(home)) monkeypatch.setenv("HOME", str(home))
yield flake yield TestFlake(flake_name, flake)
@pytest.fixture @pytest.fixture
def test_flake(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: def test_flake(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]:
yield from create_flake(monkeypatch, "test_flake") yield from create_flake(monkeypatch, "test_flake")
@pytest.fixture @pytest.fixture
def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( raise Exception(
"clan-core flake not found. This test requires the clan-core flake to be present" "clan-core flake not found. This test requires the clan-core flake to be present"
@@ -82,7 +87,9 @@ def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
@pytest.fixture @pytest.fixture
def test_flake_with_core_and_pass(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: def test_flake_with_core_and_pass(
monkeypatch: pytest.MonkeyPatch,
) -> Iterator[TestFlake]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( raise Exception(
"clan-core flake not found. This test requires the clan-core flake to be present" "clan-core flake not found. This test requires the clan-core flake to be present"

View File

@@ -2,7 +2,7 @@ from pathlib import Path
import pytest import pytest
from clan_cli.dirs import get_clan_flake_toplevel from clan_cli.dirs import _get_clan_flake_toplevel
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -11,12 +11,12 @@ def test_get_clan_flake_toplevel(
) -> None: ) -> None:
monkeypatch.chdir(temporary_dir) monkeypatch.chdir(temporary_dir)
with pytest.raises(ClanError): with pytest.raises(ClanError):
print(get_clan_flake_toplevel()) print(_get_clan_flake_toplevel())
(temporary_dir / ".git").touch() (temporary_dir / ".git").touch()
assert get_clan_flake_toplevel() == temporary_dir assert _get_clan_flake_toplevel() == temporary_dir
subdir = temporary_dir / "subdir" subdir = temporary_dir / "subdir"
subdir.mkdir() subdir.mkdir()
monkeypatch.chdir(subdir) monkeypatch.chdir(subdir)
(subdir / ".clan-flake").touch() (subdir / ".clan-flake").touch()
assert get_clan_flake_toplevel() == subdir assert _get_clan_flake_toplevel() == subdir

View File

@@ -1,8 +1,8 @@
from pathlib import Path from fixtures_flakes import TestFlake
from clan_cli.config import machine from clan_cli.config import machine
def test_schema_for_machine(test_flake: Path) -> None: def test_schema_for_machine(test_flake: TestFlake) -> None:
schema = machine.schema_for_machine("machine1", flake=test_flake) schema = machine.schema_for_machine(test_flake.name, "machine1")
assert "properties" in schema assert "properties" in schema

View File

@@ -1,8 +1,8 @@
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from cli import Cli from cli import Cli
from fixtures_flakes import TestFlake
from clan_cli.machines.facts import machine_get_fact from clan_cli.machines.facts import machine_get_fact
from clan_cli.secrets.folders import sops_secrets_folder from clan_cli.secrets.folders import sops_secrets_folder
@@ -15,21 +15,27 @@ if TYPE_CHECKING:
@pytest.mark.impure @pytest.mark.impure
def test_generate_secret( def test_generate_secret(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: Path, test_flake_with_core: TestFlake,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
) -> None: ) -> None:
monkeypatch.chdir(test_flake_with_core) monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli() cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey]) cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1"])
has_secret("vm1-age.key") has_secret(test_flake_with_core.name, "vm1-age.key")
has_secret("vm1-zerotier-identity-secret") has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
network_id = machine_get_fact("vm1", "zerotier-network-id") network_id = machine_get_fact(
test_flake_with_core.name, "vm1", "zerotier-network-id"
)
assert len(network_id) == 16 assert len(network_id) == 16
age_key = sops_secrets_folder().joinpath("vm1-age.key").joinpath("secret") age_key = (
sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-age.key")
.joinpath("secret")
)
identity_secret = ( identity_secret = (
sops_secrets_folder() sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret") .joinpath("vm1-zerotier-identity-secret")
.joinpath("secret") .joinpath("secret")
) )
@@ -42,7 +48,7 @@ def test_generate_secret(
assert identity_secret.lstat().st_mtime_ns == secret1_mtime assert identity_secret.lstat().st_mtime_ns == secret1_mtime
machine_path = ( machine_path = (
sops_secrets_folder() sops_secrets_folder(test_flake_with_core.name)
.joinpath("vm1-zerotier-identity-secret") .joinpath("vm1-zerotier-identity-secret")
.joinpath("machines") .joinpath("machines")
.joinpath("vm1") .joinpath("vm1")

View File

@@ -3,6 +3,7 @@ from pathlib import Path
import pytest import pytest
from cli import Cli from cli import Cli
from fixtures_flakes import TestFlake
from clan_cli.machines.facts import machine_get_fact from clan_cli.machines.facts import machine_get_fact
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
@@ -12,11 +13,11 @@ from clan_cli.ssh import HostGroup
@pytest.mark.impure @pytest.mark.impure
def test_upload_secret( def test_upload_secret(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
test_flake_with_core_and_pass: Path, test_flake_with_core_and_pass: TestFlake,
temporary_dir: Path, temporary_dir: Path,
host_group: HostGroup, host_group: HostGroup,
) -> None: ) -> None:
monkeypatch.chdir(test_flake_with_core_and_pass) monkeypatch.chdir(test_flake_with_core_and_pass.path)
gnupghome = temporary_dir / "gpg" gnupghome = temporary_dir / "gpg"
gnupghome.mkdir(mode=0o700) gnupghome.mkdir(mode=0o700)
monkeypatch.setenv("GNUPGHOME", str(gnupghome)) monkeypatch.setenv("GNUPGHOME", str(gnupghome))
@@ -39,7 +40,9 @@ def test_upload_secret(
) )
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True) subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1"])
network_id = machine_get_fact("vm1", "zerotier-network-id") network_id = machine_get_fact(
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
)
assert len(network_id) == 16 assert len(network_id) == 16
identity_secret = ( identity_secret = (
temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg" temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg"
@@ -50,13 +53,13 @@ def test_upload_secret(
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1"])
assert identity_secret.lstat().st_mtime_ns == secret1_mtime assert identity_secret.lstat().st_mtime_ns == secret1_mtime
flake = test_flake_with_core_and_pass.joinpath("flake.nix") flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
host = host_group.hosts[0] host = host_group.hosts[0]
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}" addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr) new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text) flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1"]) cli.run(["secrets", "upload", "vm1"])
zerotier_identity_secret = ( zerotier_identity_secret = (
test_flake_with_core_and_pass / "secrets" / "zerotier-identity-secret" test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
) )
assert zerotier_identity_secret.exists() assert zerotier_identity_secret.exists()

View File

@@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Iterator
import pytest import pytest
from api import TestClient from api import TestClient
from cli import Cli from cli import Cli
from fixtures_flakes import create_flake from fixtures_flakes import TestFlake, create_flake
from httpx import SyncByteStream from httpx import SyncByteStream
from root import CLAN_CORE from root import CLAN_CORE
@@ -14,7 +14,7 @@ if TYPE_CHECKING:
@pytest.fixture @pytest.fixture
def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]: def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[TestFlake]:
yield from create_flake( yield from create_flake(
monkeypatch, monkeypatch,
"test_flake_with_core_dynamic_machines", "test_flake_with_core_dynamic_machines",
@@ -26,7 +26,7 @@ def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path
@pytest.fixture @pytest.fixture
def remote_flake_with_vm_without_secrets( def remote_flake_with_vm_without_secrets(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> Iterator[Path]: ) -> Iterator[TestFlake]:
yield from create_flake( yield from create_flake(
monkeypatch, monkeypatch,
"test_flake_with_core_dynamic_machines", "test_flake_with_core_dynamic_machines",