Files
clan-core/pkgs/clan-cli/clan_cli/vars/generate.py
Jeremy Fleischman 5726dd1010 Add /bin/sh to bubblewrap sandbox
I ran into this error when trying to run `opendkim-genkey` in a vars
generator:

```console
=========================================================================== Command ===========================================================================
nix \
    --extra-experimental-features 'nix-command flakes' \
    shell \
    --inputs-from /nix/store/9r3ddw80dz4qzci9pj57ppbh6gy2pgv9-clan-cli/lib/python3.12/site-packages/clan_cli/nixpkgs \
    'nixpkgs#bash' \
    'nixpkgs#bubblewrap' \
    -c bwrap \
    --unshare-all --tmpfs \
    / \
    --ro-bind /nix/store \
    /nix/store \
    --dev /dev \
    --bind /tmp/nix-shell.ClOjgJ/vars-kh4qrnas \
    /tmp/nix-shell.ClOjgJ/vars-kh4qrnas \
    --chdir / \
    --bind /proc \
    /proc \
    --uid 1000 \
    --gid 1000 \
    -- bash \
    -c /nix/store/p0089w4y1w3h535g7ipv4jl4r6mb2hs2-generator-dkim-playground.jflei.com.mail

=========================================================================== Stderr ============================================================================
perl: warning: Setting locale failed.
perl: warning: Please check that your locale settings:
	LANGUAGE = (unset),
	LC_ALL = (unset),
	LC_CTYPE = (unset),
	LC_NUMERIC = (unset),
	LC_COLLATE = (unset),
	LC_TIME = (unset),
	LC_MESSAGES = (unset),
	LC_MONETARY = (unset),
	LC_ADDRESS = (unset),
	LC_IDENTIFICATION = (unset),
	LC_MEASUREMENT = (unset),
	LC_PAPER = (unset),
	LC_TELEPHONE = (unset),
	LC_NAME = (unset),
	LANG = "en_US.UTF-8"
    are supported and installed on your system.
perl: warning: Falling back to the standard locale ("C").
Can't exec "/bin/sh": No such file or directory at /nix/store/nfawbww80p1hgpymfgq1vq8wqlak75yh-opendkim-2.11.0-Beta2/sbin/.opendkim-genkey-wrapped line 139.
.opendkim-genkey-wrapped: openssl died with signal %d
127
Return Code: 1

1 hosts failed with an error. Check the logs above
```

As we allow `/bin/sh` in the nix build sandbox, I assume we're OK
allowing it here as well?
2025-05-09 18:33:08 -07:00

542 lines
18 KiB
Python

