Merge branch 'main' into DavHau-dave

This commit is contained in:
Mic92
2024-07-09 09:33:11 +00:00
51 changed files with 1930 additions and 748 deletions

View File

@@ -1,3 +1,4 @@
# shellcheck shell=bash
source_up
watch_file flake-module.nix shell.nix default.nix

View File

@@ -23,6 +23,7 @@ from . import (
machines,
secrets,
state,
vars,
vms,
)
from .clan_uri import FlakeId
@@ -272,6 +273,43 @@ For more detailed information, visit: {help_hyperlink("secrets", "https://docs.c
)
facts.register_parser(parser_facts)
# like facts but with vars instead of facts
parser_vars = subparsers.add_parser(
"vars",
help="WIP: manage vars",
description="WIP: manage vars",
epilog=(
f"""
This subcommand provides an interface to vars of clan machines.
Vars are variables that a service can generate.
There are public and secret vars.
Public vars can be referenced by other machines directly.
Public vars can include: ip addresses, public keys.
Secret vars can include: passwords, private keys.
A service is an included clan-module that implements vars generation functionality.
For example the zerotier module will generate private and public vars.
In this case the public var will be the resulting zerotier-ip of the machine.
The secret var will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan vars generate
Will generate vars for all machines.
$ clan vars generate --service [SERVICE] --regenerate
Will regenerate vars, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the vars for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
vars.register_parser(parser_vars)
parser_machine = subparsers.add_parser(
"machines",
help="manage machines and their configuration",

View File

@@ -76,6 +76,7 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
properties = {
f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}", type_map)
for f in fields
if not f.name.startswith("_")
}
required = set()
@@ -127,7 +128,7 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
if origin is None:
# Non-generic user-defined or built-in type
# TODO: handle custom types
raise JSchemaTypeError("Unhandled Type: ", origin)
raise JSchemaTypeError(f"{scope} Unhandled Type: ", origin)
elif origin is Literal:
# Handle Literal values for enums in JSON Schema
@@ -172,7 +173,7 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
new_map.update(inspect_dataclass_fields(t))
return type_to_dict(origin, scope, new_map)
raise JSchemaTypeError(f"Error api type not yet supported {t!s}")
raise JSchemaTypeError(f"{scope} - Error api type not yet supported {t!s}")
elif isinstance(t, type):
if t is str:
@@ -187,7 +188,7 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
return {"type": "object"}
if t is Any:
raise JSchemaTypeError(
f"Usage of the Any type is not supported for API functions. In: {scope}"
f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}"
)
if t is pathlib.Path:
return {
@@ -196,13 +197,13 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
}
if t is dict:
raise JSchemaTypeError(
"Error: generic dict type not supported. Use dict[str, Any] instead"
f"{scope} - Generic 'dict' type not supported. Use dict[str, Any] or any more expressive type."
)
# Optional[T] gets internally transformed Union[T,NoneType]
if t is NoneType:
return {"type": "null"}
raise JSchemaTypeError(f"Error primitive type not supported {t!s}")
raise JSchemaTypeError(f"{scope} - Error primitive type not supported {t!s}")
else:
raise JSchemaTypeError(f"Error type not supported {t!s}")
raise JSchemaTypeError(f"{scope} - Error type not supported {t!s}")

View File

@@ -81,11 +81,11 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
else:
raise ClanError(f"Invalid value {value} for boolean")
# handle lists
elif get_origin(input_type) == list:
elif get_origin(input_type) is list:
subtype = input_type.__args__[0]
return [cast([x], subtype, opt_description) for x in value]
# handle dicts
elif get_origin(input_type) == dict:
elif get_origin(input_type) is dict:
if not isinstance(value, dict):
raise ClanError(
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"

View File

@@ -20,7 +20,7 @@ class Machine:
name: str
flake: FlakeId
nix_options: list[str] = field(default_factory=list)
cached_deployment: None | dict = None
cached_deployment: None | dict[str, Any] = None
_eval_cache: dict[str, str] = field(default_factory=dict)
_build_cache: dict[str, Path] = field(default_factory=dict)
@@ -69,12 +69,26 @@ class Machine:
def public_facts_module(self) -> str:
return self.deployment["facts"]["publicModule"]
@property
def secret_vars_module(self) -> str:
return self.deployment["vars"]["secretModule"]
@property
def public_vars_module(self) -> str:
return self.deployment["vars"]["publicModule"]
@property
def facts_data(self) -> dict[str, dict[str, Any]]:
if self.deployment["facts"]["services"]:
return self.deployment["facts"]["services"]
return {}
@property
def vars_generators(self) -> dict[str, dict[str, Any]]:
if self.deployment["vars"]["generators"]:
return self.deployment["vars"]["generators"]
return {}
@property
def secrets_upload_directory(self) -> str:
return self.deployment["facts"]["secretUploadDirectory"]

View File

@@ -0,0 +1,132 @@
# !/usr/bin/env python3
import argparse
from ..hyperlink import help_hyperlink
from .check import register_check_parser
from .generate import register_generate_parser
from .list import register_list_parser
from .upload import register_upload_parser
# takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
check_parser = subparser.add_parser(
"check",
help="check if facts are up to date",
epilog=(
f"""
This subcommand allows checking if all facts are up to date.
Examples:
$ clan facts check [MACHINE]
Will check facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_check_parser(check_parser)
list_parser = subparser.add_parser(
"list",
help="list all facts",
epilog=(
f"""
This subcommand allows listing all public facts for a specific machine.
The resulting list will be a json string with the name of the fact as its key
and the fact itself as it's value.
This is how an example output might look like:
```
\u007b
"[FACT_NAME]": "[FACT]"
\u007d
```
Examples:
$ clan facts list [MACHINE]
Will list facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_list_parser(list_parser)
parser_generate = subparser.add_parser(
"generate",
help="generate public and secret facts for machines",
epilog=(
f"""
This subcommand allows control of the generation of facts.
Often this function will be invoked automatically on deploying machines,
but there are situations the user may want to have more granular control,
especially for the regeneration of certain services.
A service is an included clan-module that implements facts generation functionality.
For example the zerotier module will generate private and public facts.
In this case the public fact will be the resulting zerotier-ip of the machine.
The secret fact will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan facts generate
Will generate facts for all machines.
$ clan facts generate [MACHINE]
Will generate facts for the specified machine.
$ clan facts generate [MACHINE] --service [SERVICE]
Will generate facts for the specified machine for the specified service.
$ clan facts generate --service [SERVICE] --regenerate
Will regenerate facts, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the facts for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_generate_parser(parser_generate)
parser_upload = subparser.add_parser(
"upload",
help="upload secrets for machines",
epilog=(
f"""
This subcommand allows uploading secrets to remote machines.
If using sops as a secret backend it will upload the private key to the machine.
If using password store it uploads all the secrets you manage to the machine.
The default backend is sops.
Examples:
$ clan facts upload [MACHINE]
Will upload secrets to a specific machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/getting-started/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_upload_parser(parser_upload)

View File

@@ -0,0 +1,64 @@
import argparse
import importlib
import logging
from ..completions import add_dynamic_completer, complete_machines
from ..machines.machines import Machine
log = logging.getLogger(__name__)
def check_secrets(machine: Machine, generator_name: None | str = None) -> bool:
secret_vars_module = importlib.import_module(machine.secret_vars_module)
secret_vars_store = secret_vars_module.SecretStore(machine=machine)
public_vars_module = importlib.import_module(machine.public_vars_module)
public_vars_store = public_vars_module.FactStore(machine=machine)
missing_secret_vars = []
missing_public_vars = []
if generator_name:
services = [generator_name]
else:
services = list(machine.vars_generators.keys())
for generator_name in services:
for name, file in machine.vars_generators[generator_name]["files"].items():
if file["secret"] and not secret_vars_store.exists(generator_name, name):
log.info(
f"Secret fact '{name}' for service '{generator_name}' in machine {machine.name} is missing."
)
missing_secret_vars.append((generator_name, name))
if not file["secret"] and not public_vars_store.exists(
generator_name, name
):
log.info(
f"Public fact '{name}' for service '{generator_name}' in machine {machine.name} is missing."
)
missing_public_vars.append((generator_name, name))
log.debug(f"missing_secret_vars: {missing_secret_vars}")
log.debug(f"missing_public_vars: {missing_public_vars}")
if missing_secret_vars or missing_public_vars:
return False
return True
def check_command(args: argparse.Namespace) -> None:
machine = Machine(
name=args.machine,
flake=args.flake,
)
check_secrets(machine, generator_name=args.service)
def register_check_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to check secrets for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.add_argument(
"--service",
help="the service to check",
)
parser.set_defaults(func=check_command)

View File

@@ -0,0 +1,240 @@
import argparse
import importlib
import logging
import os
import subprocess
import sys
from collections.abc import Callable
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.cmd import run
from ..completions import (
add_dynamic_completer,
complete_machines,
complete_services_for_machine,
)
from ..errors import ClanError
from ..git import commit_files
from ..machines.inventory import get_all_machines, get_selected_machines
from ..machines.machines import Machine
from ..nix import nix_shell
from .check import check_secrets
from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase
log = logging.getLogger(__name__)
def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
"""
Read multi-line input from stdin.
"""
print(prompt, flush=True)
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True)
log.info("Input received. Processing...")
return proc.stdout
def bubblewrap_cmd(generator: str, generator_dir: Path) -> list[str]:
# fmt: off
return nix_shell(
[
"nixpkgs#bash",
"nixpkgs#bubblewrap",
],
[
"bwrap",
"--ro-bind", "/nix/store", "/nix/store",
"--tmpfs", "/usr/lib/systemd",
"--dev", "/dev",
"--bind", str(generator_dir), str(generator_dir),
"--unshare-all",
"--unshare-user",
"--uid", "1000",
"--",
"bash", "-c", generator
],
)
# fmt: on
def execute_generator(
machine: Machine,
generator_name: str,
regenerate: bool,
secret_vars_store: SecretStoreBase,
public_vars_store: FactStoreBase,
tmpdir: Path,
prompt: Callable[[str], str],
) -> bool:
generator_dir = tmpdir / generator_name
# check if all secrets exist and generate them if at least one is missing
needs_regeneration = not check_secrets(machine, generator_name=generator_name)
log.debug(f"{generator_name} needs_regeneration: {needs_regeneration}")
if not (needs_regeneration or regenerate):
return False
if not isinstance(machine.flake, Path):
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
env = os.environ.copy()
generator_dir.mkdir(parents=True)
env["out"] = str(generator_dir)
# compatibility for old outputs.nix users
generator = machine.vars_generators[generator_name]["finalScript"]
# if machine.vars_data[generator_name]["generator"]["prompt"]:
# prompt_value = prompt(machine.vars_data[generator_name]["generator"]["prompt"])
# env["prompt_value"] = prompt_value
if sys.platform == "linux":
cmd = bubblewrap_cmd(generator, generator_dir)
else:
cmd = ["bash", "-c", generator]
run(
cmd,
env=env,
)
files_to_commit = []
# store secrets
files = machine.vars_generators[generator_name]["files"]
for file_name, file in files.items():
groups = file.get("groups", [])
secret_file = generator_dir / file_name
if not secret_file.is_file():
msg = f"did not generate a file for '{file_name}' when running the following command:\n"
msg += generator
raise ClanError(msg)
if file["secret"]:
file_path = secret_vars_store.set(
generator_name, file_name, secret_file.read_bytes(), groups
)
else:
file_path = public_vars_store.set(
generator_name, file_name, secret_file.read_bytes()
)
if file_path:
files_to_commit.append(file_path)
commit_files(
files_to_commit,
machine.flake_dir,
f"Update facts/secrets for service {generator_name} in machine {machine.name}",
)
return True
def prompt_func(text: str) -> str:
print(f"{text}: ")
return read_multiline_input()
def _generate_vars_for_machine(
machine: Machine,
generator_name: str | None,
regenerate: bool,
tmpdir: Path,
prompt: Callable[[str], str] = prompt_func,
) -> bool:
local_temp = tmpdir / machine.name
local_temp.mkdir()
secret_vars_module = importlib.import_module(machine.secret_vars_module)
secret_vars_store = secret_vars_module.SecretStore(machine=machine)
public_vars_module = importlib.import_module(machine.public_vars_module)
public_vars_store = public_vars_module.FactStore(machine=machine)
machine_updated = False
if generator_name and generator_name not in machine.vars_generators:
generators = list(machine.vars_generators.keys())
raise ClanError(
f"Could not find generator with name: {generator_name}. The following generators are available: {generators}"
)
if generator_name:
machine_generator_facts = {
generator_name: machine.vars_generators[generator_name]
}
else:
machine_generator_facts = machine.vars_generators
for generator_name in machine_generator_facts:
machine_updated |= execute_generator(
machine=machine,
generator_name=generator_name,
regenerate=regenerate,
secret_vars_store=secret_vars_store,
public_vars_store=public_vars_store,
tmpdir=local_temp,
prompt=prompt,
)
if machine_updated:
# flush caches to make sure the new secrets are available in evaluation
machine.flush_caches()
return machine_updated
def generate_vars(
machines: list[Machine],
generator_name: str | None,
regenerate: bool,
prompt: Callable[[str], str] = prompt_func,
) -> bool:
was_regenerated = False
with TemporaryDirectory() as tmp:
tmpdir = Path(tmp)
for machine in machines:
errors = 0
try:
was_regenerated |= _generate_vars_for_machine(
machine, generator_name, regenerate, tmpdir, prompt
)
except Exception as exc:
log.error(f"Failed to generate facts for {machine.name}: {exc}")
errors += 1
if errors > 0:
raise ClanError(
f"Failed to generate facts for {errors} hosts. Check the logs above"
)
if not was_regenerated:
print("All secrets and facts are already up to date")
return was_regenerated
def generate_command(args: argparse.Namespace) -> None:
if len(args.machines) == 0:
machines = get_all_machines(args.flake, args.option)
else:
machines = get_selected_machines(args.flake, args.option, args.machines)
generate_vars(machines, args.service, args.regenerate)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machines",
type=str,
help="machine to generate facts for. if empty, generate facts for all machines",
nargs="*",
default=[],
)
add_dynamic_completer(machines_parser, complete_machines)
service_parser = parser.add_argument(
"--service",
type=str,
help="service to generate facts for, if empty, generate facts for every service",
default=None,
)
add_dynamic_completer(service_parser, complete_services_for_machine)
parser.add_argument(
"--regenerate",
type=bool,
action=argparse.BooleanOptionalAction,
help="whether to regenerate facts for the specified machine",
default=None,
)
parser.set_defaults(func=generate_command)

View File

@@ -0,0 +1,47 @@
import argparse
import importlib
import json
import logging
from ..completions import add_dynamic_completer, complete_machines
from ..machines.machines import Machine
log = logging.getLogger(__name__)
# TODO get also secret facts
def get_all_facts(machine: Machine) -> dict:
public_facts_module = importlib.import_module(machine.public_facts_module)
public_facts_store = public_facts_module.FactStore(machine=machine)
# for service in machine.secrets_data:
# facts[service] = {}
# for fact in machine.secrets_data[service]["facts"]:
# fact_content = fact_store.get(service, fact)
# if fact_content:
# facts[service][fact] = fact_content.decode()
# else:
# log.error(f"Fact {fact} for service {service} is missing")
return public_facts_store.get_all()
def get_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake=args.flake)
# the raw_facts are bytestrings making them not json serializable
raw_facts = get_all_facts(machine)
facts = dict()
for key in raw_facts["TODO"]:
facts[key] = raw_facts["TODO"][key].decode("utf8")
print(json.dumps(facts, indent=4))
def register_list_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to print facts for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=get_command)

View File

@@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
from pathlib import Path
from clan_cli.machines.machines import Machine
class FactStoreBase(ABC):
@abstractmethod
def __init__(self, machine: Machine) -> None:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
@abstractmethod
def set(self, service: str, name: str, value: bytes) -> Path | None:
pass
# get a single fact
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
# get all facts
@abstractmethod
def get_all(self) -> dict[str, dict[str, bytes]]:
pass

View File

@@ -0,0 +1,64 @@
from pathlib import Path
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from . import FactStoreBase
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
def set(self, generator_name: str, name: str, value: bytes) -> Path | None:
if self.machine.flake.is_local():
fact_path = (
self.machine.flake.path
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.touch()
fact_path.write_bytes(value)
return fact_path
else:
raise ClanError(
f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
)
def exists(self, generator_name: str, name: str) -> bool:
fact_path = (
self.machine.flake_dir
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
return fact_path.exists()
# get a single fact
def get(self, generator_name: str, name: str) -> bytes:
fact_path = (
self.machine.flake_dir
/ "machines"
/ self.machine.name
/ "vars"
/ generator_name
/ name
)
return fact_path.read_bytes()
# get all public vars
def get_all(self) -> dict[str, dict[str, bytes]]:
facts_folder = self.machine.flake_dir / "machines" / self.machine.name / "vars"
facts: dict[str, dict[str, bytes]] = {}
facts["TODO"] = {}
if facts_folder.exists():
for fact_path in facts_folder.iterdir():
facts["TODO"][fact_path.name] = fact_path.read_bytes()
return facts

View File

@@ -0,0 +1,46 @@
import logging
from pathlib import Path
from clan_cli.dirs import vm_state_dir
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from . import FactStoreBase
log = logging.getLogger(__name__)
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.dir = vm_state_dir(str(machine.flake), machine.name) / "facts"
log.debug(f"FactStore initialized with dir {self.dir}")
def exists(self, service: str, name: str) -> bool:
fact_path = self.dir / service / name
return fact_path.exists()
def set(self, service: str, name: str, value: bytes) -> Path | None:
fact_path = self.dir / service / name
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.write_bytes(value)
return None
# get a single fact
def get(self, service: str, name: str) -> bytes:
fact_path = self.dir / service / name
if fact_path.exists():
return fact_path.read_bytes()
raise ClanError(f"Fact {name} for service {service} not found")
# get all facts
def get_all(self) -> dict[str, dict[str, bytes]]:
facts: dict[str, dict[str, bytes]] = {}
if self.dir.exists():
for service in self.dir.iterdir():
facts[service.name] = {}
for fact in service.iterdir():
facts[service.name][fact.name] = fact.read_bytes()
return facts

View File

@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from pathlib import Path
from clan_cli.machines.machines import Machine
class SecretStoreBase(ABC):
@abstractmethod
def __init__(self, machine: Machine) -> None:
pass
@abstractmethod
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
pass
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
def update_check(self) -> bool:
return False
@abstractmethod
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -0,0 +1,117 @@
import os
import subprocess
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value,
check=True,
)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["nixpkgs#git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
# FIXME: add this when we switch to python3.12
# @override
def update_check(self) -> bool:
local_hash = self.generate_hash()
remote_hash = self.machine.target_host.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
check=False,
stdout=subprocess.PIPE,
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return False
return local_hash.decode() == remote_hash
def upload(self, output_dir: Path) -> None:
for service in self.machine.facts_data:
for secret in self.machine.facts_data[service]["secret"]:
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -0,0 +1,69 @@
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "vars_data") or not self.machine.vars_generators:
return
for generator in self.machine.vars_generators.values():
if "files" in generator:
for file in generator["files"].values():
if file["secret"]:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(
self, generator_name: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-{generator_name}-{name}"
)
encrypt_secret(
self.machine.flake_dir,
path,
value,
add_machines=[self.machine.name],
add_groups=groups,
)
return path
def get(self, service: str, name: str) -> bytes:
return decrypt_secret(
self.machine.flake_dir, f"{self.machine.name}-{name}"
).encode("utf-8")
def exists(self, service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
(output_dir / "key.txt").write_text(key)

View File

@@ -0,0 +1,35 @@
import os
import shutil
from pathlib import Path
from clan_cli.dirs import vm_state_dir
from clan_cli.machines.machines import Machine
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
secret_file = self.dir / service / name
return secret_file.read_bytes()
def exists(self, service: str, name: str) -> bool:
return (self.dir / service / name).exists()
def upload(self, output_dir: Path) -> None:
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
shutil.copytree(self.dir, output_dir)

View File

@@ -0,0 +1,58 @@
import argparse
import importlib
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from ..cmd import Log, run
from ..completions import add_dynamic_completer, complete_machines
from ..machines.machines import Machine
from ..nix import nix_shell
log = logging.getLogger(__name__)
def upload_secrets(machine: Machine) -> None:
secret_facts_module = importlib.import_module(machine.secret_facts_module)
secret_facts_store = secret_facts_module.SecretStore(machine=machine)
if secret_facts_store.update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
secret_facts_store.upload(Path(tempdir))
host = machine.target_host
ssh_cmd = host.ssh_cmd()
run(
nix_shell(
["nixpkgs#rsync"],
[
"rsync",
"-e",
" ".join(["ssh"] + ssh_cmd[2:]),
"-az",
"--delete",
"--chown=root:root",
"--chmod=D700,F600",
f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
],
),
log=Log.BOTH,
)
def upload_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake=args.flake)
upload_secrets(machine)
def register_upload_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to upload secrets to",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=upload_command)

View File

@@ -254,6 +254,9 @@ def collect_commands() -> list[Category]:
if isinstance(action, argparse._SubParsersAction):
subparsers: dict[str, argparse.ArgumentParser] = action.choices
for name, subparser in subparsers.items():
if str(subparser.description).startswith("WIP"):
print(f"Excluded {name} from documentation as it is marked as WIP")
continue
(_options, _positionals, _subcommands) = get_subcommands(
subparser, to=result, level=2, prefix=[name]
)

View File

@@ -58,6 +58,7 @@
python docs.py reference
mkdir -p $out
cp -r out/* $out
ls -lah $out
'';
};
clan-ts-api = pkgs.stdenv.mkDerivation {

View File

@@ -0,0 +1,142 @@
import ast
import importlib.util
import os
import sys
from dataclasses import is_dataclass
from pathlib import Path
from clan_cli.api.util import JSchemaTypeError, type_to_dict
from clan_cli.errors import ClanError
def find_dataclasses_in_directory(
directory: Path, exclude_paths: list[str] = []
) -> list[tuple[str, str]]:
"""
Find all dataclass classes in all Python files within a nested directory.
Args:
directory (str): The root directory to start searching from.
Returns:
List[Tuple[str, str]]: A list of tuples containing the file path and the dataclass name.
"""
dataclass_files = []
excludes = [os.path.join(directory, d) for d in exclude_paths]
for root, _, files in os.walk(directory, topdown=False):
for file in files:
if not file.endswith(".py"):
continue
file_path = os.path.join(root, file)
if file_path in excludes:
print(f"Skipping dataclass check for file: {file_path}")
continue
with open(file_path, encoding="utf-8") as f:
try:
tree = ast.parse(f.read(), filename=file_path)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
for deco in node.decorator_list:
if (
isinstance(deco, ast.Name)
and deco.id == "dataclass"
):
dataclass_files.append((file_path, node.name))
elif (
isinstance(deco, ast.Call)
and isinstance(deco.func, ast.Name)
and deco.func.id == "dataclass"
):
dataclass_files.append((file_path, node.name))
except (SyntaxError, UnicodeDecodeError) as e:
print(f"Error parsing {file_path}: {e}")
return dataclass_files
def load_dataclass_from_file(
file_path: str, class_name: str, root_dir: str
) -> type | None:
"""
Load a dataclass from a given file path.
Args:
file_path (str): Path to the file.
class_name (str): Name of the class to load.
Returns:
List[Type]: The dataclass type if found, else an empty list.
"""
module_name = (
os.path.relpath(file_path, root_dir).replace(os.path.sep, ".").rstrip(".py")
)
try:
sys.path.insert(0, root_dir)
spec = importlib.util.spec_from_file_location(module_name, file_path)
if not spec:
raise ClanError(f"Could not load spec from file: {file_path}")
module = importlib.util.module_from_spec(spec)
if not module:
raise ClanError(f"Could not create module: {file_path}")
if not spec.loader:
raise ClanError(f"Could not load loader from spec: {spec}")
spec.loader.exec_module(module)
finally:
sys.path.pop(0)
dataclass_type = getattr(module, class_name, None)
if dataclass_type and is_dataclass(dataclass_type):
return dataclass_type
raise ClanError(f"Could not load dataclass {class_name} from file: {file_path}")
def test_all_dataclasses() -> None:
"""
This Test ensures that all dataclasses are compatible with the API.
It will load all dataclasses from the clan_cli directory and
generate a JSON schema for each of them.
It will fail if any dataclass cannot be converted to JSON schema.
This means the dataclass in its current form is not compatible with the API.
"""
# Excludes:
# - API includes Type Generic wrappers, that are not known in the init file.
excludes = ["api/__init__.py"]
cli_path = Path("clan_cli").resolve()
dataclasses = find_dataclasses_in_directory(cli_path, excludes)
for file, dataclass in dataclasses:
print(f"checking dataclass {dataclass} in file: {file}")
try:
dclass = load_dataclass_from_file(file, dataclass, str(cli_path.parent))
type_to_dict(dclass)
except JSchemaTypeError as e:
print(f"Error loading dataclass {dataclass} from {file}: {e}")
raise ClanError(
f"""
--------------------------------------------------------------------------------
Error converting dataclass 'class {dataclass}()' from {file}
Details:
{e}
Help:
- Converting public fields to PRIVATE by prefixing them with underscore ('_')
- Ensure all private fields are initialized the API wont provide initial values for them.
--------------------------------------------------------------------------------
""",
location=__file__,
)

View File

@@ -108,7 +108,7 @@ def test_type_from_schema_path_simple() -> None:
schema = dict(
type="boolean",
)
assert parsing.type_from_schema_path(schema, []) == bool
assert parsing.type_from_schema_path(schema, []) is bool
def test_type_from_schema_path_nested() -> None:
@@ -125,8 +125,8 @@ def test_type_from_schema_path_nested() -> None:
age=dict(type="integer"),
),
)
assert parsing.type_from_schema_path(schema, ["age"]) == int
assert parsing.type_from_schema_path(schema, ["name", "first"]) == str
assert parsing.type_from_schema_path(schema, ["age"]) is int
assert parsing.type_from_schema_path(schema, ["name", "first"]) is str
def test_type_from_schema_path_dynamic_attrs() -> None:
@@ -140,16 +140,16 @@ def test_type_from_schema_path_dynamic_attrs() -> None:
),
),
)
assert parsing.type_from_schema_path(schema, ["age"]) == int
assert parsing.type_from_schema_path(schema, ["users", "foo"]) == str
assert parsing.type_from_schema_path(schema, ["age"]) is int
assert parsing.type_from_schema_path(schema, ["users", "foo"]) is str
def test_map_type() -> None:
with pytest.raises(ClanError):
config.map_type("foo")
assert config.map_type("string") == str
assert config.map_type("integer") == int
assert config.map_type("boolean") == bool
assert config.map_type("string") is str
assert config.map_type("integer") is int
assert config.map_type("boolean") is bool
assert config.map_type("attribute set of string") == dict[str, str]
assert config.map_type("attribute set of integer") == dict[str, int]
assert config.map_type("null or string") == str | None

View File

@@ -0,0 +1,49 @@
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from fixtures_flakes import generate_flake
from helpers.cli import Cli
from root import CLAN_CORE
if TYPE_CHECKING:
pass
@pytest.mark.impure
def test_generate_secret(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
# age_keys: list["KeyPair"],
) -> None:
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(
my_machine=dict(
clan=dict(
core=dict(
vars=dict(
generators=dict(
my_generator=dict(
files=dict(
my_secret=dict(
secret=False,
)
),
script="echo hello > $out/my_secret",
)
)
)
)
)
)
),
)
monkeypatch.chdir(flake.path)
cli = Cli()
cmd = ["vars", "generate", "--flake", str(flake.path), "my_machine"]
cli.run(cmd)
assert (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
).is_file()