facts: remove facts and related tests

This commit is contained in:
Johannes Kirschbauer
2025-08-13 19:40:00 +02:00
parent 765bdb262a
commit 842e6f1fca
28 changed files with 9 additions and 1382 deletions

View File

@@ -116,7 +116,6 @@ nav:
- Overview: reference/cli/index.md - Overview: reference/cli/index.md
- reference/cli/backups.md - reference/cli/backups.md
- reference/cli/facts.md
- reference/cli/flakes.md - reference/cli/flakes.md
- reference/cli/flash.md - reference/cli/flash.md
- reference/cli/machines.md - reference/cli/machines.md

View File

@@ -20,7 +20,6 @@ from . import (
) )
from .arg_actions import AppendOptionAction from .arg_actions import AppendOptionAction
from .clan import show from .clan import show
from .facts import cli as facts
from .flash import cli as flash_cli from .flash import cli as flash_cli
from .hyperlink import help_hyperlink from .hyperlink import help_hyperlink
from .machines import cli as machines from .machines import cli as machines
@@ -302,45 +301,6 @@ For more detailed information, visit: {help_hyperlink("secrets", "https://docs.c
) )
secrets.register_parser(parser_secrets) secrets.register_parser(parser_secrets)
parser_facts = subparsers.add_parser(
"facts",
help="Manage facts",
description="Manage facts",
epilog=(
f"""
Note: Facts are being deprecated, please use Vars instead.
For a migration guide visit: {help_hyperlink("vars", "https://docs.clan.lol/guides/migrations/migration-facts-vars")}
This subcommand provides an interface to facts of clan machines.
Facts are artifacts that a service can generate.
There are public and secret facts.
Public facts can be referenced by other machines directly.
Public facts can include: ip addresses, public keys.
Secret facts can include: passwords, private keys.
A service is an included clan-module that implements facts generation functionality.
For example the zerotier module will generate private and public facts.
In this case the public fact will be the resulting zerotier-ip of the machine.
The secret fact will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan facts generate
Will generate facts for all machines.
$ clan facts generate --service [SERVICE] --regenerate
Will regenerate facts, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the facts for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
facts.register_parser(parser_facts)
# like facts but with vars instead of facts # like facts but with vars instead of facts
parser_vars = subparsers.add_parser( parser_vars = subparsers.add_parser(
"vars", "vars",

View File

@@ -1,60 +0,0 @@
import argparse
import logging
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def check_secrets(machine: Machine, service: None | str = None) -> bool:
missing_secret_facts = []
missing_public_facts = []
services = [service] if service else list(machine.facts_data.keys())
for service in services:
for secret_fact in machine.facts_data[service]["secret"]:
if isinstance(secret_fact, str):
secret_name = secret_fact
else:
secret_name = secret_fact["name"]
if not machine.secret_facts_store.exists(service, secret_name):
machine.info(
f"Secret fact '{secret_fact}' for service '{service}' is missing."
)
missing_secret_facts.append((service, secret_name))
for public_fact in machine.facts_data[service]["public"]:
if not machine.public_facts_store.exists(service, public_fact):
machine.info(
f"Public fact '{public_fact}' for service '{service}' is missing."
)
missing_public_facts.append((service, public_fact))
machine.debug(f"missing_secret_facts: {missing_secret_facts}")
machine.debug(f"missing_public_facts: {missing_public_facts}")
return not (missing_secret_facts or missing_public_facts)
def check_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(
name=args.machine,
flake=flake,
)
check_secrets(machine, service=args.service)
def register_check_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to check secrets for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.add_argument(
"--service",
help="the service to check",
)
parser.set_defaults(func=check_command)

View File

@@ -1,15 +0,0 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_check_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "check", "machine1"])

View File

@@ -1,133 +0,0 @@
# !/usr/bin/env python3
import argparse
from clan_cli.hyperlink import help_hyperlink
from .check import register_check_parser
from .generate import register_generate_parser
from .list import register_list_parser
from .upload import register_upload_parser
# takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
check_parser = subparser.add_parser(
"check",
help="check if facts are up to date",
epilog=(
f"""
This subcommand allows checking if all facts are up to date.
Examples:
$ clan facts check [MACHINE]
Will check facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_check_parser(check_parser)
list_parser = subparser.add_parser(
"list",
help="list all facts",
epilog=(
f"""
This subcommand allows listing all public facts for a specific machine.
The resulting list will be a json string with the name of the fact as its key
and the fact itself as it's value.
This is how an example output might look like:
```
\u007b
"[FACT_NAME]": "[FACT]"
\u007d
```
Examples:
$ clan facts list [MACHINE]
Will list facts for the specified machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_list_parser(list_parser)
parser_generate = subparser.add_parser(
"generate",
help="generate public and secret facts for machines",
epilog=(
f"""
This subcommand allows control of the generation of facts.
Often this function will be invoked automatically on deploying machines,
but there are situations the user may want to have more granular control,
especially for the regeneration of certain services.
A service is an included clan-module that implements facts generation functionality.
For example the zerotier module will generate private and public facts.
In this case the public fact will be the resulting zerotier-ip of the machine.
The secret fact will be the zerotier-identity-secret, which is used by zerotier
to prove the machine has control of the zerotier-ip.
Examples:
$ clan facts generate
Will generate facts for all machines.
$ clan facts generate [MACHINE]
Will generate facts for the specified machine.
$ clan facts generate [MACHINE] --service [SERVICE]
Will generate facts for the specified machine for the specified service.
$ clan facts generate --service [SERVICE] --regenerate
Will regenerate facts, if they are already generated for a specific service.
This is especially useful for resetting certain passwords while leaving the rest
of the facts for a machine in place.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_generate_parser(parser_generate)
parser_upload = subparser.add_parser(
"upload",
help="upload secrets for machines",
epilog=(
f"""
This subcommand allows uploading secrets to remote machines.
If using sops as a secret backend it will upload the private key to the machine.
If using password store it uploads all the secrets you manage to the machine.
The default backend is sops.
Examples:
$ clan facts upload [MACHINE]
Will upload secrets to a specific machine.
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_upload_parser(parser_upload)

View File

@@ -1,263 +0,0 @@
import argparse
import logging
import os
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError
from clan_lib.flake import require_flake
from clan_lib.git import commit_files
from clan_lib.machines.list import list_full_machines
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
complete_services_for_machine,
)
from .check import check_secrets
from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase
log = logging.getLogger(__name__)
def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
"""
Read multi-line input from stdin.
"""
print(prompt, flush=True)
proc = run(["cat"], RunOpts(check=False))
log.info("Input received. Processing...")
return proc.stdout.rstrip(os.linesep).rstrip()
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:
# fmt: off
return nix_shell(
[
"bash",
"bubblewrap",
],
[
"bwrap",
"--unshare-all",
"--tmpfs", "/",
"--ro-bind", "/nix/store", "/nix/store",
"--ro-bind", "/bin/sh", "/bin/sh",
"--dev", "/dev",
# not allowed to bind procfs in some sandboxes
"--bind", str(facts_dir), str(facts_dir),
"--bind", str(secrets_dir), str(secrets_dir),
"--chdir", "/",
# Doesn't work in our CI?
#"--proc", "/proc",
#"--hostname", "facts",
"--bind", "/proc", "/proc",
"--uid", "1000",
"--gid", "1000",
"--",
"bash", "-c", generator
],
)
# fmt: on
def generate_service_facts(
machine: Machine,
service: str,
regenerate: bool,
secret_facts_store: SecretStoreBase,
public_facts_store: FactStoreBase,
tmpdir: Path,
prompt: Callable[[str, str], str],
) -> bool:
service_dir = tmpdir / service
# check if all secrets exist and generate them if at least one is missing
needs_regeneration = not check_secrets(machine, service=service)
machine.debug(f"{service} needs_regeneration: {needs_regeneration}")
if not (needs_regeneration or regenerate):
return False
if not isinstance(machine.flake, Path):
msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes"
env = os.environ.copy()
facts_dir = service_dir / "facts"
facts_dir.mkdir(parents=True)
env["facts"] = str(facts_dir)
secrets_dir = service_dir / "secrets"
secrets_dir.mkdir(parents=True)
env["secrets"] = str(secrets_dir)
# compatibility for old outputs.nix users
if isinstance(machine.facts_data[service]["generator"], str):
generator = machine.facts_data[service]["generator"]
else:
generator = machine.facts_data[service]["generator"]["finalScript"]
if machine.facts_data[service]["generator"]["prompt"]:
prompt_value = prompt(
service, machine.facts_data[service]["generator"]["prompt"]
)
env["prompt_value"] = prompt_value
from clan_lib import bwrap
if sys.platform == "linux" and bwrap.bubblewrap_works():
cmd = bubblewrap_cmd(generator, facts_dir, secrets_dir)
else:
cmd = ["bash", "-c", generator]
run(
cmd,
RunOpts(env=env),
)
files_to_commit = []
# store secrets
for secret_name, secret in machine.facts_data[service]["secret"].items():
groups = secret.get("groups", [])
secret_file = secrets_dir / secret_name
if not secret_file.is_file():
msg = f"did not generate a file for '{secret_name}' when running the following command:\n"
msg += generator
raise ClanError(msg)
secret_path = secret_facts_store.set(
service, secret_name, secret_file.read_bytes(), groups
)
if secret_path:
files_to_commit.append(secret_path)
# store facts
for name in machine.facts_data[service]["public"]:
fact_file = facts_dir / name
if not fact_file.is_file():
msg = f"did not generate a file for '{name}' when running the following command:\n"
msg += machine.facts_data[service]["generator"]
raise ClanError(msg)
fact_file = public_facts_store.set(service, name, fact_file.read_bytes())
if fact_file:
files_to_commit.append(fact_file)
commit_files(
files_to_commit,
machine.flake_dir,
f"Update facts/secrets for service {service} in machine {machine.name}",
)
return True
def prompt_func(service: str, text: str) -> str:
print(f"{text}: ")
return read_multiline_input()
def _generate_facts_for_machine(
machine: Machine,
service: str | None,
regenerate: bool,
tmpdir: Path,
prompt: Callable[[str, str], str] = prompt_func,
) -> bool:
local_temp = tmpdir / machine.name
local_temp.mkdir()
machine_updated = False
if service and service not in machine.facts_data:
services = list(machine.facts_data.keys())
msg = f"Could not find service with name: {service}. The following services are available: {services}"
raise ClanError(msg)
if service:
machine_service_facts = {service: machine.facts_data[service]}
else:
machine_service_facts = machine.facts_data
for service in machine_service_facts:
machine_updated |= generate_service_facts(
machine=machine,
service=service,
regenerate=regenerate,
secret_facts_store=machine.secret_facts_store,
public_facts_store=machine.public_facts_store,
tmpdir=local_temp,
prompt=prompt,
)
if machine_updated:
# flush caches to make sure the new secrets are available in evaluation
machine.flush_caches()
return machine_updated
def generate_facts(
machines: list[Machine],
service: str | None = None,
regenerate: bool = False,
prompt: Callable[[str, str], str] = prompt_func,
) -> bool:
was_regenerated = False
with TemporaryDirectory(prefix="facts-generate-") as _tmpdir:
tmpdir = Path(_tmpdir).resolve()
for machine in machines:
errors = 0
try:
was_regenerated |= _generate_facts_for_machine(
machine, service, regenerate, tmpdir, prompt
)
except (OSError, ClanError) as e:
machine.error(f"Failed to generate facts: {e}")
traceback.print_exc()
errors += 1
if errors > 0:
msg = (
f"Failed to generate facts for {errors} hosts. Check the logs above"
)
raise ClanError(msg)
if not was_regenerated and len(machines) > 0:
log.info("All secrets and facts are already up to date")
return was_regenerated
def generate_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machines: list[Machine] = list(list_full_machines(flake).values())
if len(args.machines) > 0:
machines = list(
filter(
lambda m: m.name in args.machines,
machines,
)
)
generate_facts(machines, args.service, args.regenerate)
def register_generate_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machines",
type=str,
help="machine to generate facts for. if empty, generate facts for all machines",
nargs="*",
default=[],
)
add_dynamic_completer(machines_parser, complete_machines)
service_parser = parser.add_argument(
"--service",
type=str,
help="service to generate facts for, if empty, generate facts for every service",
default=None,
)
add_dynamic_completer(service_parser, complete_services_for_machine)
parser.add_argument(
"--regenerate",
action=argparse.BooleanOptionalAction,
help="whether to regenerate facts for the specified machine",
default=None,
)
parser.set_defaults(func=generate_command)

View File

@@ -1,15 +0,0 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_generate_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "generate"])

View File

@@ -1,33 +0,0 @@
import argparse
import json
import logging
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def get_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(name=args.machine, flake=flake)
# the raw_facts are bytestrings making them not json serializable
raw_facts = machine.public_facts_store.get_all()
facts = {}
for key in raw_facts["TODO"]:
facts[key] = raw_facts["TODO"][key].decode("utf8")
print(json.dumps(facts, indent=4))
def register_list_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to print facts for",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=get_command)

View File

@@ -1,13 +0,0 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_list_command_no_flake(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "list", "machine1"])

View File

@@ -1,30 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import clan_lib.machines.machines as machines
class FactStoreBase(ABC):
@abstractmethod
def __init__(self, machine: machines.Machine) -> None:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
@abstractmethod
def set(self, service: str, name: str, value: bytes) -> Path | None:
pass
# get a single fact
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
# get all facts
@abstractmethod
def get_all(self) -> dict[str, dict[str, bytes]]:
pass

View File

@@ -1,51 +0,0 @@
from pathlib import Path
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from . import FactStoreBase
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
def set(self, service: str, name: str, value: bytes) -> Path | None:
if self.machine.flake.is_local:
fact_path = (
self.machine.flake.path
/ "machines"
/ self.machine.name
/ "facts"
/ name
)
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.touch()
fact_path.write_bytes(value)
return fact_path
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
def exists(self, service: str, name: str) -> bool:
fact_path = (
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
)
return fact_path.exists()
# get a single fact
def get(self, service: str, name: str) -> bytes:
fact_path = (
self.machine.flake_dir / "machines" / self.machine.name / "facts" / name
)
return fact_path.read_bytes()
# get all facts
def get_all(self) -> dict[str, dict[str, bytes]]:
facts_folder = self.machine.flake_dir / "machines" / self.machine.name / "facts"
facts: dict[str, dict[str, bytes]] = {}
facts["TODO"] = {}
if facts_folder.exists():
for fact_path in facts_folder.iterdir():
facts["TODO"][fact_path.name] = fact_path.read_bytes()
return facts

View File

@@ -1,47 +0,0 @@
import logging
from pathlib import Path
from clan_lib.dirs import vm_state_dir
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from . import FactStoreBase
log = logging.getLogger(__name__)
class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.dir = vm_state_dir(machine.flake.identifier, machine.name) / "facts"
machine.debug(f"FactStore initialized with dir {self.dir}")
def exists(self, service: str, name: str) -> bool:
fact_path = self.dir / service / name
return fact_path.exists()
def set(self, service: str, name: str, value: bytes) -> Path | None:
fact_path = self.dir / service / name
fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.write_bytes(value)
return None
# get a single fact
def get(self, service: str, name: str) -> bytes:
fact_path = self.dir / service / name
if fact_path.exists():
return fact_path.read_bytes()
msg = f"Fact {name} for service {service} not found"
raise ClanError(msg)
# get all facts
def get_all(self) -> dict[str, dict[str, bytes]]:
facts: dict[str, dict[str, bytes]] = {}
if self.dir.exists():
for service in self.dir.iterdir():
facts[service.name] = {}
for fact in service.iterdir():
facts[service.name][fact.name] = fact.read_bytes()
return facts

View File

@@ -1,34 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import clan_lib.machines.machines as machines
from clan_lib.ssh.host import Host
class SecretStoreBase(ABC):
@abstractmethod
def __init__(self, machine: machines.Machine) -> None:
pass
@abstractmethod
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
pass
@abstractmethod
def get(self, service: str, name: str) -> bytes:
pass
@abstractmethod
def exists(self, service: str, name: str) -> bool:
pass
def needs_upload(self, host: Host) -> bool:
return True
@abstractmethod
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -1,122 +0,0 @@
import os
import subprocess
from pathlib import Path
from typing import override
from clan_lib.cmd import Log, RunOpts
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_lib.ssh.host import Host
from clan_cli.facts.secret_modules import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
subprocess.run(
nix_shell(
["pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"],
),
input=value,
check=True,
)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
return subprocess.run(
nix_shell(
["pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"],
),
check=True,
stdout=subprocess.PIPE,
).stdout
def exists(self, service: str, name: str) -> bool:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg"
return secret_path.exists()
def generate_hash(self) -> bytes:
password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
)
hashes = []
hashes.append(
subprocess.run(
nix_shell(
["git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
f"machines/{self.machine.name}",
],
),
stdout=subprocess.PIPE,
check=False,
).stdout.strip()
)
for symlink in Path(password_store).glob(f"machines/{self.machine.name}/**/*"):
if symlink.is_symlink():
hashes.append(
subprocess.run(
nix_shell(
["git"],
[
"git",
"-C",
password_store,
"log",
"-1",
"--format=%H",
str(symlink),
],
),
stdout=subprocess.PIPE,
check=False,
).stdout.strip()
)
# we sort the hashes to make sure that the order is always the same
hashes.sort()
return b"\n".join(hashes)
@override
def needs_upload(self, host: Host) -> bool:
local_hash = self.generate_hash()
with host.host_connection() as ssh:
remote_hash = ssh.run(
# TODO get the path to the secrets from the machine
["cat", f"{self.machine.secrets_upload_directory}/.pass_info"],
RunOpts(log=Log.STDERR, check=False),
).stdout.strip()
if not remote_hash:
print("remote hash is empty")
return True
return local_hash.decode() != remote_hash
def upload(self, output_dir: Path) -> None:
os.umask(0o077)
for service in self.machine.facts_data:
for secret in self.machine.facts_data[service]["secret"]:
if isinstance(secret, dict):
secret_name = secret["name"]
else:
# TODO: drop old format soon
secret_name = secret
(output_dir / secret_name).write_bytes(self.get(service, secret_name))
(output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -1,72 +0,0 @@
from pathlib import Path
from typing import override
from clan_lib.machines.machines import Machine
from clan_lib.ssh.host import Host
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import add_machine, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key, load_age_plugins
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
# no need to generate keys if we don't manage secrets
if not hasattr(self.machine, "facts_data"):
return
if not self.machine.facts_data:
return
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
/ f"{self.machine.name}-age.key",
priv_key,
add_groups=self.machine.select("config.clan.core.sops.defaultGroups"),
age_plugins=load_age_plugins(self.machine.flake),
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}"
)
encrypt_secret(
self.machine.flake_dir,
path,
value,
add_machines=[self.machine.name],
add_groups=groups,
age_plugins=load_age_plugins(self.machine.flake),
)
return path
def get(self, service: str, name: str) -> bytes:
return decrypt_secret(
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
age_plugins=load_age_plugins(self.machine.flake),
).encode("utf-8")
def exists(self, service: str, name: str) -> bool:
return has_secret(
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
)
@override
def needs_upload(self, host: Host) -> bool:
return False
# We rely now on the vars backend to upload the age key
def upload(self, output_dir: Path) -> None:
pass

View File

@@ -1,36 +0,0 @@
import shutil
from pathlib import Path
from typing import override
from clan_lib.dirs import vm_state_dir
from clan_lib.machines.machines import Machine
from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(machine.flake.identifier, machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(
self, service: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
secret_file = self.dir / service / name
secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value)
return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes:
secret_file = self.dir / service / name
return secret_file.read_bytes()
def exists(self, service: str, name: str) -> bool:
return (self.dir / service / name).exists()
@override
def upload(self, output_dir: Path) -> None:
if output_dir.exists():
shutil.rmtree(output_dir)
shutil.copytree(self.dir, output_dir)

View File

@@ -1,42 +0,0 @@
import argparse
import logging
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_lib.flake import require_flake
from clan_lib.machines.machines import Machine
from clan_lib.ssh.host import Host
from clan_lib.ssh.upload import upload
from clan_cli.completions import add_dynamic_completer, complete_machines
log = logging.getLogger(__name__)
def upload_secrets(machine: Machine, host: Host) -> None:
if not machine.secret_facts_store.needs_upload(host):
machine.info("Secrets already uploaded")
return
with TemporaryDirectory(prefix="facts-upload-") as _tempdir:
local_secret_dir = Path(_tempdir).resolve()
machine.secret_facts_store.upload(local_secret_dir)
remote_secret_dir = Path(machine.secrets_upload_directory)
upload(host, local_secret_dir, remote_secret_dir)
def upload_command(args: argparse.Namespace) -> None:
flake = require_flake(args.flake)
machine = Machine(name=args.machine, flake=flake)
with machine.target_host().host_connection() as host:
upload_secrets(machine, host)
def register_upload_parser(parser: argparse.ArgumentParser) -> None:
machines_parser = parser.add_argument(
"machine",
help="The machine to upload secrets to",
)
add_dynamic_completer(machines_parser, complete_machines)
parser.set_defaults(func=upload_command)

View File

@@ -1,15 +0,0 @@
from pathlib import Path
import pytest
from clan_lib.errors import ClanError
from clan_cli.tests.helpers import cli
def test_upload_command_no_flake(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
with pytest.raises(ClanError):
cli.run(["facts", "upload", "machine1"])

View File

@@ -1,133 +0,0 @@
import ipaddress
from typing import TYPE_CHECKING
import pytest
from clan_cli.facts.secret_modules.sops import SecretStore
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.tests.fixtures_flakes import FlakeForTest
from clan_cli.tests.helpers import cli
from clan_cli.tests.helpers.validator import is_valid_age_key
from clan_lib.flake import Flake
from clan_lib.machines.machines import Machine
if TYPE_CHECKING:
from .age_keys import KeyPair
@pytest.mark.impure
def test_generate_secret(
monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake_with_core.path),
"user1",
age_keys[0].pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake_with_core.path),
"admins",
"user1",
]
)
cmd = [
"vars",
"generate",
"--flake",
str(test_flake_with_core.path),
"vm1",
"--generator",
"zerotier",
]
cli.run(cmd)
store1 = SecretStore(
Machine(name="vm1", flake=Flake(str(test_flake_with_core.path)))
)
assert store1.exists("", "age.key")
network_id = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm1"
/ "zerotier"
/ "zerotier-network-id"
/ "value"
).read_text()
assert len(network_id) == 16
secrets_folder = sops_secrets_folder(test_flake_with_core.path)
age_key = secrets_folder / "vm1-age.key" / "secret"
identity_secret = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm1"
/ "zerotier"
/ "zerotier-identity-secret"
/ "secret"
)
age_key_mtime = age_key.lstat().st_mtime_ns
secret1_mtime = identity_secret.lstat().st_mtime_ns
# Assert that the age key is valid
age_secret = store1.get("", "age.key").decode()
assert is_valid_age_key(age_secret)
# test idempotency for vm1 and also generate for vm2
cli.run(
[
"vars",
"generate",
"--flake",
str(test_flake_with_core.path),
"--generator",
"zerotier",
]
)
assert age_key.lstat().st_mtime_ns == age_key_mtime
assert identity_secret.lstat().st_mtime_ns == secret1_mtime
store2 = SecretStore(
Machine(name="vm2", flake=Flake(str(test_flake_with_core.path)))
)
assert store2.exists("", "age.key")
assert (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm2"
/ "zerotier"
/ "zerotier-identity-secret"
/ "secret"
).exists()
ip = (
test_flake_with_core.path
/ "vars"
/ "per-machine"
/ "vm2"
/ "zerotier"
/ "zerotier-ip"
/ "value"
).read_text()
assert ipaddress.IPv6Address(ip).is_private
# Assert that the age key is valid
age_secret = store2.get("", "age.key").decode()
assert is_valid_age_key(age_secret)

View File

@@ -820,63 +820,6 @@ def test_stdout_of_generate(
caplog.clear() caplog.clear()
@pytest.mark.with_core
def test_migration(
monkeypatch: pytest.MonkeyPatch,
flake_with_sops: ClanFlake,
caplog: pytest.LogCaptureFixture,
) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_service = config["clan"]["core"]["facts"]["services"]["my_service"]
my_service["public"]["my_value"] = {}
my_service["secret"]["my_secret"] = {}
my_service["generator"]["script"] = (
'echo -n hello > "$facts"/my_value && echo -n hello > "$secrets"/my_secret'
)
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_value"]["secret"] = False
my_generator["files"]["my_secret"]["secret"] = True
my_generator["migrateFact"] = "my_service"
my_generator["script"] = 'echo -n other > "$out"/my_value'
other_service = config["clan"]["core"]["facts"]["services"]["other_service"]
other_service["secret"]["other_value"] = {}
other_service["generator"]["script"] = 'echo -n hello > "$secrets"/other_value'
other_generator = config["clan"]["core"]["vars"]["generators"]["other_generator"]
# the var to migrate to is mistakenly marked as not secret (migration should fail)
other_generator["files"]["other_value"]["secret"] = False
other_generator["migrateFact"] = "my_service"
other_generator["script"] = 'echo -n value-from-vars > "$out"/other_value'
flake.refresh()
monkeypatch.chdir(flake.path)
cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"])
with caplog.at_level(logging.INFO):
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
assert "Migrated var my_generator/my_value" in caplog.text
assert "Migrated secret var my_generator/my_secret" in caplog.text
flake_obj = Flake(str(flake.path))
my_generator = Generator("my_generator", machine="my_machine", _flake=flake_obj)
other_generator = Generator(
"other_generator", machine="my_machine", _flake=flake_obj
)
in_repo_store = in_repo.FactStore(flake=flake_obj)
sops_store = sops.SecretStore(flake=flake_obj)
assert in_repo_store.exists(my_generator, "my_value")
assert in_repo_store.get(my_generator, "my_value").decode() == "hello"
assert sops_store.exists(my_generator, "my_secret")
assert sops_store.get(my_generator, "my_secret").decode() == "hello"
assert in_repo_store.exists(other_generator, "other_value")
assert (
in_repo_store.get(other_generator, "other_value").decode() == "value-from-vars"
)
@pytest.mark.with_core @pytest.mark.with_core
def test_fails_when_files_are_left_from_other_backend( def test_fails_when_files_are_left_from_other_backend(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,

View File

@@ -16,7 +16,6 @@ from clan_cli.completions import (
complete_services_for_machine, complete_services_for_machine,
) )
from clan_cli.vars._types import StoreBase from clan_cli.vars._types import StoreBase
from clan_cli.vars.migration import check_can_migrate, migrate_files
from clan_lib.api import API from clan_lib.api import API
from clan_lib.cmd import RunOpts, run from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError from clan_lib.errors import ClanError
@@ -510,17 +509,14 @@ def _generate_vars_for_machine(
) -> None: ) -> None:
_ensure_healthy(machine=machine, generators=generators) _ensure_healthy(machine=machine, generators=generators)
for generator in generators: for generator in generators:
if check_can_migrate(machine, generator): _execute_generator(
migrate_files(machine, generator) machine=machine,
else: generator=generator,
_execute_generator( secret_vars_store=machine.secret_vars_store,
machine=machine, public_vars_store=machine.public_vars_store,
generator=generator, prompt_values=all_prompt_values.get(generator.name, {}),
secret_vars_store=machine.secret_vars_store, no_sandbox=no_sandbox,
public_vars_store=machine.public_vars_store, )
prompt_values=all_prompt_values.get(generator.name, {}),
no_sandbox=no_sandbox,
)
@API.register @API.register

View File

@@ -1,136 +1,7 @@
import logging import logging
from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from clan_lib.errors import ClanError
from clan_lib.git import commit_files
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
if TYPE_CHECKING: if TYPE_CHECKING:
from clan_cli.vars.generate import Generator pass
from clan_lib.machines.machines import Machine
def _migration_file_exists(
machine: "Machine",
generator: "Generator",
fact_name: str,
) -> bool:
for file in generator.files:
if file.name == fact_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
is_secret = file.secret
if is_secret:
if machine.secret_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
)
if not is_secret:
if machine.public_facts_store.exists(generator.name, fact_name):
return True
machine.debug(
f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
)
return False
def _migrate_file(
machine: "Machine",
generator: "Generator",
var_name: str,
service_name: str,
fact_name: str,
) -> list[Path]:
for file in generator.files:
if file.name == var_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
paths = []
if file.secret:
old_value = machine.secret_facts_store.get(service_name, fact_name)
maybe_path = machine.secret_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
else:
old_value = machine.public_facts_store.get(service_name, fact_name)
maybe_path = machine.public_vars_store.set(
generator, file, old_value, is_migration=True
)
if maybe_path:
paths.append(maybe_path)
return paths
def migrate_files(
machine: "Machine",
generator: "Generator",
) -> None:
not_found = []
files_to_commit = []
for file in generator.files:
if _migration_file_exists(machine, generator, file.name):
assert generator.migrate_fact is not None
files_to_commit += _migrate_file(
machine, generator, file.name, generator.migrate_fact, file.name
)
else:
not_found.append(file.name)
if len(not_found) > 0:
msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
raise ClanError(msg)
commit_files(
files_to_commit,
machine.flake_dir,
f"migrated facts to vars for generator {generator.name} for machine {machine.name}",
)
def check_can_migrate(
machine: "Machine",
generator: "Generator",
) -> bool:
service_name = generator.migrate_fact
if not service_name:
return False
# ensure that none of the generated vars already exist in the store
all_files_missing = True
all_files_present = True
for file in generator.files:
if file.secret:
if machine.secret_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
else:
if machine.public_vars_store.exists(generator, file.name):
all_files_missing = False
else:
all_files_present = False
if not all_files_present and not all_files_missing:
msg = f"Cannot migrate facts for generator {generator.name} as some files already exist in the store"
raise ClanError(msg)
if all_files_present:
# all files already migrated, no need to run migration again
return False
# ensure that all files can be migrated (exists in the corresponding fact store)
return bool(
all(
_migration_file_exists(machine, generator, file.name)
for file in generator.files
)
)

View File

@@ -19,7 +19,6 @@ from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell from clan_lib.nix import nix_shell
from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.facts.generate import generate_facts
from clan_cli.qemu.qga import QgaSession from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol from clan_cli.qemu.qmp import QEMUMonitorProtocol
from clan_cli.vars.generate import generate_vars from clan_cli.vars.generate import generate_vars
@@ -84,10 +83,7 @@ def get_secrets(
secrets_dir = tmpdir / "secrets" secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(parents=True, exist_ok=True) secrets_dir.mkdir(parents=True, exist_ok=True)
generate_facts([machine])
generate_vars([machine]) generate_vars([machine])
machine.secret_facts_store.upload(secrets_dir)
populate_secret_vars(machine, secrets_dir) populate_secret_vars(machine, secrets_dir)
return secrets_dir return secrets_dir

View File

@@ -6,7 +6,6 @@ from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any, Literal from typing import Any, Literal
from clan_cli.facts.generate import generate_facts
from clan_cli.vars.generate import generate_vars from clan_cli.vars.generate import generate_vars
from clan_cli.vars.upload import populate_secret_vars from clan_cli.vars.upload import populate_secret_vars
@@ -77,7 +76,6 @@ def run_machine_flash(
extra_args = [] extra_args = []
system_config_nix: dict[str, Any] = {} system_config_nix: dict[str, Any] = {}
generate_facts([machine])
generate_vars([machine]) generate_vars([machine])
if system_config.language: if system_config.language:
@@ -131,7 +129,6 @@ def run_machine_flash(
local_dir = tmpdir / upload_dir local_dir = tmpdir / upload_dir
local_dir.mkdir(parents=True) local_dir.mkdir(parents=True)
machine.secret_facts_store.upload(local_dir)
populate_secret_vars(machine, local_dir) populate_secret_vars(machine, local_dir)
disko_install = [] disko_install = []

View File

@@ -5,7 +5,6 @@ from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Literal from typing import Literal
from clan_cli.facts.generate import generate_facts
from clan_cli.machines.hardware import HardwareConfig from clan_cli.machines.hardware import HardwareConfig
from clan_cli.vars.generate import generate_vars from clan_cli.vars.generate import generate_vars
@@ -87,7 +86,6 @@ def run_machine_install(opts: InstallOptions, target_host: Remote) -> None:
# Notify the UI about what we are doing # Notify the UI about what we are doing
notify_install_step("generators") notify_install_step("generators")
generate_facts([machine])
generate_vars([machine]) generate_vars([machine])
with ( with (
@@ -100,7 +98,6 @@ def run_machine_install(opts: InstallOptions, target_host: Remote) -> None:
# Notify the UI about what we are doing # Notify the UI about what we are doing
notify_install_step("upload-secrets") notify_install_step("upload-secrets")
machine.secret_facts_store.upload(upload_dir)
machine.secret_vars_store.populate_dir( machine.secret_vars_store.populate_dir(
machine.name, upload_dir, phases=["activation", "users", "services"] machine.name, upload_dir, phases=["activation", "users", "services"]
) )

View File

@@ -5,8 +5,6 @@ from functools import cached_property
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from clan_cli.facts import public_modules as facts_public_modules
from clan_cli.facts import secret_modules as facts_secret_modules
from clan_cli.vars._types import StoreBase from clan_cli.vars._types import StoreBase
from clan_lib.api import API from clan_lib.api import API
@@ -84,18 +82,6 @@ class Machine:
f'{self._class_}Configurations."{self.name}".pkgs.hostPlatform.system' f'{self._class_}Configurations."{self.name}".pkgs.hostPlatform.system'
) )
@cached_property
def secret_facts_store(self) -> facts_secret_modules.SecretStoreBase:
secret_module = self.select("config.clan.core.facts.secretModule")
module = importlib.import_module(secret_module)
return module.SecretStore(machine=self)
@cached_property
def public_facts_store(self) -> facts_public_modules.FactStoreBase:
public_module = self.select("config.clan.core.facts.publicModule")
module = importlib.import_module(public_module)
return module.FactStore(machine=self)
@cached_property @cached_property
def secret_vars_store(self) -> StoreBase: def secret_vars_store(self) -> StoreBase:
secret_module = self.select("config.clan.core.vars.settings.secretModule") secret_module = self.select("config.clan.core.vars.settings.secretModule")

View File

@@ -5,8 +5,6 @@ import re
import shlex import shlex
from contextlib import ExitStack from contextlib import ExitStack
from clan_cli.facts.generate import generate_facts
from clan_cli.facts.upload import upload_secrets
from clan_cli.vars.generate import generate_vars from clan_cli.vars.generate import generate_vars
from clan_cli.vars.upload import upload_secret_vars from clan_cli.vars.upload import upload_secret_vars
@@ -147,11 +145,9 @@ def run_machine_update(
# Some operations require root privileges on the target host. # Some operations require root privileges on the target host.
target_host_root = stack.enter_context(_target_host.become_root()) target_host_root = stack.enter_context(_target_host.become_root())
generate_facts([machine], service=None, regenerate=False)
generate_vars([machine], generator_name=None, regenerate=False) generate_vars([machine], generator_name=None, regenerate=False)
# Upload secrets to the target host using root # Upload secrets to the target host using root
upload_secrets(machine, target_host_root)
upload_secret_vars(machine, target_host_root) upload_secret_vars(machine, target_host_root)
# Upload the flake's source to the build host. # Upload the flake's source to the build host.