import argparse
import logging
import os
import shutil
import sys
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any
from clan_cli.cmd import RunOpts, run
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
complete_services_for_machine,
)
from clan_cli.errors import ClanError
from clan_cli.flake import Flake
from clan_cli.git import commit_files
from clan_cli.machines.inventory import get_all_machines, get_selected_machines
from clan_cli.nix import nix_config, nix_shell, nix_test_store
from clan_cli.vars._types import StoreBase
from clan_cli.vars.migration import check_can_migrate, migrate_files
from clan_lib.api import API
from .check import check_vars
from .graph import (
minimal_closure,
requested_closure,
)
from .prompt import Prompt, ask
from .var import Var
log = logging.getLogger(__name__)
if TYPE_CHECKING:
from clan_cli.machines.machines import Machine
@dataclass
class Generator:
name: str
files: list[Var] = field(default_factory=list)
share: bool = False
prompts: list[Prompt] = field(default_factory=list)
dependencies: list[str] = field(default_factory=list)
migrate_fact: str | None = None
# TODO: remove this
_machine: "Machine | None" = None
def machine(self, machine: "Machine") -> None:
self._machine = machine
@cached_property
def exists(self) -> bool:
assert self._machine is not None
return check_vars(self._machine, generator_name=self.name)
@classmethod
def from_json(cls: type["Generator"], data: dict[str, Any]) -> "Generator":
return cls(
name=data["name"],
share=data["share"],
files=[Var.from_json(data["name"], f) for f in data["files"].values()],
dependencies=data["dependencies"],
migrate_fact=data["migrateFact"],
prompts=[Prompt.from_json(p) for p in data["prompts"].values()],
)
def final_script(self) -> Path:
assert self._machine is not None
final_script = self._machine.build_nix(
f'config.clan.core.vars.generators."{self.name}".finalScript'
)
return final_script
def validation(self) -> str | None:
assert self._machine is not None
return self._machine.eval_nix(
f'config.clan.core.vars.generators."{self.name}".validationHash'
)
def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]:
test_store = nix_test_store()
real_bash_path = Path("bash")
if os.environ.get("IN_NIX_SANDBOX"):
bash_executable_path = Path(str(shutil.which("bash")))
real_bash_path = bash_executable_path.resolve()
# fmt: off
return nix_shell(
[
"bash",
"bubblewrap",
],
[
"bwrap",
"--unshare-all",
"--tmpfs", "/",
"--ro-bind", "/nix/store", "/nix/store",
"--ro-bind", "/bin/sh", "/bin/sh",
*(["--ro-bind", str(test_store), str(test_store)] if test_store else []),
"--dev", "/dev",
# not allowed to bind procfs in some sandboxes
"--bind", str(tmpdir), str(tmpdir),
"--chdir", "/",
# Doesn't work in our CI?
#"--proc", "/proc",
#"--hostname", "facts",
"--bind", "/proc", "/proc",
"--uid", "1000",
"--gid", "1000",
"--",
str(real_bash_path), "-c", generator
]
)
# fmt: on
# TODO: implement caching to not decrypt the same secret multiple times
def decrypt_dependencies(
machine: "Machine",
generator: Generator,
secret_vars_store: StoreBase,
public_vars_store: StoreBase,
) -> dict[str, dict[str, bytes]]:
decrypted_dependencies: dict[str, Any] = {}
for generator_name in set(generator.dependencies):
decrypted_dependencies[generator_name] = {}
for dep_generator in machine.vars_generators:
if generator_name == dep_generator.name:
break
else:
msg = f"Could not find dependent generator {generator_name} in machine {machine.name}"
raise ClanError(msg)
dep_files = dep_generator.files
for file in dep_files:
if file.secret:
decrypted_dependencies[generator_name][file.name] = (
secret_vars_store.get(dep_generator, file.name)
)
else:
decrypted_dependencies[generator_name][file.name] = (
public_vars_store.get(dep_generator, file.name)
)
return decrypted_dependencies
# decrypt dependencies and return temporary file tree
def dependencies_as_dir(
decrypted_dependencies: dict[str, dict[str, bytes]],
tmpdir: Path,
) -> None:
for dep_generator, files in decrypted_dependencies.items():
dep_generator_dir = tmpdir / dep_generator
# Explicitly specify parents and exist_ok default values for clarity
dep_generator_dir.mkdir(mode=0o700, parents=False, exist_ok=False)
for file_name, file in files.items():
file_path = dep_generator_dir / file_name
# Avoid the file creation and chmod race
# If the file already existed,
# we'd have to create a temp one and rename instead;
# however, this is a clean dir so there shouldn't be any collisions
file_path.touch(mode=0o600, exist_ok=False)
file_path.write_bytes(file)
def execute_generator(
machine: "Machine",
generator: Generator,
secret_vars_store: StoreBase,
public_vars_store: StoreBase,
prompt_values: dict[str, str],
no_sandbox: bool = False,
) -> None:
if not isinstance(machine.flake, Path):
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
# build temporary file tree of dependencies
decrypted_dependencies = decrypt_dependencies(
machine,
generator,
secret_vars_store,
public_vars_store,
)
def get_prompt_value(prompt_name: str) -> str:
try:
return prompt_values[prompt_name]
except KeyError as e:
msg = f"prompt value for '{prompt_name}' in generator {generator.name} not provided"
raise ClanError(msg) from e
env = os.environ.copy()
with TemporaryDirectory(prefix="vars-") as _tmpdir:
tmpdir = Path(_tmpdir).resolve()
tmpdir_in = tmpdir / "in"
tmpdir_prompts = tmpdir / "prompts"
tmpdir_out = tmpdir / "out"
tmpdir_in.mkdir()
tmpdir_out.mkdir()
env["in"] = str(tmpdir_in)
env["out"] = str(tmpdir_out)
# populate dependency inputs
dependencies_as_dir(decrypted_dependencies, tmpdir_in)
# populate prompted values
# TODO: make prompts rest API friendly
if generator.prompts:
tmpdir_prompts.mkdir()
env["prompts"] = str(tmpdir_prompts)
for prompt in generator.prompts:
prompt_file = tmpdir_prompts / prompt.name
value = get_prompt_value(prompt.name)
prompt_file.write_text(value)
from clan_cli import bwrap
final_script = generator.final_script()
if sys.platform == "linux":
if bwrap.bubblewrap_works():
cmd = bubblewrap_cmd(str(final_script), tmpdir)
else:
if not no_sandbox:
msg = (
f"Cannot safely execute generator {generator.name}: Sandboxing is not available on this system\n"
f"Re-run 'vars generate' with '--no-sandbox' to disable sandboxing"
)
raise ClanError(msg)
cmd = ["bash", "-c", str(final_script)]
else:
# TODO: implement sandboxing for macOS using sandbox-exec
cmd = ["bash", "-c", str(final_script)]
run(cmd, RunOpts(env=env))
files_to_commit = []
# store secrets
files = generator.files
public_changed = False
secret_changed = False
for file in files:
secret_file = tmpdir_out / file.name
if not secret_file.is_file():
msg = f"did not generate a file for '{file.name}' when running the following command:\n"
msg += str(final_script)
raise ClanError(msg)
if file.secret:
file_path = secret_vars_store.set(
generator,
file,
secret_file.read_bytes(),
)
secret_changed = True
else:
file_path = public_vars_store.set(
generator,
file,
secret_file.read_bytes(),
)
public_changed = True
if file_path:
files_to_commit.append(file_path)
validation = generator.validation()
if validation is not None:
if public_changed:
files_to_commit.append(
public_vars_store.set_validation(generator, validation)
)
if secret_changed:
files_to_commit.append(
secret_vars_store.set_validation(generator, validation)
)
commit_files(
files_to_commit,
machine.flake_dir,
f"Update vars via generator {generator.name} for machine {machine.name}",
)
def _ask_prompts(
generator: Generator,
) -> dict[str, str]:
prompt_values: dict[str, str] = {}
for prompt in generator.prompts:
var_id = f"{generator.name}/{prompt.name}"
prompt_values[prompt.name] = ask(
var_id,
prompt.prompt_type,
prompt.description if prompt.description != prompt.name else None,
)
return prompt_values
def _get_previous_value(
machine: "Machine",
generator: Generator,
prompt: Prompt,
) -> str | None:
if not prompt.persist:
return None
pub_store = machine.public_vars_store
if pub_store.exists(generator, prompt.name):
return pub_store.get(generator, prompt.name).decode()
sec_store = machine.secret_vars_store
if sec_store.exists(generator, prompt.name):
return sec_store.get(generator, prompt.name).decode()
return None
def get_closure(
machine: "Machine",
generator_name: str | None,
regenerate: bool,
include_previous_values: bool = False,
) -> list[Generator]:
from .graph import all_missing_closure, full_closure
vars_generators = machine.vars_generators
generators: dict[str, Generator] = {
generator.name: generator for generator in vars_generators
}
# TODO: we should remove this
for generator in vars_generators:
generator.machine(machine)
result_closure = []
if generator_name is None: # all generators selected
if regenerate:
result_closure = full_closure(generators)
else:
result_closure = all_missing_closure(generators)
# specific generator selected
elif regenerate:
result_closure = requested_closure([generator_name], generators)
else:
result_closure = minimal_closure([generator_name], generators)
if include_previous_values:
for generator in result_closure:
for prompt in generator.prompts:
prompt.previous_value = _get_previous_value(machine, generator, prompt)
return result_closure
@API.register
def get_generators_closure(
machine_name: str,
base_dir: Path,
regenerate: bool = False,
include_previous_values: bool = False,
) -> list[Generator]:
from clan_cli.machines.machines import Machine
return get_closure(
machine=Machine(name=machine_name, flake=Flake(str(base_dir))),
generator_name=None,
regenerate=regenerate,
include_previous_values=include_previous_values,
)
def _generate_vars_for_machine(
machine: "Machine",
generators: list[Generator],
all_prompt_values: dict[str, dict[str, str]],
no_sandbox: bool = False,
) -> bool:
for generator in generators:
if check_can_migrate(machine, generator):
migrate_files(machine, generator)
else:
execute_generator(
machine=machine,
generator=generator,
secret_vars_store=machine.secret_vars_store,
public_vars_store=machine.public_vars_store,
prompt_values=all_prompt_values.get(generator.name, {}),
no_sandbox=no_sandbox,
)
return True
@API.register
def generate_vars_for_machine(
machine_name: str,
generators: list[str],
all_prompt_values: dict[str, dict[str, str]],
base_dir: Path,
no_sandbox: bool = False,
) -> bool:
from clan_cli.machines.machines import Machine
machine = Machine(name=machine_name, flake=Flake(str(base_dir)))
generators_set = set(generators)
generators_ = [g for g in machine.vars_generators if g.name in generators_set]
return _generate_vars_for_machine(
machine=machine,
generators=generators_,
all_prompt_values=all_prompt_values,
no_sandbox=no_sandbox,
)
def generate_vars_for_machine_interactive(
machine: "Machine",
generator_name: str | None,
regenerate: bool,
no_sandbox: bool = False,
) -> bool:
_generator = None
if generator_name:
for generator in machine.vars_generators:
if generator.name == generator_name:
_generator = generator
break
pub_healtcheck_msg = machine.public_vars_store.health_check(_generator)
sec_healtcheck_msg = machine.secret_vars_store.health_check(_generator)
if pub_healtcheck_msg or sec_healtcheck_msg:
msg = f"Health check failed for machine {machine.name}:\n"
if pub_healtcheck_msg:
msg += f"Public vars store: {pub_healtcheck_msg}\n"
if sec_healtcheck_msg:
msg += f"Secret vars store: {sec_healtcheck_msg}"
raise ClanError(msg)
generators = get_closure(machine, generator_name, regenerate)
if len(generators) == 0:
return False
all_prompt_values = {}
for generator in generators:
all_prompt_values[generator.name] = _ask_prompts(generator)
return _generate_vars_for_machine(
machine,
generators,
all_prompt_values,
no_sandbox=no_sandbox,
)
def generate_vars(
machines: list["Machine"],
generator_name: str | None = None,
regenerate: bool = False,
no_sandbox: bool = False,
) -> bool:
was_regenerated = False
for machine in machines:
errors = []
try:
was_regenerated |= generate_vars_for_machine_interactive(
machine, generator_name, regenerate, no_sandbox=no_sandbox
)
except Exception as exc:
errors += [(machine, exc)]
if len(errors) == 1:
raise errors[0][1]
if len(errors) > 1:
msg = f"Failed to generate facts for {len(errors)} hosts:"
for machine, error in errors:
msg += f"\n{machine}: {error}"
raise ClanError(msg) from errors[0][1]
if not was_regenerated and len(machines) > 0:
for machine in machines:
machine.info("All vars are already up to date")
return was_regenerated
def generate_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
if len(args.machines) == 0:
machines = get_all_machines(args.flake, args.option)
else:
machines = get_selected_machines(args.flake, args.option, args.machines)
# prefetch all vars
config = nix_config()
system = config["system"]
machine_names = [machine.name for machine in machines]
# test
args.flake.precache(
[
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.validationHash",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.system.clan.deployment.file",
]
)
has_changed = generate_vars(
machines, args.generator, args.regenerate, no_sandbox=args.no_sandbox
)
if has_changed:
args.flake.invalidate_cache()
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machines",
type=str,
help="machine to generate facts for. if empty, generate facts for all machines",
nargs="*",
default=[],
)
add_dynamic_completer(machines_parser, complete_machines)
service_parser = parser.add_argument(
"--generator",
"-g",
type=str,
help="execute only the specified generator. If unset, execute all generators",
default=None,
)
add_dynamic_completer(service_parser, complete_services_for_machine)
parser.add_argument(
"--regenerate",
"-r",
action=argparse.BooleanOptionalAction,
help="whether to regenerate facts for the specified machine",
default=None,
)
parser.add_argument(
"--no-sandbox",
action="store_true",
help="disable sandboxing when executing the generator. WARNING: potentially executing untrusted code from external clan modules",
default=False,
)
parser.set_defaults(func=generate_command)