Merge pull request 'clan-cli secrets: flake_name -> flake_dir' (#450) from flakes into main
This commit is contained in:
@@ -33,7 +33,6 @@ in
|
|||||||
import sys
|
import sys
|
||||||
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
|
from clan_cli.secrets.sops_generate import generate_secrets_from_nix
|
||||||
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; secret_submodules = config.clanCore.secrets; })})
|
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; secret_submodules = config.clanCore.secrets; })})
|
||||||
args["flake_name"] = sys.argv[1]
|
|
||||||
generate_secrets_from_nix(**args)
|
generate_secrets_from_nix(**args)
|
||||||
'';
|
'';
|
||||||
uploadSecrets = pkgs.writeScript "upload-secrets" ''
|
uploadSecrets = pkgs.writeScript "upload-secrets" ''
|
||||||
@@ -43,7 +42,6 @@ in
|
|||||||
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
|
from clan_cli.secrets.sops_generate import upload_age_key_from_nix
|
||||||
# the second toJSON is needed to escape the string for the python
|
# the second toJSON is needed to escape the string for the python
|
||||||
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
|
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})
|
||||||
args["flake_name"] = sys.argv[1]
|
|
||||||
upload_age_key_from_nix(**args)
|
upload_age_key_from_nix(**args)
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ from typing import Any, Optional, Sequence
|
|||||||
|
|
||||||
from . import config, flakes, join, machines, secrets, vms, webui
|
from . import config, flakes, join, machines, secrets, vms, webui
|
||||||
from .custom_logger import setup_logging
|
from .custom_logger import setup_logging
|
||||||
|
from .dirs import get_clan_flake_toplevel
|
||||||
from .ssh import cli as ssh_cli
|
from .ssh import cli as ssh_cli
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -53,6 +54,12 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
|
|||||||
default=[],
|
default=[],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"--flake",
|
||||||
|
help="path to the flake where the clan resides in",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
subparsers = parser.add_subparsers()
|
subparsers = parser.add_subparsers()
|
||||||
|
|
||||||
parser_flake = subparsers.add_parser(
|
parser_flake = subparsers.add_parser(
|
||||||
@@ -100,6 +107,9 @@ def main() -> None:
|
|||||||
setup_logging(logging.DEBUG)
|
setup_logging(logging.DEBUG)
|
||||||
log.debug("Debug log activated")
|
log.debug("Debug log activated")
|
||||||
|
|
||||||
|
if args.flake is None:
|
||||||
|
args.flake = get_clan_flake_toplevel()
|
||||||
|
|
||||||
if not hasattr(args, "func"):
|
if not hasattr(args, "func"):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from .types import FlakeName
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def get_clan_flake_toplevel() -> Path:
|
def get_clan_flake_toplevel() -> Optional[Path]:
|
||||||
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
|
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
|
||||||
|
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ def find_git_repo_root() -> Optional[Path]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def find_toplevel(top_level_files: list[str]) -> Path:
|
def find_toplevel(top_level_files: list[str]) -> Optional[Path]:
|
||||||
"""Returns the path to the toplevel of the clan flake"""
|
"""Returns the path to the toplevel of the clan flake"""
|
||||||
for project_file in top_level_files:
|
for project_file in top_level_files:
|
||||||
initial_path = Path(os.getcwd())
|
initial_path = Path(os.getcwd())
|
||||||
@@ -30,7 +30,7 @@ def find_toplevel(top_level_files: list[str]) -> Path:
|
|||||||
if (path / project_file).exists():
|
if (path / project_file).exists():
|
||||||
return path
|
return path
|
||||||
path = path.parent
|
path = path.parent
|
||||||
raise ClanError("Could not find clan flake toplevel directory")
|
return None
|
||||||
|
|
||||||
|
|
||||||
def user_config_dir() -> Path:
|
def user_config_dir() -> Path:
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ def install_nixos(machine: Machine, flake_name: FlakeName) -> None:
|
|||||||
|
|
||||||
flake_attr = h.meta.get("flake_attr", "")
|
flake_attr = h.meta.get("flake_attr", "")
|
||||||
|
|
||||||
generate_secrets(machine, flake_name)
|
generate_secrets(machine)
|
||||||
|
|
||||||
with TemporaryDirectory() as tmpdir_:
|
with TemporaryDirectory() as tmpdir_:
|
||||||
tmpdir = Path(tmpdir_)
|
tmpdir = Path(tmpdir_)
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ class Machine:
|
|||||||
env["SECRETS_DIR"] = str(secrets_dir)
|
env["SECRETS_DIR"] = str(secrets_dir)
|
||||||
print(f"uploading secrets... {self.upload_secrets}")
|
print(f"uploading secrets... {self.upload_secrets}")
|
||||||
proc = subprocess.run(
|
proc = subprocess.run(
|
||||||
[self.upload_secrets, self.flake_dir.name],
|
[self.upload_secrets],
|
||||||
env=env,
|
env=env,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
text=True,
|
text=True,
|
||||||
|
|||||||
@@ -4,13 +4,12 @@ import os
|
|||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from ..dirs import get_clan_flake_toplevel
|
from ..errors import ClanError
|
||||||
from ..machines.machines import Machine
|
from ..machines.machines import Machine
|
||||||
from ..nix import nix_build, nix_command, nix_config
|
from ..nix import nix_build, nix_command, nix_config
|
||||||
from ..secrets.generate import generate_secrets
|
from ..secrets.generate import generate_secrets
|
||||||
from ..secrets.upload import upload_secrets
|
from ..secrets.upload import upload_secrets
|
||||||
from ..ssh import Host, HostGroup, HostKeyCheck, parse_deployment_address
|
from ..ssh import Host, HostGroup, HostKeyCheck, parse_deployment_address
|
||||||
from ..types import FlakeName
|
|
||||||
|
|
||||||
|
|
||||||
def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
|
def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
|
||||||
@@ -41,7 +40,7 @@ def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
|
|||||||
|
|
||||||
flake_attr = h.meta.get("flake_attr", "")
|
flake_attr = h.meta.get("flake_attr", "")
|
||||||
|
|
||||||
generate_secrets(h.meta["machine"], FlakeName(clan_dir.name))
|
generate_secrets(h.meta["machine"])
|
||||||
upload_secrets(h.meta["machine"])
|
upload_secrets(h.meta["machine"])
|
||||||
|
|
||||||
target_host = h.meta.get("target_host")
|
target_host = h.meta.get("target_host")
|
||||||
@@ -117,11 +116,9 @@ def get_selected_machines(machine_names: list[str], flake_dir: Path) -> HostGrou
|
|||||||
# FIXME: we want some kind of inventory here.
|
# FIXME: we want some kind of inventory here.
|
||||||
def update(args: argparse.Namespace) -> None:
|
def update(args: argparse.Namespace) -> None:
|
||||||
if args.flake is None:
|
if args.flake is None:
|
||||||
flake_dir = get_clan_flake_toplevel()
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
else:
|
|
||||||
flake_dir = args.flake
|
|
||||||
if len(args.machines) == 1 and args.target_host is not None:
|
if len(args.machines) == 1 and args.target_host is not None:
|
||||||
machine = Machine(name=args.machines[0], flake_dir=flake_dir)
|
machine = Machine(name=args.machines[0], flake_dir=args.flake)
|
||||||
machine.deployment_address = args.target_host
|
machine.deployment_address = args.target_host
|
||||||
host = parse_deployment_address(
|
host = parse_deployment_address(
|
||||||
args.machines[0],
|
args.machines[0],
|
||||||
@@ -135,11 +132,11 @@ def update(args: argparse.Namespace) -> None:
|
|||||||
exit(1)
|
exit(1)
|
||||||
else:
|
else:
|
||||||
if len(args.machines) == 0:
|
if len(args.machines) == 0:
|
||||||
machines = get_all_machines(flake_dir)
|
machines = get_all_machines(args.flake)
|
||||||
else:
|
else:
|
||||||
machines = get_selected_machines(args.machines, flake_dir)
|
machines = get_selected_machines(args.machines, args.flake)
|
||||||
|
|
||||||
deploy_nixos(machines, flake_dir)
|
deploy_nixos(machines, args.flake)
|
||||||
|
|
||||||
|
|
||||||
def register_update_parser(parser: argparse.ArgumentParser) -> None:
|
def register_update_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
|
|||||||
@@ -3,18 +3,16 @@ import shutil
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from ..dirs import specific_flake_dir
|
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..types import FlakeName
|
|
||||||
|
|
||||||
|
|
||||||
def get_sops_folder(flake_name: FlakeName) -> Path:
|
def get_sops_folder(flake_dir: Path) -> Path:
|
||||||
return specific_flake_dir(flake_name) / "sops"
|
return flake_dir / "sops"
|
||||||
|
|
||||||
|
|
||||||
def gen_sops_subfolder(subdir: str) -> Callable[[FlakeName], Path]:
|
def gen_sops_subfolder(subdir: str) -> Callable[[Path], Path]:
|
||||||
def folder(flake_name: FlakeName) -> Path:
|
def folder(flake_dir: Path) -> Path:
|
||||||
return specific_flake_dir(flake_name) / "sops" / subdir
|
return flake_dir / "sops" / subdir
|
||||||
|
|
||||||
return folder
|
return folder
|
||||||
|
|
||||||
|
|||||||
@@ -6,21 +6,19 @@ import sys
|
|||||||
|
|
||||||
from clan_cli.errors import ClanError
|
from clan_cli.errors import ClanError
|
||||||
|
|
||||||
from ..dirs import specific_flake_dir
|
|
||||||
from ..machines.machines import Machine
|
from ..machines.machines import Machine
|
||||||
from ..types import FlakeName
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
|
def generate_secrets(machine: Machine) -> None:
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env["CLAN_DIR"] = str(machine.flake_dir)
|
env["CLAN_DIR"] = str(machine.flake_dir)
|
||||||
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
|
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
|
||||||
|
|
||||||
print(f"generating secrets... {machine.generate_secrets}")
|
print(f"generating secrets... {machine.generate_secrets}")
|
||||||
proc = subprocess.run(
|
proc = subprocess.run(
|
||||||
[machine.generate_secrets, flake_name],
|
[machine.generate_secrets],
|
||||||
env=env,
|
env=env,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -31,8 +29,8 @@ def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def generate_command(args: argparse.Namespace) -> None:
|
def generate_command(args: argparse.Namespace) -> None:
|
||||||
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
|
machine = Machine(name=args.machine, flake_dir=args.flake)
|
||||||
generate_secrets(machine, args.flake)
|
generate_secrets(machine)
|
||||||
|
|
||||||
|
|
||||||
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
@@ -40,9 +38,4 @@ def register_generate_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
"machine",
|
"machine",
|
||||||
help="The machine to generate secrets for",
|
help="The machine to generate secrets for",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser.set_defaults(func=generate_command)
|
parser.set_defaults(func=generate_command)
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ from pathlib import Path
|
|||||||
|
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..machines.types import machine_name_type, validate_hostname
|
from ..machines.types import machine_name_type, validate_hostname
|
||||||
from ..types import FlakeName
|
|
||||||
from . import secrets
|
from . import secrets
|
||||||
from .folders import (
|
from .folders import (
|
||||||
sops_groups_folder,
|
sops_groups_folder,
|
||||||
@@ -21,27 +20,27 @@ from .types import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def machines_folder(flake_name: FlakeName, group: str) -> Path:
|
def machines_folder(flake_dir: Path, group: str) -> Path:
|
||||||
return sops_groups_folder(flake_name) / group / "machines"
|
return sops_groups_folder(flake_dir) / group / "machines"
|
||||||
|
|
||||||
|
|
||||||
def users_folder(flake_name: FlakeName, group: str) -> Path:
|
def users_folder(flake_dir: Path, group: str) -> Path:
|
||||||
return sops_groups_folder(flake_name) / group / "users"
|
return sops_groups_folder(flake_dir) / group / "users"
|
||||||
|
|
||||||
|
|
||||||
class Group:
|
class Group:
|
||||||
def __init__(
|
def __init__(
|
||||||
self, flake_name: FlakeName, name: str, machines: list[str], users: list[str]
|
self, flake_dir: Path, name: str, machines: list[str], users: list[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
self.name = name
|
self.name = name
|
||||||
self.machines = machines
|
self.machines = machines
|
||||||
self.users = users
|
self.users = users
|
||||||
self.flake_name = flake_name
|
self.flake_dir = flake_dir
|
||||||
|
|
||||||
|
|
||||||
def list_groups(flake_name: FlakeName) -> list[Group]:
|
def list_groups(flake_dir: Path) -> list[Group]:
|
||||||
groups: list[Group] = []
|
groups: list[Group] = []
|
||||||
folder = sops_groups_folder(flake_name)
|
folder = sops_groups_folder(flake_dir)
|
||||||
if not folder.exists():
|
if not folder.exists():
|
||||||
return groups
|
return groups
|
||||||
|
|
||||||
@@ -49,24 +48,24 @@ def list_groups(flake_name: FlakeName) -> list[Group]:
|
|||||||
group_folder = folder / name
|
group_folder = folder / name
|
||||||
if not group_folder.is_dir():
|
if not group_folder.is_dir():
|
||||||
continue
|
continue
|
||||||
machines_path = machines_folder(flake_name, name)
|
machines_path = machines_folder(flake_dir, name)
|
||||||
machines = []
|
machines = []
|
||||||
if machines_path.is_dir():
|
if machines_path.is_dir():
|
||||||
for f in machines_path.iterdir():
|
for f in machines_path.iterdir():
|
||||||
if validate_hostname(f.name):
|
if validate_hostname(f.name):
|
||||||
machines.append(f.name)
|
machines.append(f.name)
|
||||||
users_path = users_folder(flake_name, name)
|
users_path = users_folder(flake_dir, name)
|
||||||
users = []
|
users = []
|
||||||
if users_path.is_dir():
|
if users_path.is_dir():
|
||||||
for f in users_path.iterdir():
|
for f in users_path.iterdir():
|
||||||
if VALID_USER_NAME.match(f.name):
|
if VALID_USER_NAME.match(f.name):
|
||||||
users.append(f.name)
|
users.append(f.name)
|
||||||
groups.append(Group(flake_name, name, machines, users))
|
groups.append(Group(flake_dir, name, machines, users))
|
||||||
return groups
|
return groups
|
||||||
|
|
||||||
|
|
||||||
def list_command(args: argparse.Namespace) -> None:
|
def list_command(args: argparse.Namespace) -> None:
|
||||||
for group in list_groups(args.flake):
|
for group in list_groups(Path(args.flake)):
|
||||||
print(group.name)
|
print(group.name)
|
||||||
if group.machines:
|
if group.machines:
|
||||||
print("machines:")
|
print("machines:")
|
||||||
@@ -88,9 +87,9 @@ def list_directory(directory: Path) -> str:
|
|||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
def update_group_keys(flake_name: FlakeName, group: str) -> None:
|
def update_group_keys(flake_dir: Path, group: str) -> None:
|
||||||
for secret_ in secrets.list_secrets(flake_name):
|
for secret_ in secrets.list_secrets(flake_dir):
|
||||||
secret = sops_secrets_folder(flake_name) / secret_
|
secret = sops_secrets_folder(flake_dir) / secret_
|
||||||
if (secret / "groups" / group).is_symlink():
|
if (secret / "groups" / group).is_symlink():
|
||||||
update_keys(
|
update_keys(
|
||||||
secret,
|
secret,
|
||||||
@@ -99,7 +98,7 @@ def update_group_keys(flake_name: FlakeName, group: str) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def add_member(
|
def add_member(
|
||||||
flake_name: FlakeName, group_folder: Path, source_folder: Path, name: str
|
flake_dir: Path, group_folder: Path, source_folder: Path, name: str
|
||||||
) -> None:
|
) -> None:
|
||||||
source = source_folder / name
|
source = source_folder / name
|
||||||
if not source.exists():
|
if not source.exists():
|
||||||
@@ -115,10 +114,10 @@ def add_member(
|
|||||||
)
|
)
|
||||||
os.remove(user_target)
|
os.remove(user_target)
|
||||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||||
update_group_keys(flake_name, group_folder.parent.name)
|
update_group_keys(flake_dir, group_folder.parent.name)
|
||||||
|
|
||||||
|
|
||||||
def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
|
def remove_member(flake_dir: Path, group_folder: Path, name: str) -> None:
|
||||||
target = group_folder / name
|
target = group_folder / name
|
||||||
if not target.exists():
|
if not target.exists():
|
||||||
msg = f"{name} does not exist in group in {group_folder}: "
|
msg = f"{name} does not exist in group in {group_folder}: "
|
||||||
@@ -127,7 +126,7 @@ def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
|
|||||||
os.remove(target)
|
os.remove(target)
|
||||||
|
|
||||||
if len(os.listdir(group_folder)) > 0:
|
if len(os.listdir(group_folder)) > 0:
|
||||||
update_group_keys(flake_name, group_folder.parent.name)
|
update_group_keys(flake_dir, group_folder.parent.name)
|
||||||
|
|
||||||
if len(os.listdir(group_folder)) == 0:
|
if len(os.listdir(group_folder)) == 0:
|
||||||
os.rmdir(group_folder)
|
os.rmdir(group_folder)
|
||||||
@@ -136,65 +135,65 @@ def remove_member(flake_name: FlakeName, group_folder: Path, name: str) -> None:
|
|||||||
os.rmdir(group_folder.parent)
|
os.rmdir(group_folder.parent)
|
||||||
|
|
||||||
|
|
||||||
def add_user(flake_name: FlakeName, group: str, name: str) -> None:
|
def add_user(flake_dir: Path, group: str, name: str) -> None:
|
||||||
add_member(
|
add_member(
|
||||||
flake_name, users_folder(flake_name, group), sops_users_folder(flake_name), name
|
flake_dir, users_folder(flake_dir, group), sops_users_folder(flake_dir), name
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_user_command(args: argparse.Namespace) -> None:
|
def add_user_command(args: argparse.Namespace) -> None:
|
||||||
add_user(args.flake, args.group, args.user)
|
add_user(Path(args.flake), args.group, args.user)
|
||||||
|
|
||||||
|
|
||||||
def remove_user(flake_name: FlakeName, group: str, name: str) -> None:
|
def remove_user(flake_dir: Path, group: str, name: str) -> None:
|
||||||
remove_member(flake_name, users_folder(flake_name, group), name)
|
remove_member(flake_dir, users_folder(flake_dir, group), name)
|
||||||
|
|
||||||
|
|
||||||
def remove_user_command(args: argparse.Namespace) -> None:
|
def remove_user_command(args: argparse.Namespace) -> None:
|
||||||
remove_user(args.flake, args.group, args.user)
|
remove_user(Path(args.flake), args.group, args.user)
|
||||||
|
|
||||||
|
|
||||||
def add_machine(flake_name: FlakeName, group: str, name: str) -> None:
|
def add_machine(flake_dir: Path, group: str, name: str) -> None:
|
||||||
add_member(
|
add_member(
|
||||||
flake_name,
|
flake_dir,
|
||||||
machines_folder(flake_name, group),
|
machines_folder(flake_dir, group),
|
||||||
sops_machines_folder(flake_name),
|
sops_machines_folder(flake_dir),
|
||||||
name,
|
name,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_machine_command(args: argparse.Namespace) -> None:
|
def add_machine_command(args: argparse.Namespace) -> None:
|
||||||
add_machine(args.flake, args.group, args.machine)
|
add_machine(Path(args.flake), args.group, args.machine)
|
||||||
|
|
||||||
|
|
||||||
def remove_machine(flake_name: FlakeName, group: str, name: str) -> None:
|
def remove_machine(flake_dir: Path, group: str, name: str) -> None:
|
||||||
remove_member(flake_name, machines_folder(flake_name, group), name)
|
remove_member(flake_dir, machines_folder(flake_dir, group), name)
|
||||||
|
|
||||||
|
|
||||||
def remove_machine_command(args: argparse.Namespace) -> None:
|
def remove_machine_command(args: argparse.Namespace) -> None:
|
||||||
remove_machine(args.flake, args.group, args.machine)
|
remove_machine(Path(args.flake), args.group, args.machine)
|
||||||
|
|
||||||
|
|
||||||
def add_group_argument(parser: argparse.ArgumentParser) -> None:
|
def add_group_argument(parser: argparse.ArgumentParser) -> None:
|
||||||
parser.add_argument("group", help="the name of the secret", type=group_name_type)
|
parser.add_argument("group", help="the name of the secret", type=group_name_type)
|
||||||
|
|
||||||
|
|
||||||
def add_secret(flake_name: FlakeName, group: str, name: str) -> None:
|
def add_secret(flake_dir: Path, group: str, name: str) -> None:
|
||||||
secrets.allow_member(
|
secrets.allow_member(
|
||||||
secrets.groups_folder(flake_name, name), sops_groups_folder(flake_name), group
|
secrets.groups_folder(flake_dir, name), sops_groups_folder(flake_dir), group
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_secret_command(args: argparse.Namespace) -> None:
|
def add_secret_command(args: argparse.Namespace) -> None:
|
||||||
add_secret(args.flake, args.group, args.secret)
|
add_secret(Path(args.flake), args.group, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret(flake_name: FlakeName, group: str, name: str) -> None:
|
def remove_secret(flake_dir: Path, group: str, name: str) -> None:
|
||||||
secrets.disallow_member(secrets.groups_folder(flake_name, name), group)
|
secrets.disallow_member(secrets.groups_folder(flake_dir, name), group)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||||
remove_secret(args.flake, args.group, args.secret)
|
remove_secret(Path(args.flake), args.group, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
@@ -207,11 +206,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
|
|
||||||
# List groups
|
# List groups
|
||||||
list_parser = subparser.add_parser("list", help="list groups")
|
list_parser = subparser.add_parser("list", help="list groups")
|
||||||
list_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
list_parser.set_defaults(func=list_command)
|
list_parser.set_defaults(func=list_command)
|
||||||
|
|
||||||
# Add user
|
# Add user
|
||||||
@@ -222,11 +216,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
add_machine_parser.add_argument(
|
add_machine_parser.add_argument(
|
||||||
"machine", help="the name of the machines to add", type=machine_name_type
|
"machine", help="the name of the machines to add", type=machine_name_type
|
||||||
)
|
)
|
||||||
add_machine_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_machine_parser.set_defaults(func=add_machine_command)
|
add_machine_parser.set_defaults(func=add_machine_command)
|
||||||
|
|
||||||
# Remove machine
|
# Remove machine
|
||||||
@@ -237,11 +226,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_machine_parser.add_argument(
|
remove_machine_parser.add_argument(
|
||||||
"machine", help="the name of the machines to remove", type=machine_name_type
|
"machine", help="the name of the machines to remove", type=machine_name_type
|
||||||
)
|
)
|
||||||
remove_machine_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_machine_parser.set_defaults(func=remove_machine_command)
|
remove_machine_parser.set_defaults(func=remove_machine_command)
|
||||||
|
|
||||||
# Add user
|
# Add user
|
||||||
@@ -250,11 +234,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
add_user_parser.add_argument(
|
add_user_parser.add_argument(
|
||||||
"user", help="the name of the user to add", type=user_name_type
|
"user", help="the name of the user to add", type=user_name_type
|
||||||
)
|
)
|
||||||
add_user_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_user_parser.set_defaults(func=add_user_command)
|
add_user_parser.set_defaults(func=add_user_command)
|
||||||
|
|
||||||
# Remove user
|
# Remove user
|
||||||
@@ -265,11 +244,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_user_parser.add_argument(
|
remove_user_parser.add_argument(
|
||||||
"user", help="the name of the user to remove", type=user_name_type
|
"user", help="the name of the user to remove", type=user_name_type
|
||||||
)
|
)
|
||||||
remove_user_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_user_parser.set_defaults(func=remove_user_command)
|
remove_user_parser.set_defaults(func=remove_user_command)
|
||||||
|
|
||||||
# Add secret
|
# Add secret
|
||||||
@@ -282,11 +256,6 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
add_secret_parser.add_argument(
|
add_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
add_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_secret_parser.set_defaults(func=add_secret_command)
|
add_secret_parser.set_defaults(func=add_secret_command)
|
||||||
|
|
||||||
# Remove secret
|
# Remove secret
|
||||||
@@ -299,9 +268,4 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_secret_parser.add_argument(
|
remove_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
remove_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||||
|
|||||||
@@ -36,15 +36,15 @@ def import_sops(args: argparse.Namespace) -> None:
|
|||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
if (sops_secrets_folder(args.flake) / k / "secret").exists():
|
if (sops_secrets_folder(Path(args.flake)) / k / "secret").exists():
|
||||||
print(
|
print(
|
||||||
f"WARNING: {k} already exists, skipping",
|
f"WARNING: {k} already exists, skipping",
|
||||||
file=sys.stderr,
|
file=sys.stderr,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
encrypt_secret(
|
encrypt_secret(
|
||||||
args.flake,
|
Path(args.flake),
|
||||||
sops_secrets_folder(args.flake) / k,
|
sops_secrets_folder(Path(args.flake)) / k,
|
||||||
v,
|
v,
|
||||||
add_groups=args.group,
|
add_groups=args.group,
|
||||||
add_machines=args.machine,
|
add_machines=args.machine,
|
||||||
@@ -91,10 +91,5 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
type=str,
|
type=str,
|
||||||
help="the sops file to import (- for stdin)",
|
help="the sops file to import (- for stdin)",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake",
|
|
||||||
)
|
|
||||||
|
|
||||||
parser.set_defaults(func=import_sops)
|
parser.set_defaults(func=import_sops)
|
||||||
|
|||||||
@@ -1,74 +1,87 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from ..errors import ClanError
|
||||||
from ..machines.types import machine_name_type, validate_hostname
|
from ..machines.types import machine_name_type, validate_hostname
|
||||||
from ..types import FlakeName
|
|
||||||
from . import secrets
|
from . import secrets
|
||||||
from .folders import list_objects, remove_object, sops_machines_folder
|
from .folders import list_objects, remove_object, sops_machines_folder
|
||||||
from .sops import read_key, write_key
|
from .sops import read_key, write_key
|
||||||
from .types import public_or_private_age_key_type, secret_name_type
|
from .types import public_or_private_age_key_type, secret_name_type
|
||||||
|
|
||||||
|
|
||||||
def add_machine(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
|
def add_machine(flake_dir: Path, name: str, key: str, force: bool) -> None:
|
||||||
write_key(sops_machines_folder(flake_name) / name, key, force)
|
write_key(sops_machines_folder(flake_dir) / name, key, force)
|
||||||
|
|
||||||
|
|
||||||
def remove_machine(flake_name: FlakeName, name: str) -> None:
|
def remove_machine(flake_dir: Path, name: str) -> None:
|
||||||
remove_object(sops_machines_folder(flake_name), name)
|
remove_object(sops_machines_folder(flake_dir), name)
|
||||||
|
|
||||||
|
|
||||||
def get_machine(flake_name: FlakeName, name: str) -> str:
|
def get_machine(flake_dir: Path, name: str) -> str:
|
||||||
return read_key(sops_machines_folder(flake_name) / name)
|
return read_key(sops_machines_folder(flake_dir) / name)
|
||||||
|
|
||||||
|
|
||||||
def has_machine(flake_name: FlakeName, name: str) -> bool:
|
def has_machine(flake_dir: Path, name: str) -> bool:
|
||||||
return (sops_machines_folder(flake_name) / name / "key.json").exists()
|
return (sops_machines_folder(flake_dir) / name / "key.json").exists()
|
||||||
|
|
||||||
|
|
||||||
def list_machines(flake_name: FlakeName) -> list[str]:
|
def list_machines(flake_dir: Path) -> list[str]:
|
||||||
path = sops_machines_folder(flake_name)
|
path = sops_machines_folder(flake_dir)
|
||||||
|
|
||||||
def validate(name: str) -> bool:
|
def validate(name: str) -> bool:
|
||||||
return validate_hostname(name) and has_machine(flake_name, name)
|
return validate_hostname(name) and has_machine(flake_dir, name)
|
||||||
|
|
||||||
return list_objects(path, validate)
|
return list_objects(path, validate)
|
||||||
|
|
||||||
|
|
||||||
def add_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
|
def add_secret(flake_dir: Path, machine: str, secret: str) -> None:
|
||||||
secrets.allow_member(
|
secrets.allow_member(
|
||||||
secrets.machines_folder(flake_name, secret),
|
secrets.machines_folder(flake_dir, secret),
|
||||||
sops_machines_folder(flake_name),
|
sops_machines_folder(flake_dir),
|
||||||
machine,
|
machine,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret(flake_name: FlakeName, machine: str, secret: str) -> None:
|
def remove_secret(flake_dir: Path, machine: str, secret: str) -> None:
|
||||||
secrets.disallow_member(secrets.machines_folder(flake_name, secret), machine)
|
secrets.disallow_member(secrets.machines_folder(flake_dir, secret), machine)
|
||||||
|
|
||||||
|
|
||||||
def list_command(args: argparse.Namespace) -> None:
|
def list_command(args: argparse.Namespace) -> None:
|
||||||
lst = list_machines(args.flake)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
lst = list_machines(Path(args.flake))
|
||||||
if len(lst) > 0:
|
if len(lst) > 0:
|
||||||
print("\n".join(lst))
|
print("\n".join(lst))
|
||||||
|
|
||||||
|
|
||||||
def add_command(args: argparse.Namespace) -> None:
|
def add_command(args: argparse.Namespace) -> None:
|
||||||
add_machine(args.flake, args.machine, args.key, args.force)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
add_machine(Path(args.flake), args.machine, args.key, args.force)
|
||||||
|
|
||||||
|
|
||||||
def get_command(args: argparse.Namespace) -> None:
|
def get_command(args: argparse.Namespace) -> None:
|
||||||
print(get_machine(args.flake, args.machine))
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
print(get_machine(Path(args.flake), args.machine))
|
||||||
|
|
||||||
|
|
||||||
def remove_command(args: argparse.Namespace) -> None:
|
def remove_command(args: argparse.Namespace) -> None:
|
||||||
remove_machine(args.flake, args.machine)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
remove_machine(Path(args.flake), args.machine)
|
||||||
|
|
||||||
|
|
||||||
def add_secret_command(args: argparse.Namespace) -> None:
|
def add_secret_command(args: argparse.Namespace) -> None:
|
||||||
add_secret(args.flake, args.machine, args.secret)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
add_secret(Path(args.flake), args.machine, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||||
remove_secret(args.flake, args.machine, args.secret)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
remove_secret(Path(args.flake), args.machine, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
@@ -80,11 +93,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
)
|
)
|
||||||
# Parser
|
# Parser
|
||||||
list_parser = subparser.add_parser("list", help="list machines")
|
list_parser = subparser.add_parser("list", help="list machines")
|
||||||
list_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
list_parser.set_defaults(func=list_command)
|
list_parser.set_defaults(func=list_command)
|
||||||
|
|
||||||
# Parser
|
# Parser
|
||||||
@@ -104,11 +112,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
help="public key or private key of the user",
|
help="public key or private key of the user",
|
||||||
type=public_or_private_age_key_type,
|
type=public_or_private_age_key_type,
|
||||||
)
|
)
|
||||||
add_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_parser.set_defaults(func=add_command)
|
add_parser.set_defaults(func=add_command)
|
||||||
|
|
||||||
# Parser
|
# Parser
|
||||||
@@ -116,11 +119,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
get_parser.add_argument(
|
get_parser.add_argument(
|
||||||
"machine", help="the name of the machine", type=machine_name_type
|
"machine", help="the name of the machine", type=machine_name_type
|
||||||
)
|
)
|
||||||
get_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
get_parser.set_defaults(func=get_command)
|
get_parser.set_defaults(func=get_command)
|
||||||
|
|
||||||
# Parser
|
# Parser
|
||||||
@@ -128,11 +126,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_parser.add_argument(
|
remove_parser.add_argument(
|
||||||
"machine", help="the name of the machine", type=machine_name_type
|
"machine", help="the name of the machine", type=machine_name_type
|
||||||
)
|
)
|
||||||
remove_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_parser.set_defaults(func=remove_command)
|
remove_parser.set_defaults(func=remove_command)
|
||||||
|
|
||||||
# Parser
|
# Parser
|
||||||
@@ -145,11 +138,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
add_secret_parser.add_argument(
|
add_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
add_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_secret_parser.set_defaults(func=add_secret_command)
|
add_secret_parser.set_defaults(func=add_secret_command)
|
||||||
|
|
||||||
# Parser
|
# Parser
|
||||||
@@ -162,9 +150,4 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_secret_parser.add_argument(
|
remove_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
remove_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ from typing import IO
|
|||||||
|
|
||||||
from .. import tty
|
from .. import tty
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..types import FlakeName
|
|
||||||
from .folders import (
|
from .folders import (
|
||||||
list_objects,
|
list_objects,
|
||||||
sops_groups_folder,
|
sops_groups_folder,
|
||||||
@@ -54,36 +53,36 @@ def collect_keys_for_path(path: Path) -> set[str]:
|
|||||||
|
|
||||||
|
|
||||||
def encrypt_secret(
|
def encrypt_secret(
|
||||||
flake_name: FlakeName,
|
flake_dir: Path,
|
||||||
secret: Path,
|
secret: Path,
|
||||||
value: IO[str] | str | None,
|
value: IO[str] | str | None,
|
||||||
add_users: list[str] = [],
|
add_users: list[str] = [],
|
||||||
add_machines: list[str] = [],
|
add_machines: list[str] = [],
|
||||||
add_groups: list[str] = [],
|
add_groups: list[str] = [],
|
||||||
) -> None:
|
) -> None:
|
||||||
key = ensure_sops_key(flake_name)
|
key = ensure_sops_key(flake_dir)
|
||||||
keys = set([])
|
keys = set([])
|
||||||
|
|
||||||
for user in add_users:
|
for user in add_users:
|
||||||
allow_member(
|
allow_member(
|
||||||
users_folder(flake_name, secret.name),
|
users_folder(flake_dir, secret.name),
|
||||||
sops_users_folder(flake_name),
|
sops_users_folder(flake_dir),
|
||||||
user,
|
user,
|
||||||
False,
|
False,
|
||||||
)
|
)
|
||||||
|
|
||||||
for machine in add_machines:
|
for machine in add_machines:
|
||||||
allow_member(
|
allow_member(
|
||||||
machines_folder(flake_name, secret.name),
|
machines_folder(flake_dir, secret.name),
|
||||||
sops_machines_folder(flake_name),
|
sops_machines_folder(flake_dir),
|
||||||
machine,
|
machine,
|
||||||
False,
|
False,
|
||||||
)
|
)
|
||||||
|
|
||||||
for group in add_groups:
|
for group in add_groups:
|
||||||
allow_member(
|
allow_member(
|
||||||
groups_folder(flake_name, secret.name),
|
groups_folder(flake_dir, secret.name),
|
||||||
sops_groups_folder(flake_name),
|
sops_groups_folder(flake_dir),
|
||||||
group,
|
group,
|
||||||
False,
|
False,
|
||||||
)
|
)
|
||||||
@@ -93,8 +92,8 @@ def encrypt_secret(
|
|||||||
if key.pubkey not in keys:
|
if key.pubkey not in keys:
|
||||||
keys.add(key.pubkey)
|
keys.add(key.pubkey)
|
||||||
allow_member(
|
allow_member(
|
||||||
users_folder(flake_name, secret.name),
|
users_folder(flake_dir, secret.name),
|
||||||
sops_users_folder(flake_name),
|
sops_users_folder(flake_dir),
|
||||||
key.username,
|
key.username,
|
||||||
False,
|
False,
|
||||||
)
|
)
|
||||||
@@ -102,31 +101,31 @@ def encrypt_secret(
|
|||||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||||
|
|
||||||
|
|
||||||
def remove_secret(flake_name: FlakeName, secret: str) -> None:
|
def remove_secret(flake_dir: Path, secret: str) -> None:
|
||||||
path = sops_secrets_folder(flake_name) / secret
|
path = sops_secrets_folder(flake_dir) / secret
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
raise ClanError(f"Secret '{secret}' does not exist")
|
raise ClanError(f"Secret '{secret}' does not exist")
|
||||||
shutil.rmtree(path)
|
shutil.rmtree(path)
|
||||||
|
|
||||||
|
|
||||||
def remove_command(args: argparse.Namespace) -> None:
|
def remove_command(args: argparse.Namespace) -> None:
|
||||||
remove_secret(args.flake, args.secret)
|
remove_secret(Path(args.flake), args.secret)
|
||||||
|
|
||||||
|
|
||||||
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||||
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
||||||
|
|
||||||
|
|
||||||
def machines_folder(flake_name: FlakeName, group: str) -> Path:
|
def machines_folder(flake_dir: Path, group: str) -> Path:
|
||||||
return sops_secrets_folder(flake_name) / group / "machines"
|
return sops_secrets_folder(flake_dir) / group / "machines"
|
||||||
|
|
||||||
|
|
||||||
def users_folder(flake_name: FlakeName, group: str) -> Path:
|
def users_folder(flake_dir: Path, group: str) -> Path:
|
||||||
return sops_secrets_folder(flake_name) / group / "users"
|
return sops_secrets_folder(flake_dir) / group / "users"
|
||||||
|
|
||||||
|
|
||||||
def groups_folder(flake_name: FlakeName, group: str) -> Path:
|
def groups_folder(flake_dir: Path, group: str) -> Path:
|
||||||
return sops_secrets_folder(flake_name) / group / "groups"
|
return sops_secrets_folder(flake_dir) / group / "groups"
|
||||||
|
|
||||||
|
|
||||||
def list_directory(directory: Path) -> str:
|
def list_directory(directory: Path) -> str:
|
||||||
@@ -189,37 +188,35 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def has_secret(flake_name: FlakeName, secret: str) -> bool:
|
def has_secret(flake_dir: Path, secret: str) -> bool:
|
||||||
return (sops_secrets_folder(flake_name) / secret / "secret").exists()
|
return (sops_secrets_folder(flake_dir) / secret / "secret").exists()
|
||||||
|
|
||||||
|
|
||||||
def list_secrets(flake_name: FlakeName) -> list[str]:
|
def list_secrets(flake_dir: Path) -> list[str]:
|
||||||
path = sops_secrets_folder(flake_name)
|
path = sops_secrets_folder(flake_dir)
|
||||||
|
|
||||||
def validate(name: str) -> bool:
|
def validate(name: str) -> bool:
|
||||||
return VALID_SECRET_NAME.match(name) is not None and has_secret(
|
return VALID_SECRET_NAME.match(name) is not None and has_secret(flake_dir, name)
|
||||||
flake_name, name
|
|
||||||
)
|
|
||||||
|
|
||||||
return list_objects(path, validate)
|
return list_objects(path, validate)
|
||||||
|
|
||||||
|
|
||||||
def list_command(args: argparse.Namespace) -> None:
|
def list_command(args: argparse.Namespace) -> None:
|
||||||
lst = list_secrets(args.flake)
|
lst = list_secrets(Path(args.flake))
|
||||||
if len(lst) > 0:
|
if len(lst) > 0:
|
||||||
print("\n".join(lst))
|
print("\n".join(lst))
|
||||||
|
|
||||||
|
|
||||||
def decrypt_secret(flake_name: FlakeName, secret: str) -> str:
|
def decrypt_secret(flake_dir: Path, secret: str) -> str:
|
||||||
ensure_sops_key(flake_name)
|
ensure_sops_key(flake_dir)
|
||||||
secret_path = sops_secrets_folder(flake_name) / secret / "secret"
|
secret_path = sops_secrets_folder(flake_dir) / secret / "secret"
|
||||||
if not secret_path.exists():
|
if not secret_path.exists():
|
||||||
raise ClanError(f"Secret '{secret}' does not exist")
|
raise ClanError(f"Secret '{secret}' does not exist")
|
||||||
return decrypt_file(secret_path)
|
return decrypt_file(secret_path)
|
||||||
|
|
||||||
|
|
||||||
def get_command(args: argparse.Namespace) -> None:
|
def get_command(args: argparse.Namespace) -> None:
|
||||||
print(decrypt_secret(args.flake, args.secret), end="")
|
print(decrypt_secret(Path(args.flake), args.secret), end="")
|
||||||
|
|
||||||
|
|
||||||
def set_command(args: argparse.Namespace) -> None:
|
def set_command(args: argparse.Namespace) -> None:
|
||||||
@@ -232,8 +229,8 @@ def set_command(args: argparse.Namespace) -> None:
|
|||||||
elif tty.is_interactive():
|
elif tty.is_interactive():
|
||||||
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
||||||
encrypt_secret(
|
encrypt_secret(
|
||||||
args.flake,
|
Path(args.flake),
|
||||||
sops_secrets_folder(args.flake) / args.secret,
|
sops_secrets_folder(Path(args.flake)) / args.secret,
|
||||||
secret_value,
|
secret_value,
|
||||||
args.user,
|
args.user,
|
||||||
args.machine,
|
args.machine,
|
||||||
@@ -242,8 +239,8 @@ def set_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def rename_command(args: argparse.Namespace) -> None:
|
def rename_command(args: argparse.Namespace) -> None:
|
||||||
old_path = sops_secrets_folder(args.flake) / args.secret
|
old_path = sops_secrets_folder(Path(args.flake)) / args.secret
|
||||||
new_path = sops_secrets_folder(args.flake) / args.new_name
|
new_path = sops_secrets_folder(Path(args.flake)) / args.new_name
|
||||||
if not old_path.exists():
|
if not old_path.exists():
|
||||||
raise ClanError(f"Secret '{args.secret}' does not exist")
|
raise ClanError(f"Secret '{args.secret}' does not exist")
|
||||||
if new_path.exists():
|
if new_path.exists():
|
||||||
@@ -253,20 +250,10 @@ def rename_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||||
parser_list = subparser.add_parser("list", help="list secrets")
|
parser_list = subparser.add_parser("list", help="list secrets")
|
||||||
parser_list.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser_list.set_defaults(func=list_command)
|
parser_list.set_defaults(func=list_command)
|
||||||
|
|
||||||
parser_get = subparser.add_parser("get", help="get a secret")
|
parser_get = subparser.add_parser("get", help="get a secret")
|
||||||
add_secret_argument(parser_get)
|
add_secret_argument(parser_get)
|
||||||
parser_get.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser_get.set_defaults(func=get_command)
|
parser_get.set_defaults(func=get_command)
|
||||||
|
|
||||||
parser_set = subparser.add_parser("set", help="set a secret")
|
parser_set = subparser.add_parser("set", help="set a secret")
|
||||||
@@ -299,28 +286,13 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
|||||||
default=False,
|
default=False,
|
||||||
help="edit the secret with $EDITOR instead of pasting it",
|
help="edit the secret with $EDITOR instead of pasting it",
|
||||||
)
|
)
|
||||||
parser_set.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser_set.set_defaults(func=set_command)
|
parser_set.set_defaults(func=set_command)
|
||||||
|
|
||||||
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
||||||
add_secret_argument(parser_rename)
|
add_secret_argument(parser_rename)
|
||||||
parser_rename.add_argument("new_name", type=str, help="the new name of the secret")
|
parser_rename.add_argument("new_name", type=str, help="the new name of the secret")
|
||||||
parser_rename.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser_rename.set_defaults(func=rename_command)
|
parser_rename.set_defaults(func=rename_command)
|
||||||
|
|
||||||
parser_remove = subparser.add_parser("remove", help="remove a secret")
|
parser_remove = subparser.add_parser("remove", help="remove a secret")
|
||||||
add_secret_argument(parser_remove)
|
add_secret_argument(parser_remove)
|
||||||
parser_remove.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser_remove.set_defaults(func=remove_command)
|
parser_remove.set_defaults(func=remove_command)
|
||||||
|
|||||||
@@ -10,7 +10,6 @@ from typing import IO, Iterator
|
|||||||
from ..dirs import user_config_dir
|
from ..dirs import user_config_dir
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
from ..types import FlakeName
|
|
||||||
from .folders import sops_machines_folder, sops_users_folder
|
from .folders import sops_machines_folder, sops_users_folder
|
||||||
|
|
||||||
|
|
||||||
@@ -52,7 +51,7 @@ def generate_private_key() -> tuple[str, str]:
|
|||||||
raise ClanError("Failed to generate private sops key") from e
|
raise ClanError("Failed to generate private sops key") from e
|
||||||
|
|
||||||
|
|
||||||
def get_user_name(flake_name: FlakeName, user: str) -> str:
|
def get_user_name(flake_dir: Path, user: str) -> str:
|
||||||
"""Ask the user for their name until a unique one is provided."""
|
"""Ask the user for their name until a unique one is provided."""
|
||||||
while True:
|
while True:
|
||||||
name = input(
|
name = input(
|
||||||
@@ -60,14 +59,14 @@ def get_user_name(flake_name: FlakeName, user: str) -> str:
|
|||||||
)
|
)
|
||||||
if name:
|
if name:
|
||||||
user = name
|
user = name
|
||||||
if not (sops_users_folder(flake_name) / user).exists():
|
if not (flake_dir / user).exists():
|
||||||
return user
|
return user
|
||||||
print(f"{sops_users_folder(flake_name) / user} already exists")
|
print(f"{flake_dir / user} already exists")
|
||||||
|
|
||||||
|
|
||||||
def ensure_user_or_machine(flake_name: FlakeName, pub_key: str) -> SopsKey:
|
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
|
||||||
key = SopsKey(pub_key, username="")
|
key = SopsKey(pub_key, username="")
|
||||||
folders = [sops_users_folder(flake_name), sops_machines_folder(flake_name)]
|
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
|
||||||
for folder in folders:
|
for folder in folders:
|
||||||
if folder.exists():
|
if folder.exists():
|
||||||
for user in folder.iterdir():
|
for user in folder.iterdir():
|
||||||
@@ -91,13 +90,13 @@ def default_sops_key_path() -> Path:
|
|||||||
return user_config_dir() / "sops" / "age" / "keys.txt"
|
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||||
|
|
||||||
|
|
||||||
def ensure_sops_key(flake_name: FlakeName) -> SopsKey:
|
def ensure_sops_key(flake_dir: Path) -> SopsKey:
|
||||||
key = os.environ.get("SOPS_AGE_KEY")
|
key = os.environ.get("SOPS_AGE_KEY")
|
||||||
if key:
|
if key:
|
||||||
return ensure_user_or_machine(flake_name, get_public_key(key))
|
return ensure_user_or_machine(flake_dir, get_public_key(key))
|
||||||
path = default_sops_key_path()
|
path = default_sops_key_path()
|
||||||
if path.exists():
|
if path.exists():
|
||||||
return ensure_user_or_machine(flake_name, get_public_key(path.read_text()))
|
return ensure_user_or_machine(flake_dir, get_public_key(path.read_text()))
|
||||||
else:
|
else:
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
"No sops key found. Please generate one with 'clan secrets key generate'."
|
"No sops key found. Please generate one with 'clan secrets key generate'."
|
||||||
|
|||||||
@@ -10,9 +10,7 @@ from typing import Any
|
|||||||
|
|
||||||
from clan_cli.nix import nix_shell
|
from clan_cli.nix import nix_shell
|
||||||
|
|
||||||
from ..dirs import specific_flake_dir
|
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..types import FlakeName
|
|
||||||
from .folders import sops_secrets_folder
|
from .folders import sops_secrets_folder
|
||||||
from .machines import add_machine, has_machine
|
from .machines import add_machine, has_machine
|
||||||
from .secrets import decrypt_secret, encrypt_secret, has_secret
|
from .secrets import decrypt_secret, encrypt_secret, has_secret
|
||||||
@@ -21,29 +19,29 @@ from .sops import generate_private_key
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def generate_host_key(flake_name: FlakeName, machine_name: str) -> None:
|
def generate_host_key(flake_dir: Path, machine_name: str) -> None:
|
||||||
if has_machine(flake_name, machine_name):
|
if has_machine(flake_dir, machine_name):
|
||||||
return
|
return
|
||||||
priv_key, pub_key = generate_private_key()
|
priv_key, pub_key = generate_private_key()
|
||||||
encrypt_secret(
|
encrypt_secret(
|
||||||
flake_name,
|
flake_dir,
|
||||||
sops_secrets_folder(flake_name) / f"{machine_name}-age.key",
|
sops_secrets_folder(flake_dir) / f"{machine_name}-age.key",
|
||||||
priv_key,
|
priv_key,
|
||||||
)
|
)
|
||||||
add_machine(flake_name, machine_name, pub_key, False)
|
add_machine(flake_dir, machine_name, pub_key, False)
|
||||||
|
|
||||||
|
|
||||||
def generate_secrets_group(
|
def generate_secrets_group(
|
||||||
flake_name: FlakeName,
|
flake_dir: Path,
|
||||||
secret_group: str,
|
secret_group: str,
|
||||||
machine_name: str,
|
machine_name: str,
|
||||||
tempdir: Path,
|
tempdir: Path,
|
||||||
secret_options: dict[str, Any],
|
secret_options: dict[str, Any],
|
||||||
) -> None:
|
) -> None:
|
||||||
clan_dir = specific_flake_dir(flake_name)
|
clan_dir = flake_dir
|
||||||
secrets = secret_options["secrets"]
|
secrets = secret_options["secrets"]
|
||||||
needs_regeneration = any(
|
needs_regeneration = any(
|
||||||
not has_secret(flake_name, f"{machine_name}-{secret['name']}")
|
not has_secret(flake_dir, f"{machine_name}-{secret['name']}")
|
||||||
for secret in secrets.values()
|
for secret in secrets.values()
|
||||||
)
|
)
|
||||||
generator = secret_options["generator"]
|
generator = secret_options["generator"]
|
||||||
@@ -74,8 +72,8 @@ export secrets={shlex.quote(str(secrets_dir))}
|
|||||||
msg += text
|
msg += text
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
encrypt_secret(
|
encrypt_secret(
|
||||||
flake_name,
|
flake_dir,
|
||||||
sops_secrets_folder(flake_name) / f"{machine_name}-{secret['name']}",
|
sops_secrets_folder(flake_dir) / f"{machine_name}-{secret['name']}",
|
||||||
secret_file.read_text(),
|
secret_file.read_text(),
|
||||||
add_machines=[machine_name],
|
add_machines=[machine_name],
|
||||||
)
|
)
|
||||||
@@ -92,21 +90,19 @@ export secrets={shlex.quote(str(secrets_dir))}
|
|||||||
|
|
||||||
# this is called by the sops.nix clan core module
|
# this is called by the sops.nix clan core module
|
||||||
def generate_secrets_from_nix(
|
def generate_secrets_from_nix(
|
||||||
flake_name: FlakeName,
|
|
||||||
machine_name: str,
|
machine_name: str,
|
||||||
secret_submodules: dict[str, Any],
|
secret_submodules: dict[str, Any],
|
||||||
) -> None:
|
) -> None:
|
||||||
generate_host_key(flake_name, machine_name)
|
flake_dir = Path(os.environ["CLAN_DIR"])
|
||||||
|
generate_host_key(flake_dir, machine_name)
|
||||||
errors = {}
|
errors = {}
|
||||||
log.debug(
|
log.debug("Generating secrets for machine %s and flake %s", machine_name, flake_dir)
|
||||||
"Generating secrets for machine %s and flake %s", machine_name, flake_name
|
|
||||||
)
|
|
||||||
with TemporaryDirectory() as d:
|
with TemporaryDirectory() as d:
|
||||||
# if any of the secrets are missing, we regenerate all connected facts/secrets
|
# if any of the secrets are missing, we regenerate all connected facts/secrets
|
||||||
for secret_group, secret_options in secret_submodules.items():
|
for secret_group, secret_options in secret_submodules.items():
|
||||||
try:
|
try:
|
||||||
generate_secrets_group(
|
generate_secrets_group(
|
||||||
flake_name, secret_group, machine_name, Path(d), secret_options
|
flake_dir, secret_group, machine_name, Path(d), secret_options
|
||||||
)
|
)
|
||||||
except ClanError as e:
|
except ClanError as e:
|
||||||
errors[secret_group] = e
|
errors[secret_group] = e
|
||||||
@@ -119,16 +115,16 @@ def generate_secrets_from_nix(
|
|||||||
|
|
||||||
# this is called by the sops.nix clan core module
|
# this is called by the sops.nix clan core module
|
||||||
def upload_age_key_from_nix(
|
def upload_age_key_from_nix(
|
||||||
flake_name: FlakeName,
|
|
||||||
machine_name: str,
|
machine_name: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_name)
|
flake_dir = Path(os.environ["CLAN_DIR"])
|
||||||
|
log.debug("Uploading secrets for machine %s and flake %s", machine_name, flake_dir)
|
||||||
secret_name = f"{machine_name}-age.key"
|
secret_name = f"{machine_name}-age.key"
|
||||||
if not has_secret(
|
if not has_secret(
|
||||||
flake_name, secret_name
|
flake_dir, secret_name
|
||||||
): # skip uploading the secret, not managed by us
|
): # skip uploading the secret, not managed by us
|
||||||
return
|
return
|
||||||
secret = decrypt_secret(flake_name, secret_name)
|
secret = decrypt_secret(flake_dir, secret_name)
|
||||||
|
|
||||||
secrets_dir = Path(os.environ["SECRETS_DIR"])
|
secrets_dir = Path(os.environ["SECRETS_DIR"])
|
||||||
(secrets_dir / "key.txt").write_text(secret)
|
(secrets_dir / "key.txt").write_text(secret)
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import subprocess
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
from ..dirs import specific_flake_dir
|
|
||||||
from ..machines.machines import Machine
|
from ..machines.machines import Machine
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
|
|
||||||
@@ -38,7 +37,7 @@ def upload_secrets(machine: Machine) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def upload_command(args: argparse.Namespace) -> None:
|
def upload_command(args: argparse.Namespace) -> None:
|
||||||
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
|
machine = Machine(name=args.machine, flake_dir=args.flake)
|
||||||
upload_secrets(machine)
|
upload_secrets(machine)
|
||||||
|
|
||||||
|
|
||||||
@@ -47,9 +46,4 @@ def register_upload_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
"machine",
|
"machine",
|
||||||
help="The machine to upload secrets to",
|
help="The machine to upload secrets to",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
parser.set_defaults(func=upload_command)
|
parser.set_defaults(func=upload_command)
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from ..types import FlakeName
|
from ..errors import ClanError
|
||||||
from . import secrets
|
from . import secrets
|
||||||
from .folders import list_objects, remove_object, sops_users_folder
|
from .folders import list_objects, remove_object, sops_users_folder
|
||||||
from .sops import read_key, write_key
|
from .sops import read_key, write_key
|
||||||
@@ -12,20 +13,20 @@ from .types import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_user(flake_name: FlakeName, name: str, key: str, force: bool) -> None:
|
def add_user(flake_dir: Path, name: str, key: str, force: bool) -> None:
|
||||||
write_key(sops_users_folder(flake_name) / name, key, force)
|
write_key(sops_users_folder(flake_dir) / name, key, force)
|
||||||
|
|
||||||
|
|
||||||
def remove_user(flake_name: FlakeName, name: str) -> None:
|
def remove_user(flake_dir: Path, name: str) -> None:
|
||||||
remove_object(sops_users_folder(flake_name), name)
|
remove_object(sops_users_folder(flake_dir), name)
|
||||||
|
|
||||||
|
|
||||||
def get_user(flake_name: FlakeName, name: str) -> str:
|
def get_user(flake_dir: Path, name: str) -> str:
|
||||||
return read_key(sops_users_folder(flake_name) / name)
|
return read_key(sops_users_folder(flake_dir) / name)
|
||||||
|
|
||||||
|
|
||||||
def list_users(flake_name: FlakeName) -> list[str]:
|
def list_users(flake_dir: Path) -> list[str]:
|
||||||
path = sops_users_folder(flake_name)
|
path = sops_users_folder(flake_dir)
|
||||||
|
|
||||||
def validate(name: str) -> bool:
|
def validate(name: str) -> bool:
|
||||||
return (
|
return (
|
||||||
@@ -36,40 +37,52 @@ def list_users(flake_name: FlakeName) -> list[str]:
|
|||||||
return list_objects(path, validate)
|
return list_objects(path, validate)
|
||||||
|
|
||||||
|
|
||||||
def add_secret(flake_name: FlakeName, user: str, secret: str) -> None:
|
def add_secret(flake_dir: Path, user: str, secret: str) -> None:
|
||||||
secrets.allow_member(
|
secrets.allow_member(
|
||||||
secrets.users_folder(flake_name, secret), sops_users_folder(flake_name), user
|
secrets.users_folder(flake_dir, secret), sops_users_folder(flake_dir), user
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret(flake_name: FlakeName, user: str, secret: str) -> None:
|
def remove_secret(flake_dir: Path, user: str, secret: str) -> None:
|
||||||
secrets.disallow_member(secrets.users_folder(flake_name, secret), user)
|
secrets.disallow_member(secrets.users_folder(flake_dir, secret), user)
|
||||||
|
|
||||||
|
|
||||||
def list_command(args: argparse.Namespace) -> None:
|
def list_command(args: argparse.Namespace) -> None:
|
||||||
lst = list_users(args.flake)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
lst = list_users(Path(args.flake))
|
||||||
if len(lst) > 0:
|
if len(lst) > 0:
|
||||||
print("\n".join(lst))
|
print("\n".join(lst))
|
||||||
|
|
||||||
|
|
||||||
def add_command(args: argparse.Namespace) -> None:
|
def add_command(args: argparse.Namespace) -> None:
|
||||||
add_user(args.flake, args.user, args.key, args.force)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
add_user(Path(args.flake), args.user, args.key, args.force)
|
||||||
|
|
||||||
|
|
||||||
def get_command(args: argparse.Namespace) -> None:
|
def get_command(args: argparse.Namespace) -> None:
|
||||||
print(get_user(args.flake, args.user))
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
print(get_user(Path(args.flake), args.user))
|
||||||
|
|
||||||
|
|
||||||
def remove_command(args: argparse.Namespace) -> None:
|
def remove_command(args: argparse.Namespace) -> None:
|
||||||
remove_user(args.flake, args.user)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
remove_user(Path(args.flake), args.user)
|
||||||
|
|
||||||
|
|
||||||
def add_secret_command(args: argparse.Namespace) -> None:
|
def add_secret_command(args: argparse.Namespace) -> None:
|
||||||
add_secret(args.flake, args.user, args.secret)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
add_secret(Path(args.flake), args.user, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def remove_secret_command(args: argparse.Namespace) -> None:
|
def remove_secret_command(args: argparse.Namespace) -> None:
|
||||||
remove_secret(args.flake, args.user, args.secret)
|
if args.flake is None:
|
||||||
|
raise ClanError("Could not find clan flake toplevel directory")
|
||||||
|
remove_secret(Path(args.flake), args.user, args.secret)
|
||||||
|
|
||||||
|
|
||||||
def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
@@ -80,11 +93,6 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
required=True,
|
required=True,
|
||||||
)
|
)
|
||||||
list_parser = subparser.add_parser("list", help="list users")
|
list_parser = subparser.add_parser("list", help="list users")
|
||||||
list_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
list_parser.set_defaults(func=list_command)
|
list_parser.set_defaults(func=list_command)
|
||||||
|
|
||||||
add_parser = subparser.add_parser("add", help="add a user")
|
add_parser = subparser.add_parser("add", help="add a user")
|
||||||
@@ -98,29 +106,14 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
type=public_or_private_age_key_type,
|
type=public_or_private_age_key_type,
|
||||||
)
|
)
|
||||||
add_parser.set_defaults(func=add_command)
|
add_parser.set_defaults(func=add_command)
|
||||||
add_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
|
|
||||||
get_parser = subparser.add_parser("get", help="get a user public key")
|
get_parser = subparser.add_parser("get", help="get a user public key")
|
||||||
get_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
get_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||||
get_parser.set_defaults(func=get_command)
|
get_parser.set_defaults(func=get_command)
|
||||||
get_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
|
|
||||||
remove_parser = subparser.add_parser("remove", help="remove a user")
|
remove_parser = subparser.add_parser("remove", help="remove a user")
|
||||||
remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
remove_parser.add_argument("user", help="the name of the user", type=user_name_type)
|
||||||
remove_parser.set_defaults(func=remove_command)
|
remove_parser.set_defaults(func=remove_command)
|
||||||
remove_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
|
|
||||||
add_secret_parser = subparser.add_parser(
|
add_secret_parser = subparser.add_parser(
|
||||||
"add-secret", help="allow a user to access a secret"
|
"add-secret", help="allow a user to access a secret"
|
||||||
@@ -131,11 +124,6 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
add_secret_parser.add_argument(
|
add_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
add_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
add_secret_parser.set_defaults(func=add_secret_command)
|
add_secret_parser.set_defaults(func=add_secret_command)
|
||||||
|
|
||||||
remove_secret_parser = subparser.add_parser(
|
remove_secret_parser = subparser.add_parser(
|
||||||
@@ -147,9 +135,4 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
remove_secret_parser.add_argument(
|
remove_secret_parser.add_argument(
|
||||||
"secret", help="the name of the secret", type=secret_name_type
|
"secret", help="the name of the secret", type=secret_name_type
|
||||||
)
|
)
|
||||||
remove_secret_parser.add_argument(
|
|
||||||
"flake",
|
|
||||||
type=str,
|
|
||||||
help="name of the flake to create machine for",
|
|
||||||
)
|
|
||||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||||
|
|||||||
@@ -101,7 +101,7 @@ class BuildVmTask(BaseTask):
|
|||||||
|
|
||||||
cmd = next(cmds)
|
cmd = next(cmds)
|
||||||
cmd.run(
|
cmd.run(
|
||||||
[vm_config["uploadSecrets"], clan_name],
|
[vm_config["uploadSecrets"]],
|
||||||
env=env,
|
env=env,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import shlex
|
|||||||
|
|
||||||
from clan_cli import create_parser
|
from clan_cli import create_parser
|
||||||
from clan_cli.custom_logger import get_caller
|
from clan_cli.custom_logger import get_caller
|
||||||
|
from clan_cli.dirs import get_clan_flake_toplevel
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -17,6 +18,8 @@ class Cli:
|
|||||||
log.debug(f"$ {cmd}")
|
log.debug(f"$ {cmd}")
|
||||||
log.debug(f"Caller {get_caller()}")
|
log.debug(f"Caller {get_caller()}")
|
||||||
parsed = self.parser.parse_args(args)
|
parsed = self.parser.parse_args(args)
|
||||||
|
if parsed.flake is None:
|
||||||
|
parsed.flake = get_clan_flake_toplevel()
|
||||||
if hasattr(parsed, "func"):
|
if hasattr(parsed, "func"):
|
||||||
parsed.func(parsed)
|
parsed.func(parsed)
|
||||||
return parsed
|
return parsed
|
||||||
|
|||||||
@@ -20,16 +20,66 @@ def test_import_sops(
|
|||||||
|
|
||||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
|
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"machines",
|
||||||
|
"add",
|
||||||
|
"machine1",
|
||||||
|
age_keys[0].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add",
|
||||||
|
"user1",
|
||||||
|
age_keys[1].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add",
|
||||||
|
"user2",
|
||||||
|
age_keys[2].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"group1",
|
||||||
|
"user1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"group1",
|
||||||
|
"user2",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
|
|
||||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
|
|
||||||
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
|
|
||||||
cli.run(["secrets", "groups", "add-user", "group1", "user2", test_flake.name])
|
|
||||||
|
|
||||||
# To edit:
|
# To edit:
|
||||||
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
|
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
|
||||||
cmd = [
|
cmd = [
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
"import-sops",
|
"import-sops",
|
||||||
"--group",
|
"--group",
|
||||||
@@ -37,15 +87,14 @@ def test_import_sops(
|
|||||||
"--machine",
|
"--machine",
|
||||||
"machine1",
|
"machine1",
|
||||||
str(test_root.joinpath("data", "secrets.yaml")),
|
str(test_root.joinpath("data", "secrets.yaml")),
|
||||||
test_flake.name,
|
|
||||||
]
|
]
|
||||||
|
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "users", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "users", "list"])
|
||||||
users = sorted(capsys.readouterr().out.rstrip().split())
|
users = sorted(capsys.readouterr().out.rstrip().split())
|
||||||
assert users == ["user1", "user2"]
|
assert users == ["user1", "user2"]
|
||||||
|
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "secret-key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "secret-key"])
|
||||||
assert capsys.readouterr().out == "secret-value"
|
assert capsys.readouterr().out == "secret-value"
|
||||||
|
|||||||
@@ -24,41 +24,61 @@ def _test_identities(
|
|||||||
cli = Cli()
|
cli = Cli()
|
||||||
sops_folder = test_flake.path / "sops"
|
sops_folder = test_flake.path / "sops"
|
||||||
|
|
||||||
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
what,
|
||||||
|
"add",
|
||||||
|
"foo",
|
||||||
|
age_keys[0].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
assert (sops_folder / what / "foo" / "key.json").exists()
|
assert (sops_folder / what / "foo" / "key.json").exists()
|
||||||
with pytest.raises(ClanError):
|
with pytest.raises(ClanError):
|
||||||
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
|
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
[
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
what,
|
what,
|
||||||
"add",
|
"add",
|
||||||
"-f",
|
"-f",
|
||||||
"foo",
|
"foo",
|
||||||
age_keys[0].privkey,
|
age_keys[0].privkey,
|
||||||
test_flake.name,
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", what, "get", "foo", test_flake.name])
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
what,
|
||||||
|
"get",
|
||||||
|
"foo",
|
||||||
|
]
|
||||||
|
)
|
||||||
out = capsys.readouterr() # empty the buffer
|
out = capsys.readouterr() # empty the buffer
|
||||||
assert age_keys[0].pubkey in out.out
|
assert age_keys[0].pubkey in out.out
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", what, "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", what, "list"])
|
||||||
out = capsys.readouterr() # empty the buffer
|
out = capsys.readouterr() # empty the buffer
|
||||||
assert "foo" in out.out
|
assert "foo" in out.out
|
||||||
|
|
||||||
cli.run(["secrets", what, "remove", "foo", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", what, "remove", "foo"])
|
||||||
assert not (sops_folder / what / "foo" / "key.json").exists()
|
assert not (sops_folder / what / "foo" / "key.json").exists()
|
||||||
|
|
||||||
with pytest.raises(ClanError): # already removed
|
with pytest.raises(ClanError): # already removed
|
||||||
cli.run(["secrets", what, "remove", "foo", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", what, "remove", "foo"])
|
||||||
|
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", what, "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", what, "list"])
|
||||||
out = capsys.readouterr()
|
out = capsys.readouterr()
|
||||||
assert "foo" not in out.out
|
assert "foo" not in out.out
|
||||||
|
|
||||||
@@ -80,35 +100,119 @@ def test_groups(
|
|||||||
) -> None:
|
) -> None:
|
||||||
cli = Cli()
|
cli = Cli()
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "groups", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "groups", "list"])
|
||||||
assert capsys.readouterr().out == ""
|
assert capsys.readouterr().out == ""
|
||||||
|
|
||||||
with pytest.raises(ClanError): # machine does not exist yet
|
with pytest.raises(ClanError): # machine does not exist yet
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-machine",
|
||||||
|
"group1",
|
||||||
|
"machine1",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
with pytest.raises(ClanError): # user does not exist yet
|
with pytest.raises(ClanError): # user does not exist yet
|
||||||
cli.run(["secrets", "groups", "add-user", "groupb1", "user1", test_flake.name])
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"groupb1",
|
||||||
|
"user1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"machines",
|
||||||
|
"add",
|
||||||
|
"machine1",
|
||||||
|
age_keys[0].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-machine",
|
||||||
|
"group1",
|
||||||
|
"machine1",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
|
|
||||||
|
|
||||||
# Should this fail?
|
# Should this fail?
|
||||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-machine",
|
||||||
|
"group1",
|
||||||
|
"machine1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
|
cli.run(
|
||||||
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add",
|
||||||
|
"user1",
|
||||||
|
age_keys[0].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"group1",
|
||||||
|
"user1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "groups", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "groups", "list"])
|
||||||
out = capsys.readouterr().out
|
out = capsys.readouterr().out
|
||||||
assert "user1" in out
|
assert "user1" in out
|
||||||
assert "machine1" in out
|
assert "machine1" in out
|
||||||
|
|
||||||
cli.run(["secrets", "groups", "remove-user", "group1", "user1", test_flake.name])
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "groups", "remove-machine", "group1", "machine1", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"remove-user",
|
||||||
|
"group1",
|
||||||
|
"user1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"remove-machine",
|
||||||
|
"group1",
|
||||||
|
"machine1",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
groups = os.listdir(test_flake.path / "sops" / "groups")
|
groups = os.listdir(test_flake.path / "sops" / "groups")
|
||||||
assert len(groups) == 0
|
assert len(groups) == 0
|
||||||
@@ -134,107 +238,249 @@ def test_secrets(
|
|||||||
) -> None:
|
) -> None:
|
||||||
cli = Cli()
|
cli = Cli()
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
|
||||||
assert capsys.readouterr().out == ""
|
assert capsys.readouterr().out == ""
|
||||||
|
|
||||||
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
|
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
|
||||||
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
|
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
|
||||||
cli.run(["secrets", "key", "generate"])
|
cli.run(["--flake", str(test_flake.path), "secrets", "key", "generate"])
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "key", "show"])
|
cli.run(["--flake", str(test_flake.path), "secrets", "key", "show"])
|
||||||
key = capsys.readouterr().out
|
key = capsys.readouterr().out
|
||||||
assert key.startswith("age1")
|
assert key.startswith("age1")
|
||||||
cli.run(["secrets", "users", "add", "testuser", key, test_flake.name])
|
cli.run(
|
||||||
|
["--flake", str(test_flake.path), "secrets", "users", "add", "testuser", key]
|
||||||
|
)
|
||||||
|
|
||||||
with pytest.raises(ClanError): # does not exist yet
|
with pytest.raises(ClanError): # does not exist yet
|
||||||
cli.run(["secrets", "get", "nonexisting", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "nonexisting"])
|
||||||
cli.run(["secrets", "set", "initialkey", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "set", "initialkey"])
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "initialkey", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "initialkey"])
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "users", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "users", "list"])
|
||||||
users = capsys.readouterr().out.rstrip().split("\n")
|
users = capsys.readouterr().out.rstrip().split("\n")
|
||||||
assert len(users) == 1, f"users: {users}"
|
assert len(users) == 1, f"users: {users}"
|
||||||
owner = users[0]
|
owner = users[0]
|
||||||
|
|
||||||
monkeypatch.setenv("EDITOR", "cat")
|
monkeypatch.setenv("EDITOR", "cat")
|
||||||
cli.run(["secrets", "set", "--edit", "initialkey", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "set", "--edit", "initialkey"])
|
||||||
monkeypatch.delenv("EDITOR")
|
monkeypatch.delenv("EDITOR")
|
||||||
|
|
||||||
cli.run(["secrets", "rename", "initialkey", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "rename", "initialkey", "key"])
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
|
||||||
assert capsys.readouterr().out == "key\n"
|
assert capsys.readouterr().out == "key\n"
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"machines",
|
||||||
|
"add",
|
||||||
|
"machine1",
|
||||||
|
age_keys[0].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"machines",
|
||||||
|
"add-secret",
|
||||||
|
"machine1",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
cli.run(["secrets", "machines", "add-secret", "machine1", "key", test_flake.name])
|
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "machines", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "machines", "list"])
|
||||||
assert capsys.readouterr().out == "machine1\n"
|
assert capsys.readouterr().out == "machine1\n"
|
||||||
|
|
||||||
with use_key(age_keys[0].privkey, monkeypatch):
|
with use_key(age_keys[0].privkey, monkeypatch):
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
|
||||||
|
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "machines", "remove-secret", "machine1", "key", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"machines",
|
||||||
|
"remove-secret",
|
||||||
|
"machine1",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
|
cli.run(
|
||||||
cli.run(["secrets", "users", "add-secret", "user1", "key", test_flake.name])
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add",
|
||||||
|
"user1",
|
||||||
|
age_keys[1].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add-secret",
|
||||||
|
"user1",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
|
)
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
with use_key(age_keys[1].privkey, monkeypatch):
|
with use_key(age_keys[1].privkey, monkeypatch):
|
||||||
cli.run(["secrets", "get", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
cli.run(["secrets", "users", "remove-secret", "user1", "key", test_flake.name])
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"remove-secret",
|
||||||
|
"user1",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
with pytest.raises(ClanError): # does not exist yet
|
with pytest.raises(ClanError): # does not exist yet
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-secret",
|
||||||
|
"admin-group",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"admin-group",
|
||||||
|
"user1",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"admin-group",
|
||||||
|
owner,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-secret",
|
||||||
|
"admin-group",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user1", test_flake.name])
|
|
||||||
cli.run(["secrets", "groups", "add-user", "admin-group", owner, test_flake.name])
|
|
||||||
cli.run(["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name])
|
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "set", "--group", "admin-group", "key2", test_flake.name])
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"set",
|
||||||
|
"--group",
|
||||||
|
"admin-group",
|
||||||
|
"key2",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
with use_key(age_keys[1].privkey, monkeypatch):
|
with use_key(age_keys[1].privkey, monkeypatch):
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
|
|
||||||
# extend group will update secrets
|
# extend group will update secrets
|
||||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
|
cli.run(
|
||||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user2", test_flake.name])
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"users",
|
||||||
|
"add",
|
||||||
|
"user2",
|
||||||
|
age_keys[2].pubkey,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
cli.run(
|
||||||
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"add-user",
|
||||||
|
"admin-group",
|
||||||
|
"user2",
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
with use_key(age_keys[2].privkey, monkeypatch): # user2
|
with use_key(age_keys[2].privkey, monkeypatch): # user2
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "groups", "remove-user", "admin-group", "user2", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"remove-user",
|
||||||
|
"admin-group",
|
||||||
|
"user2",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
|
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
|
||||||
# user2 is not in the group anymore
|
# user2 is not in the group anymore
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "get", "key"])
|
||||||
print(capsys.readouterr().out)
|
print(capsys.readouterr().out)
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
["secrets", "groups", "remove-secret", "admin-group", "key", test_flake.name]
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake.path),
|
||||||
|
"secrets",
|
||||||
|
"groups",
|
||||||
|
"remove-secret",
|
||||||
|
"admin-group",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
cli.run(["secrets", "remove", "key", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "remove", "key"])
|
||||||
cli.run(["secrets", "remove", "key2", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "remove", "key2"])
|
||||||
|
|
||||||
capsys.readouterr() # empty the buffer
|
capsys.readouterr() # empty the buffer
|
||||||
cli.run(["secrets", "list", test_flake.name])
|
cli.run(["--flake", str(test_flake.path), "secrets", "list"])
|
||||||
assert capsys.readouterr().out == ""
|
assert capsys.readouterr().out == ""
|
||||||
|
|||||||
@@ -23,28 +23,29 @@ def test_generate_secret(
|
|||||||
cli = Cli()
|
cli = Cli()
|
||||||
cli.run(
|
cli.run(
|
||||||
[
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake_with_core.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
"users",
|
"users",
|
||||||
"add",
|
"add",
|
||||||
"user1",
|
"user1",
|
||||||
age_keys[0].pubkey,
|
age_keys[0].pubkey,
|
||||||
test_flake_with_core.name,
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
|
cli.run(["--flake", str(test_flake_with_core.path), "secrets", "generate", "vm1"])
|
||||||
has_secret(test_flake_with_core.name, "vm1-age.key")
|
has_secret(test_flake_with_core.path, "vm1-age.key")
|
||||||
has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
|
has_secret(test_flake_with_core.path, "vm1-zerotier-identity-secret")
|
||||||
network_id = machine_get_fact(
|
network_id = machine_get_fact(
|
||||||
test_flake_with_core.name, "vm1", "zerotier-network-id"
|
test_flake_with_core.name, "vm1", "zerotier-network-id"
|
||||||
)
|
)
|
||||||
assert len(network_id) == 16
|
assert len(network_id) == 16
|
||||||
age_key = (
|
age_key = (
|
||||||
sops_secrets_folder(test_flake_with_core.name)
|
sops_secrets_folder(test_flake_with_core.path)
|
||||||
.joinpath("vm1-age.key")
|
.joinpath("vm1-age.key")
|
||||||
.joinpath("secret")
|
.joinpath("secret")
|
||||||
)
|
)
|
||||||
identity_secret = (
|
identity_secret = (
|
||||||
sops_secrets_folder(test_flake_with_core.name)
|
sops_secrets_folder(test_flake_with_core.path)
|
||||||
.joinpath("vm1-zerotier-identity-secret")
|
.joinpath("vm1-zerotier-identity-secret")
|
||||||
.joinpath("secret")
|
.joinpath("secret")
|
||||||
)
|
)
|
||||||
@@ -52,12 +53,12 @@ def test_generate_secret(
|
|||||||
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
||||||
|
|
||||||
# test idempotency
|
# test idempotency
|
||||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
|
cli.run(["secrets", "generate", "vm1"])
|
||||||
assert age_key.lstat().st_mtime_ns == age_key_mtime
|
assert age_key.lstat().st_mtime_ns == age_key_mtime
|
||||||
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
||||||
|
|
||||||
machine_path = (
|
machine_path = (
|
||||||
sops_secrets_folder(test_flake_with_core.name)
|
sops_secrets_folder(test_flake_with_core.path)
|
||||||
.joinpath("vm1-zerotier-identity-secret")
|
.joinpath("vm1-zerotier-identity-secret")
|
||||||
.joinpath("machines")
|
.joinpath("machines")
|
||||||
.joinpath("vm1")
|
.joinpath("vm1")
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ def test_upload_secret(
|
|||||||
check=True,
|
check=True,
|
||||||
)
|
)
|
||||||
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
|
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
|
||||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
|
cli.run(["secrets", "generate", "vm1"])
|
||||||
network_id = machine_get_fact(
|
network_id = machine_get_fact(
|
||||||
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
|
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
|
||||||
)
|
)
|
||||||
@@ -50,7 +50,7 @@ def test_upload_secret(
|
|||||||
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
secret1_mtime = identity_secret.lstat().st_mtime_ns
|
||||||
|
|
||||||
# test idempotency
|
# test idempotency
|
||||||
cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
|
cli.run(["secrets", "generate", "vm1"])
|
||||||
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
|
||||||
|
|
||||||
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
|
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
|
||||||
@@ -58,7 +58,7 @@ def test_upload_secret(
|
|||||||
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
|
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
|
||||||
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
||||||
flake.write_text(new_text)
|
flake.write_text(new_text)
|
||||||
cli.run(["secrets", "upload", "vm1", test_flake_with_core_and_pass.name])
|
cli.run(["secrets", "upload", "vm1"])
|
||||||
zerotier_identity_secret = (
|
zerotier_identity_secret = (
|
||||||
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
|
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -23,27 +23,31 @@ def test_secrets_upload(
|
|||||||
cli = Cli()
|
cli = Cli()
|
||||||
cli.run(
|
cli.run(
|
||||||
[
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake_with_core.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
"users",
|
"users",
|
||||||
"add",
|
"add",
|
||||||
"user1",
|
"user1",
|
||||||
age_keys[0].pubkey,
|
age_keys[0].pubkey,
|
||||||
test_flake_with_core.name,
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
cli.run(
|
cli.run(
|
||||||
[
|
[
|
||||||
|
"--flake",
|
||||||
|
str(test_flake_with_core.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
"machines",
|
"machines",
|
||||||
"add",
|
"add",
|
||||||
"vm1",
|
"vm1",
|
||||||
age_keys[1].pubkey,
|
age_keys[1].pubkey,
|
||||||
test_flake_with_core.name,
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
|
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
|
||||||
cli.run(["secrets", "set", "vm1-age.key", test_flake_with_core.name])
|
cli.run(
|
||||||
|
["--flake", str(test_flake_with_core.path), "secrets", "set", "vm1-age.key"]
|
||||||
|
)
|
||||||
|
|
||||||
flake = test_flake_with_core.path.joinpath("flake.nix")
|
flake = test_flake_with_core.path.joinpath("flake.nix")
|
||||||
host = host_group.hosts[0]
|
host = host_group.hosts[0]
|
||||||
@@ -51,7 +55,7 @@ def test_secrets_upload(
|
|||||||
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
|
||||||
|
|
||||||
flake.write_text(new_text)
|
flake.write_text(new_text)
|
||||||
cli.run(["secrets", "upload", "vm1", test_flake_with_core.name])
|
cli.run(["--flake", str(test_flake_with_core.path), "secrets", "upload", "vm1"])
|
||||||
|
|
||||||
# the flake defines this path as the location where the sops key should be installed
|
# the flake defines this path as the location where the sops key should be installed
|
||||||
sops_key = test_flake_with_core.path.joinpath("key.txt")
|
sops_key = test_flake_with_core.path.joinpath("key.txt")
|
||||||
|
|||||||
@@ -92,12 +92,13 @@ def test_create_local(
|
|||||||
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
|
||||||
cli = Cli()
|
cli = Cli()
|
||||||
cmd = [
|
cmd = [
|
||||||
|
"--flake",
|
||||||
|
str(flake_with_vm_with_secrets.path),
|
||||||
"secrets",
|
"secrets",
|
||||||
"users",
|
"users",
|
||||||
"add",
|
"add",
|
||||||
"user1",
|
"user1",
|
||||||
age_keys[0].pubkey,
|
age_keys[0].pubkey,
|
||||||
flake_with_vm_with_secrets.name,
|
|
||||||
]
|
]
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ def test_create(
|
|||||||
"add",
|
"add",
|
||||||
"user1",
|
"user1",
|
||||||
age_keys[0].pubkey,
|
age_keys[0].pubkey,
|
||||||
test_flake_with_core.name,
|
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
cli.run(["vms", "create", "vm1", test_flake_with_core.name])
|
cli.run(["vms", "create", "vm1", test_flake_with_core.name])
|
||||||
|
|||||||
Reference in New Issue
Block a user