Invalidation does not need to happen after each generator is executed: values from one generator cannot be interpolated into another generator's expression, the generators are executed in order, and the finalScript of each generator stays constant. Only after the complete closure has been generated may the caller of generate decide to invalidate the flake cache.
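For illustration, a minimal sketch of that caller pattern, assuming the caller already holds a flake object with an invalidate_cache() method (as generate_command below does via args.flake); the helper name is hypothetical:

# Hypothetical helper sketching the caller-side invalidation described above.
def generate_then_invalidate(flake, machines) -> None:
    # generate_vars (defined below) runs the whole closure of generators in order
    has_changed = generate_vars(machines)
    # one cache invalidation after the complete closure, not one per generator
    if has_changed:
        flake.invalidate_cache()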
import argparse
import logging
import os
import sys
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any

from clan_cli.cmd import RunOpts, run
from clan_cli.completions import (
    add_dynamic_completer,
    complete_machines,
    complete_services_for_machine,
)
from clan_cli.errors import ClanError
from clan_cli.git import commit_files
from clan_cli.machines.inventory import get_all_machines, get_selected_machines
from clan_cli.nix import nix_config, nix_shell, nix_test_store
from clan_cli.vars._types import StoreBase

from .check import check_vars
from .graph import (
    minimal_closure,
    requested_closure,
)
from .prompt import Prompt, ask
from .var import Var

log = logging.getLogger(__name__)

if TYPE_CHECKING:
    from clan_cli.machines.machines import Machine


@dataclass
class Generator:
    name: str
    files: list[Var] = field(default_factory=list)
    share: bool = False
    prompts: list[Prompt] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)

    migrate_fact: str | None = None

    # TODO: remove this
    _machine: "Machine | None" = None

    def machine(self, machine: "Machine") -> None:
        self._machine = machine

    @cached_property
    def exists(self) -> bool:
        assert self._machine is not None
        return check_vars(self._machine, generator_name=self.name)

    @classmethod
    def from_json(cls: type["Generator"], data: dict[str, Any]) -> "Generator":
        return cls(
            name=data["name"],
            share=data["share"],
            files=[Var.from_json(data["name"], f) for f in data["files"].values()],
            dependencies=data["dependencies"],
            migrate_fact=data["migrateFact"],
            prompts=[Prompt.from_json(p) for p in data["prompts"].values()],
        )

    def final_script(self) -> Path:
        assert self._machine is not None
        final_script = self._machine.build_nix(
            f'config.clan.core.vars.generators."{self.name}".finalScript'
        )
        return final_script

    def validation(self) -> str | None:
        assert self._machine is not None
        return self._machine.eval_nix(
            f'config.clan.core.vars.generators."{self.name}".validationHash'
        )


def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]:
    test_store = nix_test_store()

    # fmt: off
    return nix_shell(
        [
            "bash",
            "bubblewrap",
        ],
        [
            "bwrap",
            "--unshare-all",
            "--tmpfs", "/",
            "--ro-bind", "/nix/store", "/nix/store",
            *(["--ro-bind", str(test_store), str(test_store)] if test_store else []),
            "--dev", "/dev",
            # not allowed to bind procfs in some sandboxes
            "--bind", str(tmpdir), str(tmpdir),
            "--chdir", "/",
            # Doesn't work in our CI?
            # "--proc", "/proc",
            # "--hostname", "facts",
            "--bind", "/proc", "/proc",
            "--uid", "1000",
            "--gid", "1000",
            "--",
            "bash", "-c", generator
        ],
    )
    # fmt: on


# TODO: implement caching to not decrypt the same secret multiple times
def decrypt_dependencies(
    machine: "Machine",
    generator: Generator,
    secret_vars_store: StoreBase,
    public_vars_store: StoreBase,
) -> dict[str, dict[str, bytes]]:
    decrypted_dependencies: dict[str, Any] = {}
    for generator_name in set(generator.dependencies):
        decrypted_dependencies[generator_name] = {}
        for dep_generator in machine.vars_generators:
            if generator_name == dep_generator.name:
                break
        else:
            msg = f"Could not find dependent generator {generator_name} in machine {machine.name}"
            raise ClanError(msg)
        dep_files = dep_generator.files
        for file in dep_files:
            if file.secret:
                decrypted_dependencies[generator_name][file.name] = (
                    secret_vars_store.get(dep_generator, file.name)
                )
            else:
                decrypted_dependencies[generator_name][file.name] = (
                    public_vars_store.get(dep_generator, file.name)
                )
    return decrypted_dependencies


# write the decrypted dependencies into a temporary file tree under tmpdir
def dependencies_as_dir(
    decrypted_dependencies: dict[str, dict[str, bytes]],
    tmpdir: Path,
) -> None:
    for dep_generator, files in decrypted_dependencies.items():
        dep_generator_dir = tmpdir / dep_generator
        dep_generator_dir.mkdir()
        dep_generator_dir.chmod(0o700)
        for file_name, file in files.items():
            file_path = dep_generator_dir / file_name
            file_path.touch()
            file_path.chmod(0o600)
            file_path.write_bytes(file)


def execute_generator(
    machine: "Machine",
    generator: Generator,
    secret_vars_store: StoreBase,
    public_vars_store: StoreBase,
    prompt_values: dict[str, str],
    no_sandbox: bool = False,
) -> None:
    if not isinstance(machine.flake, Path):
        msg = f"flake is not a Path: {machine.flake}\n"
        msg += "fact/secret generation is only supported for local flakes"
        raise ClanError(msg)

    # build temporary file tree of dependencies
    decrypted_dependencies = decrypt_dependencies(
        machine,
        generator,
        secret_vars_store,
        public_vars_store,
    )

    def get_prompt_value(prompt_name: str) -> str:
        try:
            return prompt_values[prompt_name]
        except KeyError as e:
            msg = f"prompt value for '{prompt_name}' in generator {generator.name} not provided"
            raise ClanError(msg) from e

    env = os.environ.copy()
    with TemporaryDirectory(prefix="vars-") as _tmpdir:
        tmpdir = Path(_tmpdir).resolve()
        tmpdir_in = tmpdir / "in"
        tmpdir_prompts = tmpdir / "prompts"
        tmpdir_out = tmpdir / "out"
        tmpdir_in.mkdir()
        tmpdir_out.mkdir()
        env["in"] = str(tmpdir_in)
        env["out"] = str(tmpdir_out)
        # populate dependency inputs
        dependencies_as_dir(decrypted_dependencies, tmpdir_in)
        # populate prompted values
        # TODO: make prompts rest API friendly
        if generator.prompts:
            tmpdir_prompts.mkdir()
            env["prompts"] = str(tmpdir_prompts)
            for prompt in generator.prompts:
                prompt_file = tmpdir_prompts / prompt.name
                value = get_prompt_value(prompt.name)
                prompt_file.write_text(value)
        from clan_cli import bwrap

        final_script = generator.final_script()

        if sys.platform == "linux":
            if bwrap.bubblewrap_works():
                cmd = bubblewrap_cmd(str(final_script), tmpdir)
            else:
                if not no_sandbox:
                    msg = (
                        f"Cannot safely execute generator {generator.name}: Sandboxing is not available on this system\n"
                        f"Re-run 'vars generate' with '--no-sandbox' to disable sandboxing"
                    )
                    raise ClanError(msg)
                cmd = ["bash", "-c", str(final_script)]
        else:
            # TODO: implement sandboxing for macOS using sandbox-exec
            cmd = ["bash", "-c", str(final_script)]
        run(cmd, RunOpts(env=env))
        files_to_commit = []
        # store secrets
        files = generator.files
        public_changed = False
        secret_changed = False
        for file in files:
            secret_file = tmpdir_out / file.name
            if not secret_file.is_file():
                msg = f"did not generate a file for '{file.name}' when running the following command:\n"
                msg += str(final_script)
                raise ClanError(msg)
            if file.secret:
                file_path = secret_vars_store.set(
                    generator,
                    file,
                    secret_file.read_bytes(),
                )
                secret_changed = True
            else:
                file_path = public_vars_store.set(
                    generator,
                    file,
                    secret_file.read_bytes(),
                )
                public_changed = True
            if file_path:
                files_to_commit.append(file_path)
        validation = generator.validation()
        if validation is not None:
            if public_changed:
                files_to_commit.append(
                    public_vars_store.set_validation(generator, validation)
                )
            if secret_changed:
                files_to_commit.append(
                    secret_vars_store.set_validation(generator, validation)
                )
    commit_files(
        files_to_commit,
        machine.flake_dir,
        f"Update vars via generator {generator.name} for machine {machine.name}",
    )


def _ask_prompts(
    generator: Generator,
) -> dict[str, str]:
    prompt_values: dict[str, str] = {}
    for prompt in generator.prompts:
        var_id = f"{generator.name}/{prompt.name}"
        prompt_values[prompt.name] = ask(
            var_id,
            prompt.prompt_type,
            prompt.description if prompt.description != prompt.name else None,
        )
    return prompt_values


def get_closure(
    machine: "Machine",
    generator_name: str | None,
    regenerate: bool,
) -> list[Generator]:
    from .graph import all_missing_closure, full_closure

    vars_generators = machine.vars_generators
    generators: dict[str, Generator] = {
        generator.name: generator for generator in vars_generators
    }

    # TODO: we should remove this
    for generator in vars_generators:
        generator.machine(machine)

    if generator_name is None:  # all generators selected
        if regenerate:
            return full_closure(generators)
        return all_missing_closure(generators)
    # specific generator selected
    if regenerate:
        return requested_closure([generator_name], generators)
    return minimal_closure([generator_name], generators)


def _migration_file_exists(
    machine: "Machine",
    generator: Generator,
    fact_name: str,
) -> bool:
    for file in generator.files:
        if file.name == fact_name:
            break
    else:
        msg = f"Could not find file {fact_name} in generator {generator.name}"
        raise ClanError(msg)

    is_secret = file.secret
    if is_secret:
        if machine.secret_facts_store.exists(generator.name, fact_name):
            return True
        machine.debug(
            f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
        )
    if not is_secret:
        if machine.public_facts_store.exists(generator.name, fact_name):
            return True
        machine.debug(
            f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
        )
    return False


def _migrate_file(
    machine: "Machine",
    generator: Generator,
    var_name: str,
    service_name: str,
    fact_name: str,
) -> list[Path]:
    for file in generator.files:
        if file.name == var_name:
            break
    else:
        msg = f"Could not find file {var_name} in generator {generator.name}"
        raise ClanError(msg)

    paths = []

    if file.secret:
        old_value = machine.secret_facts_store.get(service_name, fact_name)
        maybe_path = machine.secret_vars_store.set(
            generator, file, old_value, is_migration=True
        )
        if maybe_path:
            paths.append(maybe_path)
    else:
        old_value = machine.public_facts_store.get(service_name, fact_name)
        maybe_path = machine.public_vars_store.set(
            generator, file, old_value, is_migration=True
        )
        if maybe_path:
            paths.append(maybe_path)

    return paths


def _migrate_files(
    machine: "Machine",
    generator: Generator,
) -> None:
    not_found = []
    files_to_commit = []
    for file in generator.files:
        if _migration_file_exists(machine, generator, file.name):
            assert generator.migrate_fact is not None
            files_to_commit += _migrate_file(
                machine, generator, file.name, generator.migrate_fact, file.name
            )
        else:
            not_found.append(file.name)
    if len(not_found) > 0:
        msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
        raise ClanError(msg)
    commit_files(
        files_to_commit,
        machine.flake_dir,
        f"migrated facts to vars for generator {generator.name} for machine {machine.name}",
    )


def _check_can_migrate(
    machine: "Machine",
    generator: Generator,
) -> bool:
    service_name = generator.migrate_fact
    if not service_name:
        return False
    # ensure that none of the generated vars already exist in the store
    all_files_missing = True
    all_files_present = True
    for file in generator.files:
        if file.secret:
            if machine.secret_vars_store.exists(generator, file.name):
                all_files_missing = False
            else:
                all_files_present = False
        else:
            if machine.public_vars_store.exists(generator, file.name):
                all_files_missing = False
            else:
                all_files_present = False

    if not all_files_present and not all_files_missing:
        msg = f"Cannot migrate facts for generator {generator.name} as some files already exist in the store"
        raise ClanError(msg)
    if all_files_present:
        # all files already migrated, no need to run migration again
        return False

    # ensure that all files can be migrated (exist in the corresponding fact store)
    return bool(
        all(
            _migration_file_exists(machine, generator, file.name)
            for file in generator.files
        )
    )


def generate_vars_for_machine(
    machine: "Machine",
    generator_name: str | None,
    regenerate: bool,
    no_sandbox: bool = False,
) -> bool:
    _generator = None
    if generator_name:
        for generator in machine.vars_generators:
            if generator.name == generator_name:
                _generator = generator
                break

    pub_healthcheck_msg = machine.public_vars_store.health_check(_generator)
    sec_healthcheck_msg = machine.secret_vars_store.health_check(_generator)

    if pub_healthcheck_msg or sec_healthcheck_msg:
        msg = f"Health check failed for machine {machine.name}:\n"
        if pub_healthcheck_msg:
            msg += f"Public vars store: {pub_healthcheck_msg}\n"
        if sec_healthcheck_msg:
            msg += f"Secret vars store: {sec_healthcheck_msg}"
        raise ClanError(msg)

    closure = get_closure(machine, generator_name, regenerate)
    if len(closure) == 0:
        return False
    for generator in closure:
        if _check_can_migrate(machine, generator):
            _migrate_files(machine, generator)
        else:
            execute_generator(
                machine=machine,
                generator=generator,
                secret_vars_store=machine.secret_vars_store,
                public_vars_store=machine.public_vars_store,
                prompt_values=_ask_prompts(generator),
                no_sandbox=no_sandbox,
            )
    return True


def generate_vars(
    machines: list["Machine"],
    generator_name: str | None = None,
    regenerate: bool = False,
    no_sandbox: bool = False,
) -> bool:
    was_regenerated = False
    for machine in machines:
        errors = []
        try:
            was_regenerated |= generate_vars_for_machine(
                machine, generator_name, regenerate, no_sandbox=no_sandbox
            )
        except Exception as exc:
            errors += [(machine, exc)]
        if len(errors) == 1:
            raise errors[0][1]
        if len(errors) > 1:
            msg = f"Failed to generate facts for {len(errors)} hosts:"
            for machine, error in errors:
                msg += f"\n{machine}: {error}"
            raise ClanError(msg) from errors[0][1]

    if not was_regenerated and len(machines) > 0:
        for machine in machines:
            machine.info("All vars are already up to date")

    return was_regenerated


def generate_command(args: argparse.Namespace) -> None:
    if args.flake is None:
        msg = "Could not find clan flake toplevel directory"
        raise ClanError(msg)
    if len(args.machines) == 0:
        machines = get_all_machines(args.flake, args.option)
    else:
        machines = get_selected_machines(args.flake, args.option, args.machines)

    # prefetch all vars
    config = nix_config()
    system = config["system"]
    machine_names = [machine.name for machine in machines]
    args.flake.precache(
        [
            f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
            f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.file",
        ]
    )
    has_changed = generate_vars(
        machines, args.generator, args.regenerate, no_sandbox=args.no_sandbox
    )
    if has_changed:
        args.flake.invalidate_cache()


def register_generate_parser(parser: argparse.ArgumentParser) -> None:
    machines_parser = parser.add_argument(
        "machines",
        type=str,
        help="machine to generate facts for. if empty, generate facts for all machines",
        nargs="*",
        default=[],
    )
    add_dynamic_completer(machines_parser, complete_machines)

    service_parser = parser.add_argument(
        "--generator",
        "-g",
        type=str,
        help="execute only the specified generator. If unset, execute all generators",
        default=None,
    )
    add_dynamic_completer(service_parser, complete_services_for_machine)

    parser.add_argument(
        "--regenerate",
        "-r",
        action=argparse.BooleanOptionalAction,
        help="whether to regenerate facts for the specified machine",
        default=None,
    )

    parser.add_argument(
        "--no-sandbox",
        action="store_true",
        help="disable sandboxing when executing the generator. WARNING: potentially executing untrusted code from external clan modules",
        default=False,
    )

    parser.set_defaults(func=generate_command)