diff --git a/nixosModules/clanCore/vars/default.nix b/nixosModules/clanCore/vars/default.nix index c253908c0..0afedaa12 100644 --- a/nixosModules/clanCore/vars/default.nix +++ b/nixosModules/clanCore/vars/default.nix @@ -39,7 +39,7 @@ in vars = { generators = lib.flip lib.mapAttrs config.clan.core.vars.generators ( _name: generator: { - inherit (generator) dependencies finalScript; + inherit (generator) dependencies finalScript prompts; files = lib.flip lib.mapAttrs generator.files (_name: file: { inherit (file) secret; }); } ); diff --git a/nixosModules/clanCore/vars/interface.nix b/nixosModules/clanCore/vars/interface.nix index 36996702c..4b42766f3 100644 --- a/nixosModules/clanCore/vars/interface.nix +++ b/nixosModules/clanCore/vars/interface.nix @@ -108,8 +108,9 @@ in Prompts are available to the generator script as files. For example, a prompt named 'prompt1' will be available via $prompts/prompt1 ''; + default = { }; type = attrsOf (submodule { - options = { + options = options { description = { description = '' The description of the prompted value diff --git a/pkgs/clan-cli/clan_cli/vars/generate.py b/pkgs/clan-cli/clan_cli/vars/generate.py index fee3c5b68..d6f80e01a 100644 --- a/pkgs/clan-cli/clan_cli/vars/generate.py +++ b/pkgs/clan-cli/clan_cli/vars/generate.py @@ -2,9 +2,8 @@ import argparse import importlib import logging import os -import subprocess import sys -from collections.abc import Callable +from getpass import getpass from graphlib import TopologicalSorter from pathlib import Path from tempfile import TemporaryDirectory @@ -29,17 +28,7 @@ from .secret_modules import SecretStoreBase log = logging.getLogger(__name__) -def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str: - """ - Read multi-line input from stdin. - """ - print(prompt, flush=True) - proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True) - log.info("Input received. Processing...") - return proc.stdout - - -def bubblewrap_cmd(generator: str, generator_dir: Path, dep_tmpdir: Path) -> list[str]: +def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]: # fmt: off return nix_shell( [ @@ -51,8 +40,7 @@ def bubblewrap_cmd(generator: str, generator_dir: Path, dep_tmpdir: Path) -> lis "--ro-bind", "/nix/store", "/nix/store", "--tmpfs", "/usr/lib/systemd", "--dev", "/dev", - "--bind", str(generator_dir), str(generator_dir), - "--ro-bind", str(dep_tmpdir), str(dep_tmpdir), + "--bind", str(tmpdir), str(tmpdir), "--unshare-all", "--unshare-user", "--uid", "1000", @@ -92,7 +80,7 @@ def decrypt_dependencies( def dependencies_as_dir( decrypted_dependencies: dict[str, dict[str, bytes]], tmpdir: Path, -) -> Path: +) -> None: for dep_generator, files in decrypted_dependencies.items(): dep_generator_dir = tmpdir / dep_generator dep_generator_dir.mkdir() @@ -102,7 +90,6 @@ def dependencies_as_dir( file_path.touch() file_path.chmod(0o600) file_path.write_bytes(file) - return tmpdir def execute_generator( @@ -111,10 +98,7 @@ def execute_generator( regenerate: bool, secret_vars_store: SecretStoreBase, public_vars_store: FactStoreBase, - dep_tmpdir: Path, - prompt: Callable[[str], str], ) -> bool: - generator_dir = dep_tmpdir / generator_name # check if all secrets exist and generate them if at least one is missing needs_regeneration = not check_secrets(machine, generator_name=generator_name) log.debug(f"{generator_name} needs_regeneration: {needs_regeneration}") @@ -124,51 +108,65 @@ def execute_generator( msg = f"flake is not a Path: {machine.flake}" msg += "fact/secret generation is only supported for local flakes" - # compatibility for old outputs.nix users generator = machine.vars_generators[generator_name]["finalScript"] - # if machine.vars_data[generator_name]["generator"]["prompt"]: - # prompt_value = prompt(machine.vars_data[generator_name]["generator"]["prompt"]) - # env["prompt_value"] = prompt_value # build temporary file tree of dependencies decrypted_dependencies = decrypt_dependencies( machine, generator_name, secret_vars_store, public_vars_store ) env = os.environ.copy() - generator_dir.mkdir(parents=True) - env["out"] = str(generator_dir) with TemporaryDirectory() as tmp: - dep_tmpdir = dependencies_as_dir(decrypted_dependencies, Path(tmp)) - env["in"] = str(dep_tmpdir) + tmpdir = Path(tmp) + tmpdir_in = tmpdir / "in" + tmpdir_prompts = tmpdir / "prompts" + tmpdir_out = tmpdir / "out" + tmpdir_in.mkdir() + tmpdir_out.mkdir() + env["in"] = str(tmpdir_in) + env["out"] = str(tmpdir_out) + # populate dependency inputs + dependencies_as_dir(decrypted_dependencies, tmpdir_in) + # populate prompted values + # TODO: make prompts rest API friendly + if machine.vars_generators[generator_name]["prompts"]: + tmpdir_prompts.mkdir() + env["prompts"] = str(tmpdir_prompts) + for prompt_name, prompt in machine.vars_generators[generator_name][ + "prompts" + ].items(): + prompt_file = tmpdir_prompts / prompt_name + value = prompt_func(prompt["description"], prompt["type"]) + prompt_file.write_text(value) + if sys.platform == "linux": - cmd = bubblewrap_cmd(generator, generator_dir, dep_tmpdir=dep_tmpdir) + cmd = bubblewrap_cmd(generator, tmpdir) else: cmd = ["bash", "-c", generator] run( cmd, env=env, ) - files_to_commit = [] - # store secrets - files = machine.vars_generators[generator_name]["files"] - for file_name, file in files.items(): - groups = machine.deployment["sops"]["defaultGroups"] + files_to_commit = [] + # store secrets + files = machine.vars_generators[generator_name]["files"] + for file_name, file in files.items(): + groups = machine.deployment["sops"]["defaultGroups"] - secret_file = generator_dir / file_name - if not secret_file.is_file(): - msg = f"did not generate a file for '{file_name}' when running the following command:\n" - msg += generator - raise ClanError(msg) - if file["secret"]: - file_path = secret_vars_store.set( - generator_name, file_name, secret_file.read_bytes(), groups - ) - else: - file_path = public_vars_store.set( - generator_name, file_name, secret_file.read_bytes() - ) - if file_path: - files_to_commit.append(file_path) + secret_file = tmpdir_out / file_name + if not secret_file.is_file(): + msg = f"did not generate a file for '{file_name}' when running the following command:\n" + msg += generator + raise ClanError(msg) + if file["secret"]: + file_path = secret_vars_store.set( + generator_name, file_name, secret_file.read_bytes(), groups + ) + else: + file_path = public_vars_store.set( + generator_name, file_name, secret_file.read_bytes() + ) + if file_path: + files_to_commit.append(file_path) commit_files( files_to_commit, machine.flake_dir, @@ -177,9 +175,18 @@ def execute_generator( return True -def prompt_func(text: str) -> str: - print(f"{text}: ") - return read_multiline_input() +def prompt_func(description: str, input_type: str) -> str: + if input_type == "line": + result = input(f"Enter the value for {description}: ") + elif input_type == "multiline": + print(f"Enter the value for {description} (Finish with Ctrl-D): ") + result = sys.stdin.read() + elif input_type == "hidden": + result = getpass(f"Enter the value for {description} (hidden): ") + else: + raise ClanError(f"Unknown input type: {input_type} for prompt {description}") + log.info("Input received. Processing...") + return result def _get_subgraph(graph: dict[str, set], vertex: str) -> dict[str, set]: @@ -197,11 +204,7 @@ def _generate_vars_for_machine( machine: Machine, generator_name: str | None, regenerate: bool, - tmpdir: Path, - prompt: Callable[[str], str] = prompt_func, ) -> bool: - local_temp = tmpdir / machine.name - local_temp.mkdir() secret_vars_module = importlib.import_module(machine.secret_vars_module) secret_vars_store = secret_vars_module.SecretStore(machine=machine) @@ -216,13 +219,6 @@ def _generate_vars_for_machine( f"Could not find generator with name: {generator_name}. The following generators are available: {generators}" ) - # if generator_name: - # machine_generator_facts = { - # generator_name: machine.vars_generators[generator_name] - # } - # else: - # machine_generator_facts = machine.vars_generators - graph = { gen_name: set(generator["dependencies"]) for gen_name, generator in machine.vars_generators.items() @@ -250,8 +246,6 @@ def _generate_vars_for_machine( regenerate=regenerate, secret_vars_store=secret_vars_store, public_vars_store=public_vars_store, - dep_tmpdir=local_temp, - prompt=prompt, ) if machine_updated: # flush caches to make sure the new secrets are available in evaluation @@ -263,25 +257,21 @@ def generate_vars( machines: list[Machine], generator_name: str | None, regenerate: bool, - prompt: Callable[[str], str] = prompt_func, ) -> bool: was_regenerated = False - with TemporaryDirectory() as tmp: - tmpdir = Path(tmp) - - for machine in machines: - errors = 0 - try: - was_regenerated |= _generate_vars_for_machine( - machine, generator_name, regenerate, tmpdir, prompt - ) - except Exception as exc: - log.error(f"Failed to generate facts for {machine.name}: {exc}") - errors += 1 - if errors > 0: - raise ClanError( - f"Failed to generate facts for {errors} hosts. Check the logs above" - ) + for machine in machines: + errors = 0 + try: + was_regenerated |= _generate_vars_for_machine( + machine, generator_name, regenerate + ) + except Exception as exc: + log.error(f"Failed to generate facts for {machine.name}: {exc}") + errors += 1 + if errors > 0: + raise ClanError( + f"Failed to generate facts for {errors} hosts. Check the logs above" + ) if not was_regenerated: print("All secrets and facts are already up to date") diff --git a/pkgs/clan-cli/tests/test_vars.py b/pkgs/clan-cli/tests/test_vars.py index e79cfb207..66e82ab7f 100644 --- a/pkgs/clan-cli/tests/test_vars.py +++ b/pkgs/clan-cli/tests/test_vars.py @@ -1,6 +1,7 @@ import os from collections import defaultdict from collections.abc import Callable +from io import StringIO from pathlib import Path from tempfile import TemporaryDirectory from typing import Any @@ -55,7 +56,8 @@ def test_dependencies_as_files() -> None: ), ) with TemporaryDirectory() as tmpdir: - dep_tmpdir = dependencies_as_dir(decrypted_dependencies, Path(tmpdir)) + dep_tmpdir = Path(tmpdir) + dependencies_as_dir(decrypted_dependencies, dep_tmpdir) assert dep_tmpdir.is_dir() assert (dep_tmpdir / "gen_1" / "var_1a").read_bytes() == b"var_1a" assert (dep_tmpdir / "gen_1" / "var_1b").read_bytes() == b"var_1b" @@ -232,3 +234,40 @@ def test_dependant_generators( ) assert child_file_path.is_file() assert child_file_path.read_text() == "hello\n" + + +@pytest.mark.impure +@pytest.mark.parametrize( + ("prompt_type", "input_value"), + [ + ("line", "my input"), + ("multiline", "my\nmultiline\ninput\n"), + # The hidden type cannot easily be tested, as getpass() reads from /dev/tty directly + # ("hidden", "my hidden input"), + ], +) +def test_prompt( + monkeypatch: pytest.MonkeyPatch, + temporary_home: Path, + prompt_type: str, + input_value: str, +) -> None: + config = nested_dict() + my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] + my_generator["files"]["my_value"]["secret"] = False + my_generator["prompts"]["prompt1"]["description"] = "dream2nix" + my_generator["prompts"]["prompt1"]["type"] = prompt_type + my_generator["script"] = "cat $prompts/prompt1 > $out/my_value" + flake = generate_flake( + temporary_home, + flake_template=CLAN_CORE / "templates" / "minimal", + machine_configs=dict(my_machine=config), + ) + monkeypatch.chdir(flake.path) + monkeypatch.setattr("sys.stdin", StringIO(input_value)) + cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) + var_file_path = ( + flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_value" + ) + assert var_file_path.is_file() + assert var_file_path.read_text() == input_value