Merge pull request 'vars: make interface more type-safe' (#2459) from vars into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2459
Reviewed-by: kenji <aks.kenji@protonmail.com>
This commit is contained in:
Mic92
2024-11-26 16:15:55 +00:00
19 changed files with 676 additions and 522 deletions

View File

@@ -40,6 +40,7 @@ in
generators = lib.flip lib.mapAttrs config.clan.core.vars.generators ( generators = lib.flip lib.mapAttrs config.clan.core.vars.generators (
_name: generator: { _name: generator: {
inherit (generator) inherit (generator)
name
dependencies dependencies
finalScript finalScript
invalidationHash invalidationHash
@@ -49,7 +50,7 @@ in
; ;
files = lib.flip lib.mapAttrs generator.files ( files = lib.flip lib.mapAttrs generator.files (
_name: file: { _name: file: {
inherit (file) deploy secret; inherit (file) name deploy secret;
} }
); );
} }

View File

@@ -46,6 +46,16 @@ in
submodule (generator: { submodule (generator: {
imports = [ ./generator.nix ]; imports = [ ./generator.nix ];
options = { options = {
name = lib.mkOption {
type = lib.types.str;
description = ''
The name of the generator.
This name will be used to refer to the generator in other generators.
'';
readOnly = true;
default = generator.config._module.args.name;
};
dependencies = lib.mkOption { dependencies = lib.mkOption {
description = '' description = ''
A list of other generators that this generator depends on. A list of other generators that this generator depends on.
@@ -212,6 +222,14 @@ in
type = attrsOf ( type = attrsOf (
submodule (prompt: { submodule (prompt: {
options = { options = {
name = lib.mkOption {
description = ''
The name of the prompt.
This name will be used to refer to the prompt in the generator script.
'';
type = str;
default = prompt.config._module.args.name;
};
createFile = lib.mkOption { createFile = lib.mkOption {
description = '' description = ''
Whether the prompted value should be stored in a file with the same name as the prompt. Whether the prompted value should be stored in a file with the same name as the prompt.

View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass, field
from functools import cached_property from functools import cached_property
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Any, Literal from typing import TYPE_CHECKING, Any, Literal
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.cmd import run_no_stdout from clan_cli.cmd import run_no_stdout
@@ -21,6 +21,9 @@ from clan_cli.vars.secret_modules import SecretStoreBase
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
if TYPE_CHECKING:
from clan_cli.vars.generate import Generator
@dataclass @dataclass
class Machine: class Machine:
@@ -156,10 +159,16 @@ class Machine:
return {} return {}
@property @property
def vars_generators(self) -> dict[str, dict[str, Any]]: def vars_generators(self) -> list["Generator"]:
if self.deployment["vars"]["generators"]: from clan_cli.vars.generate import Generator
return self.deployment["vars"]["generators"]
return {} clan_vars = self.deployment.get("vars")
if clan_vars is None:
return []
generators: dict[str, Any] = clan_vars.get("generators")
if generators is None:
return []
return [Generator.from_json(gen) for gen in generators.values()]
@property @property
def secrets_upload_directory(self) -> str: def secrets_upload_directory(self) -> str:

View File

@@ -2,10 +2,14 @@ import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines import machines from clan_cli.machines import machines
if TYPE_CHECKING:
from .generate import Generator, Var
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -16,69 +20,12 @@ def string_repr(value: bytes) -> str:
return "<binary blob>" return "<binary blob>"
@dataclass
class Prompt:
name: str
description: str
type: str
has_file: bool
generator: str
previous_value: str | None = None
# TODO: add flag 'pending' generator needs to be executed
@dataclass
class Generator:
name: str
share: bool
prompts: list[Prompt]
@dataclass @dataclass
class GeneratorUpdate: class GeneratorUpdate:
generator: str generator: str
prompt_values: dict[str, str] prompt_values: dict[str, str]
@dataclass
class Var:
_store: "StoreBase"
generator: str
name: str
id: str
secret: bool
shared: bool
deployed: bool
owner: str
group: str
@property
def value(self) -> bytes:
if not self._store.exists(self.generator, self.name, self.shared):
msg = f"Var {self.id} has not been generated yet"
raise ValueError(msg)
# try decode the value or return <binary blob>
return self._store.get(self.generator, self.name, self.shared)
@property
def printable_value(self) -> str:
return string_repr(self.value)
def set(self, value: bytes) -> None:
self._store.set(self.generator, self.name, value, self.shared, self.deployed)
@property
def exists(self) -> bool:
return self._store.exists(self.generator, self.name, self.shared)
def __str__(self) -> str:
if self._store.exists(self.generator, self.name, self.shared):
if self.secret:
return f"{self.id}: ********"
return f"{self.id}: {self.printable_value}"
return f"{self.id}: <not set>"
class StoreBase(ABC): class StoreBase(ABC):
def __init__(self, machine: "machines.Machine") -> None: def __init__(self, machine: "machines.Machine") -> None:
self.machine = machine self.machine = machine
@@ -90,24 +37,22 @@ class StoreBase(ABC):
# get a single fact # get a single fact
@abstractmethod @abstractmethod
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes: def get(self, generator: "Generator", name: str) -> bytes:
pass pass
@abstractmethod @abstractmethod
def _set( def _set(
self, self,
generator_name: str, generator: "Generator",
name: str, var: "Var",
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
""" """
override this method to implement the actual creation of the file override this method to implement the actual creation of the file
""" """
@abstractmethod @abstractmethod
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator: "Generator", name: str) -> bool:
pass pass
@property @property
@@ -122,106 +67,88 @@ class StoreBase(ABC):
) )
raise ClanError(msg) raise ClanError(msg)
def rel_dir(self, generator_name: str, var_name: str, shared: bool = False) -> Path: def rel_dir(self, generator: "Generator", var_name: str) -> Path:
if shared: if generator.share:
return Path(f"shared/{generator_name}/{var_name}") return Path(f"shared/{generator.name}/{var_name}")
return Path(f"per-machine/{self.machine.name}/{generator_name}/{var_name}") return Path(f"per-machine/{self.machine.name}/{generator.name}/{var_name}")
def directory( def directory(self, generator: "Generator", var_name: str) -> Path:
self, generator_name: str, var_name: str, shared: bool = False return Path(self.machine.flake_dir) / "vars" / self.rel_dir(generator, var_name)
) -> Path:
return (
Path(self.machine.flake_dir)
/ "vars"
/ self.rel_dir(generator_name, var_name, shared)
)
def set( def set(
self, self,
generator_name: str, generator: "Generator",
var_name: str, var: "Var",
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
if self.exists(generator_name, var_name, shared): if self.exists(generator, var.name):
if self.is_secret_store: if self.is_secret_store:
old_val = None old_val = None
old_val_str = "********" old_val_str = "********"
else: else:
old_val = self.get(generator_name, var_name, shared) old_val = self.get(generator, var.name)
old_val_str = string_repr(old_val) old_val_str = string_repr(old_val)
else: else:
old_val = None old_val = None
old_val_str = "<not set>" old_val_str = "<not set>"
new_file = self._set(generator_name, var_name, value, shared, deployed) new_file = self._set(generator, var, value)
if self.is_secret_store: if self.is_secret_store:
print(f"Updated secret var {generator_name}/{var_name}\n") print(f"Updated secret var {generator.name}/{var.name}\n")
else: else:
if value != old_val: if value != old_val:
print( print(
f"Updated var {generator_name}/{var_name}\n" f"Updated var {generator.name}/{var.name}\n"
f" old: {old_val_str}\n" f" old: {old_val_str}\n"
f" new: {string_repr(value)}" f" new: {string_repr(value)}"
) )
else: else:
print( print(
f"Var {generator_name}/{var_name} remains unchanged: {old_val_str}" f"Var {generator.name}/{var.name} remains unchanged: {old_val_str}"
) )
return new_file return new_file
def get_all(self) -> list[Var]: def get_all(self) -> list["Var"]:
all_vars = [] all_vars = []
for gen_name, generator in self.machine.vars_generators.items(): for generator in self.machine.vars_generators:
for f_name, file in generator["files"].items(): for var in generator.files:
# only handle vars compatible to this store # only handle vars compatible to this store
if self.is_secret_store != file["secret"]: if self.is_secret_store != var.secret:
continue continue
all_vars.append( var.store(self)
Var( var.generator(generator)
_store=self, all_vars.append(var)
generator=gen_name,
name=f_name,
id=f"{gen_name}/{f_name}",
secret=file["secret"],
shared=generator["share"],
deployed=file["deploy"],
owner=file.get("owner", "root"),
group=file.get("group", "root"),
)
)
return all_vars return all_vars
def get_invalidation_hash(self, generator_name: str) -> str | None: def get_invalidation_hash(self, generator: "Generator") -> str | None:
""" """
Return the invalidation hash that indicates if a generator needs to be re-run Return the invalidation hash that indicates if a generator needs to be re-run
due to a change in its definition due to a change in its definition
""" """
hash_file = ( hash_file = (
self.machine.flake_dir / "vars" / generator_name / "invalidation_hash" self.machine.flake_dir / "vars" / generator.name / "invalidation_hash"
) )
if not hash_file.exists(): if not hash_file.exists():
return None return None
return hash_file.read_text().strip() return hash_file.read_text().strip()
def set_invalidation_hash(self, generator_name: str, hash_str: str) -> None: def set_invalidation_hash(self, generator: "Generator", hash_str: str) -> None:
""" """
Store the invalidation hash that indicates if a generator needs to be re-run Store the invalidation hash that indicates if a generator needs to be re-run
""" """
hash_file = ( hash_file = (
self.machine.flake_dir / "vars" / generator_name / "invalidation_hash" self.machine.flake_dir / "vars" / generator.name / "invalidation_hash"
) )
hash_file.parent.mkdir(parents=True, exist_ok=True) hash_file.parent.mkdir(parents=True, exist_ok=True)
hash_file.write_text(hash_str) hash_file.write_text(hash_str)
def hash_is_valid(self, generator_name: str) -> bool: def hash_is_valid(self, generator: "Generator") -> bool:
""" """
Check if the invalidation hash is up to date Check if the invalidation hash is up to date
If the hash is not set in nix and hasn't been stored before, it is considered valid If the hash is not set in nix and hasn't been stored before, it is considered valid
-> this provides backward and forward compatibility -> this provides backward and forward compatibility
""" """
stored_hash = self.get_invalidation_hash(generator_name) stored_hash = self.get_invalidation_hash(generator)
target_hash = self.machine.vars_generators[generator_name]["invalidationHash"] target_hash = generator.invalidation_hash
# if the hash is neither set in nix nor on disk, it is considered valid (provides backwards compat) # if the hash is neither set in nix nor on disk, it is considered valid (provides backwards compat)
if target_hash is None and stored_hash is None: if target_hash is None and stored_hash is None:
return True return True

View File

@@ -1,75 +1,91 @@
import argparse import argparse
import importlib import importlib
import logging import logging
from dataclasses import dataclass
from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.public_modules import FactStoreBase
from clan_cli.vars.secret_modules import SecretStoreBase
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from typing import TYPE_CHECKING
@dataclass if TYPE_CHECKING:
class Var: from .generate import Var
generator: str
name: str
@dataclass
class VarStatus: class VarStatus:
missing_secret_vars: list[Var] def __init__(
missing_public_vars: list[Var] self,
unfixed_secret_vars: list[Var] missing_secret_vars: list["Var"],
invalid_generators: list[str] missing_public_vars: list["Var"],
unfixed_secret_vars: list["Var"],
invalid_generators: list[str],
) -> None:
self.missing_secret_vars = missing_secret_vars
self.missing_public_vars = missing_public_vars
self.unfixed_secret_vars = unfixed_secret_vars
self.invalid_generators = invalid_generators
def vars_status(machine: Machine, generator_name: None | str = None) -> VarStatus: def vars_status(machine: Machine, generator_name: None | str = None) -> VarStatus:
secret_vars_module = importlib.import_module(machine.secret_vars_module) secret_vars_module = importlib.import_module(machine.secret_vars_module)
secret_vars_store = secret_vars_module.SecretStore(machine=machine) secret_vars_store: SecretStoreBase = secret_vars_module.SecretStore(machine=machine)
public_vars_module = importlib.import_module(machine.public_vars_module) public_vars_module = importlib.import_module(machine.public_vars_module)
public_vars_store = public_vars_module.FactStore(machine=machine) public_vars_store: FactStoreBase = public_vars_module.FactStore(machine=machine)
missing_secret_vars = [] missing_secret_vars = []
missing_public_vars = [] missing_public_vars = []
# signals if a var needs to be updated (eg. needs re-encryption due to new users added) # signals if a var needs to be updated (eg. needs re-encryption due to new users added)
unfixed_secret_vars = [] unfixed_secret_vars = []
invalid_generators = [] invalid_generators = []
generators = machine.vars_generators
if generator_name: if generator_name:
generators = [generator_name] for generator in generators:
else: if generator_name == generator.name:
generators = list(machine.vars_generators.keys()) generators = [generator]
for generator_name in generators: break
shared = machine.vars_generators[generator_name]["share"] else:
for name, file in machine.vars_generators[generator_name]["files"].items(): err_msg = (
if file["secret"]: f"Generator '{generator_name}' not found in machine {machine.name}"
if not secret_vars_store.exists(generator_name, name, shared=shared): )
raise ClanError(err_msg)
for generator in generators:
generator.machine(machine)
for file in generator.files:
file.store(secret_vars_store if file.secret else public_vars_store)
file.generator(generator)
if file.secret:
if not secret_vars_store.exists(generator, file.name):
log.info( log.info(
f"Secret var '{name}' for service '{generator_name}' in machine {machine.name} is missing." f"Secret var '{file.name}' for service '{generator.name}' in machine {machine.name} is missing."
) )
missing_secret_vars.append(Var(generator_name, name)) missing_secret_vars.append(file)
else: else:
needs_fix, msg = secret_vars_store.needs_fix( needs_fix, msg = secret_vars_store.needs_fix(generator, file.name)
generator_name, name, shared=shared
)
if needs_fix: if needs_fix:
log.info( log.info(
f"Secret var '{name}' for service '{generator_name}' in machine {machine.name} needs update: {msg}" f"Secret var '{file.name}' for service '{generator.name}' in machine {machine.name} needs update: {msg}"
) )
unfixed_secret_vars.append(Var(generator_name, name)) unfixed_secret_vars.append(file)
elif not public_vars_store.exists(generator_name, name, shared=shared): elif not public_vars_store.exists(generator, file.name):
log.info( log.info(
f"Public var '{name}' for service '{generator_name}' in machine {machine.name} is missing." f"Public var '{file.name}' for service '{generator.name}' in machine {machine.name} is missing."
) )
missing_public_vars.append(Var(generator_name, name)) missing_public_vars.append(file)
# check if invalidation hash is up to date # check if invalidation hash is up to date
if not ( if not (
secret_vars_store.hash_is_valid(generator_name) secret_vars_store.hash_is_valid(generator)
and public_vars_store.hash_is_valid(generator_name) and public_vars_store.hash_is_valid(generator)
): ):
invalid_generators.append(generator_name) invalid_generators.append(generator.name)
log.info( log.info(
f"Generator '{generator_name}' in machine {machine.name} has outdated invalidation hash." f"Generator '{generator.name}' in machine {machine.name} has outdated invalidation hash."
) )
log.debug(f"missing_secret_vars: {missing_secret_vars}") log.debug(f"missing_secret_vars: {missing_secret_vars}")
log.debug(f"missing_public_vars: {missing_public_vars}") log.debug(f"missing_public_vars: {missing_public_vars}")

View File

@@ -2,9 +2,11 @@ import argparse
import logging import logging
import os import os
import sys import sys
from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any from typing import TYPE_CHECKING, Any
from clan_cli.cmd import run from clan_cli.cmd import run
from clan_cli.completions import ( from clan_cli.completions import (
@@ -15,19 +17,60 @@ from clan_cli.completions import (
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.git import commit_files from clan_cli.git import commit_files
from clan_cli.machines.inventory import get_all_machines, get_selected_machines from clan_cli.machines.inventory import get_all_machines, get_selected_machines
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from .check import check_vars
from .graph import ( from .graph import (
minimal_closure, minimal_closure,
requested_closure, requested_closure,
) )
from .prompt import ask from .prompt import Prompt, ask
from .public_modules import FactStoreBase from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase from .secret_modules import SecretStoreBase
from .var import Var
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
if TYPE_CHECKING:
from clan_cli.machines.machines import Machine
@dataclass
class Generator:
name: str
files: list[Var] = field(default_factory=list)
share: bool = False
invalidation_hash: str | None = None
final_script: str = ""
prompts: list[Prompt] = field(default_factory=list)
dependencies: list[str] = field(default_factory=list)
migrate_fact: str | None = None
# TODO: remove this
_machine: "Machine | None" = None
def machine(self, machine: "Machine") -> None:
self._machine = machine
@cached_property
def exists(self) -> bool:
assert self._machine is not None
return check_vars(self._machine, generator_name=self.name)
@classmethod
def from_json(cls: type["Generator"], data: dict[str, Any]) -> "Generator":
return cls(
name=data["name"],
share=data["share"],
final_script=data["finalScript"],
files=[Var.from_json(data["name"], f) for f in data["files"].values()],
invalidation_hash=data["invalidationHash"],
dependencies=data["dependencies"],
migrate_fact=data["migrateFact"],
prompts=[Prompt.from_json(p) for p in data["prompts"].values()],
)
def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]: def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]:
# fmt: off # fmt: off
@@ -54,26 +97,29 @@ def bubblewrap_cmd(generator: str, tmpdir: Path) -> list[str]:
# TODO: implement caching to not decrypt the same secret multiple times # TODO: implement caching to not decrypt the same secret multiple times
def decrypt_dependencies( def decrypt_dependencies(
machine: Machine, machine: "Machine",
generator_name: str, generator: Generator,
secret_vars_store: SecretStoreBase, secret_vars_store: SecretStoreBase,
public_vars_store: FactStoreBase, public_vars_store: FactStoreBase,
) -> dict[str, dict[str, bytes]]: ) -> dict[str, dict[str, bytes]]:
generator = machine.vars_generators[generator_name]
dependencies = set(generator["dependencies"])
decrypted_dependencies: dict[str, Any] = {} decrypted_dependencies: dict[str, Any] = {}
for dep_generator in dependencies: for generator_name in set(generator.dependencies):
decrypted_dependencies[dep_generator] = {} decrypted_dependencies[generator_name] = {}
dep_files = machine.vars_generators[dep_generator]["files"] for dep_generator in machine.vars_generators:
shared = machine.vars_generators[dep_generator]["share"] if generator_name == dep_generator.name:
for file_name, file in dep_files.items(): break
if file["secret"]: else:
decrypted_dependencies[dep_generator][file_name] = ( msg = f"Could not find dependent generator {generator_name} in machine {machine.name}"
secret_vars_store.get(dep_generator, file_name, shared=shared) raise ClanError(msg)
dep_files = dep_generator.files
for file in dep_files:
if file.secret:
decrypted_dependencies[generator_name][file.name] = (
secret_vars_store.get(dep_generator, file.name)
) )
else: else:
decrypted_dependencies[dep_generator][file_name] = ( decrypted_dependencies[generator_name][file.name] = (
public_vars_store.get(dep_generator, file_name, shared=shared) public_vars_store.get(dep_generator, file.name)
) )
return decrypted_dependencies return decrypted_dependencies
@@ -95,8 +141,8 @@ def dependencies_as_dir(
def execute_generator( def execute_generator(
machine: Machine, machine: "Machine",
generator_name: str, generator: Generator,
secret_vars_store: SecretStoreBase, secret_vars_store: SecretStoreBase,
public_vars_store: FactStoreBase, public_vars_store: FactStoreBase,
prompt_values: dict[str, str], prompt_values: dict[str, str],
@@ -105,14 +151,10 @@ def execute_generator(
msg = f"flake is not a Path: {machine.flake}" msg = f"flake is not a Path: {machine.flake}"
msg += "fact/secret generation is only supported for local flakes" msg += "fact/secret generation is only supported for local flakes"
generator = machine.vars_generators[generator_name]
script = generator["finalScript"]
is_shared = generator["share"]
# build temporary file tree of dependencies # build temporary file tree of dependencies
decrypted_dependencies = decrypt_dependencies( decrypted_dependencies = decrypt_dependencies(
machine, machine,
generator_name, generator,
secret_vars_store, secret_vars_store,
public_vars_store, public_vars_store,
) )
@@ -121,7 +163,7 @@ def execute_generator(
try: try:
return prompt_values[prompt_name] return prompt_values[prompt_name]
except KeyError as e: except KeyError as e:
msg = f"prompt value for '{prompt_name}' in generator {generator_name} not provided" msg = f"prompt value for '{prompt_name}' in generator {generator.name} not provided"
raise ClanError(msg) from e raise ClanError(msg) from e
env = os.environ.copy() env = os.environ.copy()
@@ -138,97 +180,94 @@ def execute_generator(
dependencies_as_dir(decrypted_dependencies, tmpdir_in) dependencies_as_dir(decrypted_dependencies, tmpdir_in)
# populate prompted values # populate prompted values
# TODO: make prompts rest API friendly # TODO: make prompts rest API friendly
if generator["prompts"]: if generator.prompts:
tmpdir_prompts.mkdir() tmpdir_prompts.mkdir()
env["prompts"] = str(tmpdir_prompts) env["prompts"] = str(tmpdir_prompts)
for prompt_name in generator["prompts"]: for prompt in generator.prompts:
prompt_file = tmpdir_prompts / prompt_name prompt_file = tmpdir_prompts / prompt.name
value = get_prompt_value(prompt_name) value = get_prompt_value(prompt.name)
prompt_file.write_text(value) prompt_file.write_text(value)
if sys.platform == "linux": if sys.platform == "linux":
cmd = bubblewrap_cmd(script, tmpdir) cmd = bubblewrap_cmd(generator.final_script, tmpdir)
else: else:
cmd = ["bash", "-c", script] cmd = ["bash", "-c", generator.final_script]
run( run(
cmd, cmd,
env=env, env=env,
) )
files_to_commit = [] files_to_commit = []
# store secrets # store secrets
files = generator["files"] files = generator.files
public_changed = False public_changed = False
secret_changed = False secret_changed = False
for file_name, file in files.items(): for file in files:
is_deployed = file["deploy"] secret_file = tmpdir_out / file.name
secret_file = tmpdir_out / file_name
if not secret_file.is_file(): if not secret_file.is_file():
msg = f"did not generate a file for '{file_name}' when running the following command:\n" msg = f"did not generate a file for '{file.name}' when running the following command:\n"
msg += script msg += generator.final_script
raise ClanError(msg) raise ClanError(msg)
if file["secret"]: if file.secret:
file_path = secret_vars_store.set( file_path = secret_vars_store.set(
generator_name, generator,
file_name, file,
secret_file.read_bytes(), secret_file.read_bytes(),
shared=is_shared,
deployed=is_deployed,
) )
secret_changed = True secret_changed = True
else: else:
file_path = public_vars_store.set( file_path = public_vars_store.set(
generator_name, generator,
file_name, file,
secret_file.read_bytes(), secret_file.read_bytes(),
shared=is_shared,
) )
public_changed = True public_changed = True
if file_path: if file_path:
files_to_commit.append(file_path) files_to_commit.append(file_path)
if generator["invalidationHash"] is not None: if generator.invalidation_hash is not None:
if public_changed: if public_changed:
public_vars_store.set_invalidation_hash( public_vars_store.set_invalidation_hash(
generator_name, generator["invalidationHash"] generator, generator.invalidation_hash
) )
if secret_changed: if secret_changed:
secret_vars_store.set_invalidation_hash( secret_vars_store.set_invalidation_hash(
generator_name, generator["invalidationHash"] generator, generator.invalidation_hash
) )
commit_files( commit_files(
files_to_commit, files_to_commit,
machine.flake_dir, machine.flake_dir,
f"Update vars via generator {generator_name} for machine {machine.name}", f"Update vars via generator {generator.name} for machine {machine.name}",
) )
def _ask_prompts( def _ask_prompts(
machine: Machine, generators: list[Generator],
generator_names: list[str],
) -> dict[str, dict[str, str]]: ) -> dict[str, dict[str, str]]:
prompt_values: dict[str, dict[str, str]] = {} prompt_values: dict[str, dict[str, str]] = {}
for generator in generator_names: for generator in generators:
prompts = machine.vars_generators[generator]["prompts"] for prompt in generator.prompts:
for prompt_name, _prompt in prompts.items(): if generator.name not in prompt_values:
if generator not in prompt_values: prompt_values[generator.name] = {}
prompt_values[generator] = {} var_id = f"{generator.name}/{prompt.name}"
var_id = f"{generator}/{prompt_name}" prompt_values[generator.name][prompt.name] = ask(var_id, prompt.prompt_type)
prompt_values[generator][prompt_name] = ask(var_id, _prompt["type"])
return prompt_values return prompt_values
def get_closure( def get_closure(
machine: Machine, machine: "Machine",
generator_name: str | None, generator_name: str | None,
regenerate: bool, regenerate: bool,
) -> list[str]: ) -> list[Generator]:
from .graph import Generator, all_missing_closure, full_closure from .graph import all_missing_closure, full_closure
vars_generators = machine.vars_generators vars_generators = machine.vars_generators
generators: dict[str, Generator] = { generators: dict[str, Generator] = {
name: Generator(name, generator["dependencies"], _machine=machine) generator.name: generator for generator in vars_generators
for name, generator in vars_generators.items()
} }
# TODO: we should remove this
for generator in vars_generators:
generator.machine(machine)
if generator_name is None: # all generators selected if generator_name is None: # all generators selected
if regenerate: if regenerate:
return full_closure(generators) return full_closure(generators)
@@ -240,109 +279,109 @@ def get_closure(
def _migration_file_exists( def _migration_file_exists(
machine: Machine, machine: "Machine",
service_name: str, generator: Generator,
fact_name: str, fact_name: str,
) -> bool: ) -> bool:
is_secret = machine.vars_generators[service_name]["files"][fact_name]["secret"] for file in generator.files:
if file.name == fact_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
is_secret = file.secret
if is_secret: if is_secret:
if machine.secret_facts_store.exists(service_name, fact_name): if machine.secret_facts_store.exists(generator.name, fact_name):
return True return True
log.debug( log.debug(
f"Cannot migrate fact {fact_name} for service {service_name}, as it does not exist in the secret fact store" f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the secret fact store"
) )
if not is_secret: if not is_secret:
if machine.public_facts_store.exists(service_name, fact_name): if machine.public_facts_store.exists(generator.name, fact_name):
return True return True
log.debug( log.debug(
f"Cannot migrate fact {fact_name} for service {service_name}, as it does not exist in the public fact store" f"Cannot migrate fact {fact_name} for service {generator.name}, as it does not exist in the public fact store"
) )
return False return False
def _migrate_file( def _migrate_file(
machine: Machine, machine: "Machine",
generator_name: str, generator: Generator,
var_name: str, var_name: str,
service_name: str, service_name: str,
fact_name: str, fact_name: str,
) -> None: ) -> None:
is_secret = machine.vars_generators[generator_name]["files"][var_name]["secret"] for file in generator.files:
if is_secret: if file.name == var_name:
break
else:
msg = f"Could not find file {fact_name} in generator {generator.name}"
raise ClanError(msg)
if file.secret:
old_value = machine.secret_facts_store.get(service_name, fact_name) old_value = machine.secret_facts_store.get(service_name, fact_name)
machine.secret_vars_store.set(generator, file, old_value)
else: else:
old_value = machine.public_facts_store.get(service_name, fact_name) old_value = machine.public_facts_store.get(service_name, fact_name)
is_shared = machine.vars_generators[generator_name]["share"] machine.public_vars_store.set(generator, file, old_value)
is_deployed = machine.vars_generators[generator_name]["files"][var_name]["deploy"]
if is_secret:
machine.secret_vars_store.set(
generator_name, var_name, old_value, shared=is_shared, deployed=is_deployed
)
else:
machine.public_vars_store.set(
generator_name, var_name, old_value, shared=is_shared, deployed=is_deployed
)
def _migrate_files( def _migrate_files(
machine: Machine, machine: "Machine",
generator_name: str, generator: Generator,
) -> None: ) -> None:
service_name = machine.vars_generators[generator_name]["migrateFact"]
not_found = [] not_found = []
for var_name, _file in machine.vars_generators[generator_name]["files"].items(): for file in generator.files:
if _migration_file_exists(machine, generator_name, var_name): if _migration_file_exists(machine, generator, file.name):
_migrate_file(machine, generator_name, var_name, service_name, var_name) assert generator.migrate_fact is not None
_migrate_file(
machine, generator, file.name, generator.migrate_fact, file.name
)
else: else:
not_found.append(var_name) not_found.append(file.name)
if len(not_found) > 0: if len(not_found) > 0:
msg = f"Could not migrate the following files for generator {generator_name}, as no fact or secret exists with the same name: {not_found}" msg = f"Could not migrate the following files for generator {generator.name}, as no fact or secret exists with the same name: {not_found}"
raise ClanError(msg) raise ClanError(msg)
def _check_can_migrate( def _check_can_migrate(
machine: Machine, machine: "Machine",
generator_name: str, generator: Generator,
) -> bool: ) -> bool:
vars_generator = machine.vars_generators[generator_name] service_name = generator.migrate_fact
if "migrateFact" not in vars_generator:
return False
service_name = vars_generator["migrateFact"]
if not service_name: if not service_name:
return False return False
# ensure that none of the generated vars already exist in the store # ensure that none of the generated vars already exist in the store
for fname, file in vars_generator["files"].items(): for file in generator.files:
if file["secret"]: if file.secret:
if machine.secret_vars_store.exists( if machine.secret_vars_store.exists(generator, file.name):
generator_name, fname, vars_generator["share"] if file.deploy:
):
if vars_generator["deploy"]:
machine.secret_vars_store.ensure_machine_has_access( machine.secret_vars_store.ensure_machine_has_access(
generator_name, fname, vars_generator["share"] generator, file.name
) )
return False return False
else: else:
if machine.public_vars_store.exists( if machine.public_vars_store.exists(generator, file.name):
generator_name, fname, vars_generator["share"]
):
return False return False
# ensure that the service to migrate from actually exists # ensure that the service to migrate from actually exists
if service_name not in machine.facts_data: if service_name not in machine.facts_data:
log.debug( log.debug(
f"Could not migrate facts for generator {generator_name}, as the service {service_name} does not exist" f"Could not migrate facts for generator {generator.name}, as the service {service_name} does not exist"
) )
return False return False
# ensure that all files can be migrated (exists in the corresponding fact store) # ensure that all files can be migrated (exists in the corresponding fact store)
return bool( return bool(
all( all(
_migration_file_exists(machine, generator_name, fname) _migration_file_exists(machine, generator, file.name)
for fname in vars_generator["files"] for file in generator.files
) )
) )
def ensure_consistent_state( def ensure_consistent_state(
machine: Machine, machine: "Machine",
generator_name: str | None, generator_name: str | None,
fix: bool, fix: bool,
) -> None: ) -> None:
@@ -352,25 +391,30 @@ def ensure_consistent_state(
""" """
if generator_name is None: if generator_name is None:
generators = list(machine.vars_generators.keys()) generators = machine.vars_generators
else: else:
generators = [generator_name] for generator in machine.vars_generators:
if generator_name == generator.name:
generators = [generator]
break
else:
err_msg = (
f"Could not find generator {generator_name} in machine {machine.name}"
)
raise ClanError(err_msg)
outdated = [] outdated = []
for generator_name in generators: for generator in generators:
for name, file in machine.vars_generators[generator_name]["files"].items(): for file in generator.files:
shared = machine.vars_generators[generator_name]["share"] if file.secret and machine.secret_vars_store.exists(generator, file.name):
if file["secret"] and machine.secret_vars_store.exists( if file.deploy:
generator_name, name, shared=shared
):
if file["deploy"]:
machine.secret_vars_store.ensure_machine_has_access( machine.secret_vars_store.ensure_machine_has_access(
generator_name, name, shared=shared generator, file.name
) )
needs_update, msg = machine.secret_vars_store.needs_fix( needs_update, msg = machine.secret_vars_store.needs_fix(
generator_name, name, shared=shared generator, file.name
) )
if needs_update: if needs_update:
outdated.append((generator_name, name, msg)) outdated.append((generator_name, file.name, msg))
if not fix and outdated: if not fix and outdated:
msg = ( msg = (
"The local state of some secret vars is inconsistent and needs to be updated.\n" "The local state of some secret vars is inconsistent and needs to be updated.\n"
@@ -382,7 +426,7 @@ def ensure_consistent_state(
def generate_vars_for_machine( def generate_vars_for_machine(
machine: Machine, machine: "Machine",
generator_name: str | None, generator_name: str | None,
regenerate: bool, regenerate: bool,
fix: bool, fix: bool,
@@ -391,17 +435,17 @@ def generate_vars_for_machine(
closure = get_closure(machine, generator_name, regenerate) closure = get_closure(machine, generator_name, regenerate)
if len(closure) == 0: if len(closure) == 0:
return False return False
prompt_values = _ask_prompts(machine, closure) prompt_values = _ask_prompts(closure)
for gen_name in closure: for generator in closure:
if _check_can_migrate(machine, gen_name): if _check_can_migrate(machine, generator):
_migrate_files(machine, gen_name) _migrate_files(machine, generator)
else: else:
execute_generator( execute_generator(
machine, machine,
gen_name, generator,
machine.secret_vars_store, machine.secret_vars_store,
machine.public_vars_store, machine.public_vars_store,
prompt_values.get(gen_name, {}), prompt_values.get(generator.name, {}),
) )
# flush caches to make sure the new secrets are available in evaluation # flush caches to make sure the new secrets are available in evaluation
machine.flush_caches() machine.flush_caches()
@@ -409,7 +453,7 @@ def generate_vars_for_machine(
def generate_vars( def generate_vars(
machines: list[Machine], machines: list["Machine"],
generator_name: str | None = None, generator_name: str | None = None,
regenerate: bool = False, regenerate: bool = False,
fix: bool = False, fix: bool = False,
@@ -423,11 +467,14 @@ def generate_vars(
) )
machine.flush_caches() machine.flush_caches()
except Exception as exc: except Exception as exc:
machine.error(f"Failed to generate facts: {exc}") errors += [(machine, exc)]
errors += [exc] if len(errors) == 1:
if len(errors) > 0: raise errors[0][1]
msg = f"Failed to generate facts for {len(errors)} hosts. Check the logs above" if len(errors) > 1:
raise ClanError(msg) from errors[0] msg = f"Failed to generate facts for {len(errors)} hosts:"
for machine, error in errors:
msg += f"\n{machine}: {error}"
raise ClanError(msg) from errors[0][1]
if not was_regenerated and len(machines) > 0: if not was_regenerated and len(machines) > 0:
machine.info("All vars are already up to date") machine.info("All vars are already up to date")

View File

@@ -7,7 +7,7 @@ from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from ._types import Var from .generate import Var
from .list import get_vars from .list import get_vars
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@@ -1,31 +1,21 @@
from __future__ import annotations
from collections.abc import Iterable from collections.abc import Iterable
from dataclasses import dataclass
from functools import cached_property
from graphlib import TopologicalSorter from graphlib import TopologicalSorter
from typing import TYPE_CHECKING
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from .check import check_vars if TYPE_CHECKING:
from .generate import Generator
class GeneratorNotFoundError(ClanError): class GeneratorNotFoundError(ClanError):
pass pass
@dataclass
class Generator:
name: str
dependencies: list[str]
_machine: Machine
@cached_property
def exists(self) -> bool:
return check_vars(self._machine, generator_name=self.name)
def missing_dependency_closure( def missing_dependency_closure(
requested_generators: Iterable[str], generators: dict requested_generators: Iterable[str], generators: dict[str, Generator]
) -> set[str]: ) -> set[str]:
closure = set(requested_generators) closure = set(requested_generators)
# extend the graph to include all dependencies which are not on disk # extend the graph to include all dependencies which are not on disk
@@ -52,7 +42,9 @@ def add_missing_dependencies(
return missing_dependency_closure(closure, generators) | closure return missing_dependency_closure(closure, generators) | closure
def add_dependents(requested_generators: Iterable[str], generators: dict) -> set[str]: def add_dependents(
requested_generators: Iterable[str], generators: dict[str, Generator]
) -> set[str]:
closure = set(requested_generators) closure = set(requested_generators)
# build reverse dependency graph (graph of dependents) # build reverse dependency graph (graph of dependents)
dependents_graph: dict[str, set[str]] = {} dependents_graph: dict[str, set[str]] = {}
@@ -72,7 +64,9 @@ def add_dependents(requested_generators: Iterable[str], generators: dict) -> set
return closure return closure
def toposort_closure(_closure: Iterable[str], generators: dict) -> list[str]: def toposort_closure(
_closure: Iterable[str], generators: dict[str, Generator]
) -> list[Generator]:
closure = set(_closure) closure = set(_closure)
# return the topological sorted list of generators to execute # return the topological sorted list of generators to execute
final_dep_graph = {} final_dep_graph = {}
@@ -81,16 +75,16 @@ def toposort_closure(_closure: Iterable[str], generators: dict) -> list[str]:
final_dep_graph[gen_name] = deps final_dep_graph[gen_name] = deps
sorter = TopologicalSorter(final_dep_graph) sorter = TopologicalSorter(final_dep_graph)
result = list(sorter.static_order()) result = list(sorter.static_order())
return result return [generators[gen_name] for gen_name in result]
# all generators in topological order # all generators in topological order
def full_closure(generators: dict) -> list[str]: def full_closure(generators: dict[str, Generator]) -> list[Generator]:
return toposort_closure(generators.keys(), generators) return toposort_closure(generators.keys(), generators)
# just the missing generators including their dependents # just the missing generators including their dependents
def all_missing_closure(generators: dict) -> list[str]: def all_missing_closure(generators: dict[str, Generator]) -> list[Generator]:
# collect all generators that are missing from disk # collect all generators that are missing from disk
closure = {gen_name for gen_name, gen in generators.items() if not gen.exists} closure = {gen_name for gen_name, gen in generators.items() if not gen.exists}
closure = add_dependents(closure, generators) closure = add_dependents(closure, generators)
@@ -98,7 +92,9 @@ def all_missing_closure(generators: dict) -> list[str]:
# only a selected list of generators including their missing dependencies and their dependents # only a selected list of generators including their missing dependencies and their dependents
def requested_closure(requested_generators: list[str], generators: dict) -> list[str]: def requested_closure(
requested_generators: list[str], generators: dict[str, Generator]
) -> list[Generator]:
closure = set(requested_generators) closure = set(requested_generators)
# extend the graph to include all dependencies which are not on disk # extend the graph to include all dependencies which are not on disk
closure = add_missing_dependencies(closure, generators) closure = add_missing_dependencies(closure, generators)
@@ -108,7 +104,9 @@ def requested_closure(requested_generators: list[str], generators: dict) -> list
# just enough to ensure that the list of selected generators are in a consistent state. # just enough to ensure that the list of selected generators are in a consistent state.
# empty if nothing is missing. # empty if nothing is missing.
def minimal_closure(requested_generators: list[str], generators: dict) -> list[str]: def minimal_closure(
requested_generators: list[str], generators: dict[str, Generator]
) -> list[Generator]:
closure = set(requested_generators) closure = set(requested_generators)
final_closure = missing_dependency_closure(closure, generators) final_closure = missing_dependency_closure(closure, generators)
# add requested generators if not already exist # add requested generators if not already exist

View File

@@ -4,10 +4,11 @@ import logging
from clan_cli.api import API from clan_cli.api import API
from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from ._types import Generator, GeneratorUpdate, Prompt, Var from ._types import GeneratorUpdate
from .generate import execute_generator from .generate import Generator, Prompt, Var, execute_generator
from .public_modules import FactStoreBase from .public_modules import FactStoreBase
from .secret_modules import SecretStoreBase from .secret_modules import SecretStoreBase
@@ -35,44 +36,25 @@ def _get_previous_value(
generator: Generator, generator: Generator,
prompt: Prompt, prompt: Prompt,
) -> str | None: ) -> str | None:
if not prompt.has_file: if not prompt.create_file:
return None return None
pub_store = public_store(machine) pub_store = public_store(machine)
if pub_store.exists(generator.name, prompt.name, shared=generator.share): if pub_store.exists(generator, prompt.name):
return pub_store.get( return pub_store.get(generator, prompt.name).decode()
generator.name, prompt.name, shared=generator.share
).decode()
sec_store = secret_store(machine) sec_store = secret_store(machine)
if sec_store.exists(generator.name, prompt.name, shared=generator.share): if sec_store.exists(generator, prompt.name):
return sec_store.get( return sec_store.get(generator, prompt.name).decode()
generator.name, prompt.name, shared=generator.share
).decode()
return None return None
@API.register @API.register
# TODO: use machine_name # TODO: use machine_name
def get_prompts(machine: Machine) -> list[Generator]: def get_prompts(machine: Machine) -> list[Generator]:
generators = [] generators: list[Generator] = machine.vars_generators
for gen_name, generator in machine.vars_generators.items(): for generator in generators:
prompts: list[Prompt] = [] for prompt in generator.prompts:
gen = Generator( prompt.previous_value = _get_previous_value(machine, generator, prompt)
name=gen_name,
prompts=prompts,
share=generator["share"],
)
for prompt_name, prompt in generator["prompts"].items():
prompt = Prompt(
name=prompt_name,
description=prompt["description"],
type=prompt["type"],
has_file=prompt["createFile"],
generator=gen_name,
)
prompt.previous_value = _get_previous_value(machine, gen, prompt)
prompts.append(prompt)
generators.append(gen)
return generators return generators
@@ -82,9 +64,15 @@ def get_prompts(machine: Machine) -> list[Generator]:
@API.register @API.register
def set_prompts(machine: Machine, updates: list[GeneratorUpdate]) -> None: def set_prompts(machine: Machine, updates: list[GeneratorUpdate]) -> None:
for update in updates: for update in updates:
for generator in machine.vars_generators:
if generator.name == update.generator:
break
else:
msg = f"Generator '{update.generator}' not found in machine {machine.name}"
raise ClanError(msg)
execute_generator( execute_generator(
machine, machine,
update.generator, generator,
secret_vars_store=secret_store(machine), secret_vars_store=secret_store(machine),
public_vars_store=public_store(machine), public_vars_store=public_store(machine),
prompt_values=update.prompt_values, prompt_values=update.prompt_values,

View File

@@ -1,8 +1,9 @@
import enum
import logging import logging
import sys import sys
from dataclasses import dataclass
from getpass import getpass from getpass import getpass
from typing import Any
from clan_cli.errors import ClanError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -10,18 +11,42 @@ log = logging.getLogger(__name__)
MOCK_PROMPT_RESPONSE = None MOCK_PROMPT_RESPONSE = None
def ask(description: str, input_type: str) -> str: class PromptType(enum.Enum):
LINE = "line"
HIDDEN = "hidden"
MULTILINE = "multiline"
@dataclass
class Prompt:
name: str
description: str
prompt_type: PromptType
create_file: bool = False
previous_value: str | None = None
@classmethod
def from_json(cls: type["Prompt"], data: dict[str, Any]) -> "Prompt":
return cls(
name=data["name"],
description=data["description"],
prompt_type=PromptType(data["type"]),
create_file=data["createFile"],
previous_value=data.get("previousValue"),
)
def ask(description: str, input_type: PromptType) -> str:
if MOCK_PROMPT_RESPONSE: if MOCK_PROMPT_RESPONSE:
return next(MOCK_PROMPT_RESPONSE) return next(MOCK_PROMPT_RESPONSE)
if input_type == "line": match input_type:
result = input(f"Enter the value for {description}: ") case PromptType.LINE:
elif input_type == "multiline": result = input(f"Enter the value for {description}: ")
print(f"Enter the value for {description} (Finish with Ctrl-D): ") case PromptType.MULTILINE:
result = sys.stdin.read() print(f"Enter the value for {description} (Finish with Ctrl-D): ")
elif input_type == "hidden": result = sys.stdin.read()
result = getpass(f"Enter the value for {description} (hidden): ") case PromptType.HIDDEN:
else: result = getpass(f"Enter the value for {description} (hidden): ")
msg = f"Unknown input type: {input_type} for prompt {description}"
raise ClanError(msg)
log.info("Input received. Processing...") log.info("Input received. Processing...")
return result return result

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.generate import Generator, Var
from . import FactStoreBase from . import FactStoreBase
@@ -18,16 +19,14 @@ class FactStore(FactStoreBase):
def _set( def _set(
self, self,
generator_name: str, generator: Generator,
name: str, var: Var,
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
if not self.machine.flake.is_local(): if not self.machine.flake.is_local():
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}" msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg) raise ClanError(msg)
folder = self.directory(generator_name, name, shared) folder = self.directory(generator, var.name)
if folder.exists(): if folder.exists():
if not (folder / "value").exists(): if not (folder / "value").exists():
# another backend has used that folder before -> error out # another backend has used that folder before -> error out
@@ -42,8 +41,8 @@ class FactStore(FactStoreBase):
return file_path return file_path
# get a single fact # get a single fact
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
return (self.directory(generator_name, name, shared) / "value").read_bytes() return (self.directory(generator, name) / "value").read_bytes()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator: Generator, name: str) -> bool:
return (self.directory(generator_name, name, shared) / "value").exists() return (self.directory(generator, name) / "value").exists()

View File

@@ -4,6 +4,7 @@ from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.generate import Generator, Var
from . import FactStoreBase from . import FactStoreBase
@@ -21,27 +22,25 @@ class FactStore(FactStoreBase):
def store_name(self) -> str: def store_name(self) -> str:
return "vm" return "vm"
def exists(self, service: str, name: str, shared: bool = False) -> bool: def exists(self, generator: Generator, name: str) -> bool:
fact_path = self.dir / service / name fact_path = self.dir / generator.name / name
return fact_path.exists() return fact_path.exists()
def _set( def _set(
self, self,
service: str, generator: Generator,
name: str, var: Var,
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
fact_path = self.dir / service / name fact_path = self.dir / generator.name / var.name
fact_path.parent.mkdir(parents=True, exist_ok=True) fact_path.parent.mkdir(parents=True, exist_ok=True)
fact_path.write_bytes(value) fact_path.write_bytes(value)
return None return None
# get a single fact # get a single fact
def get(self, service: str, name: str, shared: bool = False) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
fact_path = self.dir / service / name fact_path = self.dir / generator.name / name
if fact_path.exists(): if fact_path.exists():
return fact_path.read_bytes() return fact_path.read_bytes()
msg = f"Fact {name} for service {service} not found" msg = f"Fact {name} for service {generator.name} not found"
raise ClanError(msg) raise ClanError(msg)

View File

@@ -1,8 +1,12 @@
from abc import abstractmethod from abc import abstractmethod
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
from clan_cli.vars._types import StoreBase from clan_cli.vars._types import StoreBase
if TYPE_CHECKING:
from clan_cli.vars.generate import Generator
class SecretStoreBase(StoreBase): class SecretStoreBase(StoreBase):
@property @property
@@ -12,16 +16,13 @@ class SecretStoreBase(StoreBase):
def needs_upload(self) -> bool: def needs_upload(self) -> bool:
return True return True
def ensure_machine_has_access( def ensure_machine_has_access(self, generator: "Generator", name: str) -> None:
self, generator_name: str, name: str, shared: bool = False
) -> None:
pass pass
def needs_fix( def needs_fix(
self, self,
generator_name: str, generator: "Generator",
name: str, name: str,
shared: bool,
) -> tuple[bool, str | None]: ) -> tuple[bool, str | None]:
""" """
Check if local state needs updating, eg. secret needs to be re-encrypted with new keys Check if local state needs updating, eg. secret needs to be re-encrypted with new keys
@@ -30,9 +31,8 @@ class SecretStoreBase(StoreBase):
def fix( def fix(
self, self,
generator_name: str, generator: "Generator",
name: str, name: str,
shared: bool,
) -> None: ) -> None:
""" """
Update local state, eg make sure secret is encrypted with correct keys Update local state, eg make sure secret is encrypted with correct keys

View File

@@ -9,6 +9,7 @@ from typing import override
from clan_cli.cmd import Log, run from clan_cli.cmd import Log, run
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from clan_cli.vars.generate import Generator, Var
from . import SecretStoreBase from . import SecretStoreBase
@@ -30,16 +31,14 @@ class SecretStore(SecretStoreBase):
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store" "PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
) )
def entry_dir(self, generator_name: str, name: str, shared: bool) -> Path: def entry_dir(self, generator: Generator, name: str) -> Path:
return Path(self.entry_prefix) / self.rel_dir(generator_name, name, shared) return Path(self.entry_prefix) / self.rel_dir(generator, name)
def _set( def _set(
self, self,
generator_name: str, generator: Generator,
name: str, var: Var,
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
run( run(
nix_shell( nix_shell(
@@ -48,7 +47,7 @@ class SecretStore(SecretStoreBase):
"pass", "pass",
"insert", "insert",
"-m", "-m",
str(self.entry_dir(generator_name, name, shared)), str(self.entry_dir(generator, var.name)),
], ],
), ),
input=value, input=value,
@@ -56,22 +55,21 @@ class SecretStore(SecretStoreBase):
) )
return None # we manage the files outside of the git repo return None # we manage the files outside of the git repo
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
return run( return run(
nix_shell( nix_shell(
["nixpkgs#pass"], ["nixpkgs#pass"],
[ [
"pass", "pass",
"show", "show",
str(self.entry_dir(generator_name, name, shared)), str(self.entry_dir(generator, name)),
], ],
), ),
).stdout.encode() ).stdout.encode()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator: Generator, name: str) -> bool:
return ( return (
Path(self._password_store_dir) Path(self._password_store_dir) / f"{self.entry_dir(generator, name)}.gpg"
/ f"{self.entry_dir(generator_name, name, shared)}.gpg"
).exists() ).exists()
def generate_hash(self) -> bytes: def generate_hash(self) -> bytes:
@@ -128,9 +126,9 @@ class SecretStore(SecretStoreBase):
hashes.sort() hashes.sort()
manifest = [] manifest = []
for gen_name, generator in self.machine.vars_generators.items(): for generator in self.machine.vars_generators:
for f_name in generator["files"]: for file in generator.files:
manifest.append(f"{gen_name}/{f_name}".encode()) manifest.append(f"{generator.name}/{file.name}".encode())
manifest += hashes manifest += hashes
return b"\n".join(manifest) return b"\n".join(manifest)
@@ -152,24 +150,24 @@ class SecretStore(SecretStoreBase):
def upload(self, output_dir: Path) -> None: def upload(self, output_dir: Path) -> None:
with tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar: with tarfile.open(output_dir / "secrets.tar.gz", "w:gz") as tar:
for gen_name, generator in self.machine.vars_generators.items(): for generator in self.machine.vars_generators:
dir_exists = False dir_exists = False
for f_name, file in generator["files"].items(): for file in generator.files:
if not file["deploy"]: if not file.deploy:
continue continue
if not file["secret"]: if not file.secret:
continue continue
if not dir_exists: if not dir_exists:
tar_dir = tarfile.TarInfo(name=gen_name) tar_dir = tarfile.TarInfo(name=generator.name)
tar_dir.type = tarfile.DIRTYPE tar_dir.type = tarfile.DIRTYPE
tar_dir.mode = 0o511 tar_dir.mode = 0o511
tar.addfile(tarinfo=tar_dir) tar.addfile(tarinfo=tar_dir)
dir_exists = True dir_exists = True
tar_file = tarfile.TarInfo(name=f"{gen_name}/{f_name}") tar_file = tarfile.TarInfo(name=f"{generator.name}/{file.name}")
content = self.get(gen_name, f_name, generator["share"]) content = self.get(generator, file.name)
tar_file.size = len(content) tar_file.size = len(content)
tar_file.mode = 0o440 tar_file.mode = 0o440
tar_file.uname = file.get("owner", "root") tar_file.uname = file.owner
tar_file.gname = file.get("group", "root") tar_file.gname = file.group
tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content)) tar.addfile(tarinfo=tar_file, fileobj=io.BytesIO(content))
(output_dir / ".pass_info").write_bytes(self.generate_hash()) (output_dir / ".pass_info").write_bytes(self.generate_hash())

View File

@@ -18,6 +18,7 @@ from clan_cli.secrets.secrets import (
has_secret, has_secret,
) )
from clan_cli.secrets.sops import KeyType, generate_private_key from clan_cli.secrets.sops import KeyType, generate_private_key
from clan_cli.vars.generate import Generator, Var
from . import SecretStoreBase from . import SecretStoreBase
@@ -42,11 +43,10 @@ class SecretStore(SecretStoreBase):
if not self.machine.vars_generators: if not self.machine.vars_generators:
return return
has_secrets = False has_secrets = False
for generator in self.machine.vars_generators.values(): for generator in self.machine.vars_generators:
if "files" in generator: for file in generator.files:
for file in generator["files"].values(): if file.secret:
if file["secret"]: has_secrets = True
has_secrets = True
if not has_secrets: if not has_secrets:
return return
@@ -67,9 +67,9 @@ class SecretStore(SecretStoreBase):
return "sops" return "sops"
def user_has_access( def user_has_access(
self, user: str, generator_name: str, secret_name: str, shared: bool self, user: str, generator: Generator, secret_name: str
) -> bool: ) -> bool:
secret_path = self.secret_path(generator_name, secret_name, shared=shared) secret_path = self.secret_path(generator, secret_name)
secret = json.loads((secret_path / "secret").read_text()) secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
users_folder_path = sops_users_folder(self.machine.flake_dir) users_folder_path = sops_users_folder(self.machine.flake_dir)
@@ -78,10 +78,8 @@ class SecretStore(SecretStoreBase):
] ]
return user_pubkey in recipients return user_pubkey in recipients
def machine_has_access( def machine_has_access(self, generator: Generator, secret_name: str) -> bool:
self, generator_name: str, secret_name: str, shared: bool secret_path = self.secret_path(generator, secret_name)
) -> bool:
secret_path = self.secret_path(generator_name, secret_name, shared)
secret = json.loads((secret_path / "secret").read_text()) secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])] recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
machines_folder_path = sops_machines_folder(self.machine.flake_dir) machines_folder_path = sops_machines_folder(self.machine.flake_dir)
@@ -90,20 +88,16 @@ class SecretStore(SecretStoreBase):
)["publickey"] )["publickey"]
return machine_pubkey in recipients return machine_pubkey in recipients
def secret_path( def secret_path(self, generator: Generator, secret_name: str) -> Path:
self, generator_name: str, secret_name: str, shared: bool = False return self.directory(generator, secret_name)
) -> Path:
return self.directory(generator_name, secret_name, shared=shared)
def _set( def _set(
self, self,
generator_name: str, generator: Generator,
name: str, var: Var,
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
secret_folder = self.secret_path(generator_name, name, shared) secret_folder = self.secret_path(generator, var.name)
# delete directory # delete directory
if secret_folder.exists() and not (secret_folder / "secret").exists(): if secret_folder.exists() and not (secret_folder / "secret").exists():
# another backend has used that folder before -> error out # another backend has used that folder before -> error out
@@ -115,15 +109,15 @@ class SecretStore(SecretStoreBase):
self.machine.flake_dir, self.machine.flake_dir,
secret_folder, secret_folder,
value, value,
add_machines=[self.machine.name] if deployed else [], add_machines=[self.machine.name] if var.deploy else [],
add_groups=self.machine.deployment["sops"]["defaultGroups"], add_groups=self.machine.deployment["sops"]["defaultGroups"],
git_commit=False, git_commit=False,
) )
return secret_folder return secret_folder
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
return decrypt_secret( return decrypt_secret(
self.machine.flake_dir, self.secret_path(generator_name, name, shared) self.machine.flake_dir, self.secret_path(generator, name)
).encode("utf-8") ).encode("utf-8")
def upload(self, output_dir: Path) -> None: def upload(self, output_dir: Path) -> None:
@@ -137,16 +131,14 @@ class SecretStore(SecretStoreBase):
) )
(output_dir / "key.txt").write_text(key) (output_dir / "key.txt").write_text(key)
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator: Generator, name: str) -> bool:
secret_folder = self.secret_path(generator_name, name, shared) secret_folder = self.secret_path(generator, name)
return (secret_folder / "secret").exists() return (secret_folder / "secret").exists()
def ensure_machine_has_access( def ensure_machine_has_access(self, generator: Generator, name: str) -> None:
self, generator_name: str, name: str, shared: bool = False if self.machine_has_access(generator, name):
) -> None:
if self.machine_has_access(generator_name, name, shared):
return return
secret_folder = self.secret_path(generator_name, name, shared) secret_folder = self.secret_path(generator, name)
add_secret(self.machine.flake_dir, self.machine.name, secret_folder) add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]: def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]:
@@ -170,10 +162,8 @@ class SecretStore(SecretStoreBase):
return keys return keys
@override @override
def needs_fix( def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
self, generator_name: str, name: str, shared: bool secret_path = self.secret_path(generator, name)
) -> tuple[bool, str | None]:
secret_path = self.secret_path(generator_name, name, shared)
recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"] recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"]
current_recipients = {r["recipient"] for r in recipients_} current_recipients = {r["recipient"] for r in recipients_}
wanted_recipients = { wanted_recipients = {
@@ -181,19 +171,19 @@ class SecretStore(SecretStoreBase):
} }
needs_update = current_recipients != wanted_recipients needs_update = current_recipients != wanted_recipients
recipients_to_add = wanted_recipients - current_recipients recipients_to_add = wanted_recipients - current_recipients
var_id = f"{generator_name}/{name}" var_id = f"{generator.name}/{name}"
msg = ( msg = (
f"One or more recipient keys were added to secret{' shared' if shared else ''} var '{var_id}', but it was never re-encrypted. " f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. "
f"This could have been a malicious actor trying to add their keys, please investigate. " f"This could have been a malicious actor trying to add their keys, please investigate. "
f"Added keys: {', '.join(recipients_to_add)}" f"Added keys: {', '.join(recipients_to_add)}"
) )
return needs_update, msg return needs_update, msg
@override @override
def fix(self, generator_name: str, name: str, shared: bool) -> None: def fix(self, generator: Generator, name: str) -> None:
from clan_cli.secrets.secrets import update_keys from clan_cli.secrets.secrets import update_keys
secret_path = self.secret_path(generator_name, name, shared) secret_path = self.secret_path(generator, name)
update_keys( update_keys(
secret_path, secret_path,
collect_keys_for_path(secret_path), collect_keys_for_path(secret_path),

View File

@@ -3,6 +3,7 @@ from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.generate import Generator, Var
from . import SecretStoreBase from . import SecretStoreBase
@@ -19,19 +20,17 @@ class SecretStore(SecretStoreBase):
def _set( def _set(
self, self,
service: str, generator: Generator,
name: str, var: Var,
value: bytes, value: bytes,
shared: bool = False,
deployed: bool = True,
) -> Path | None: ) -> Path | None:
secret_file = self.dir / service / name secret_file = self.dir / generator.name / var.name
secret_file.parent.mkdir(parents=True, exist_ok=True) secret_file.parent.mkdir(parents=True, exist_ok=True)
secret_file.write_bytes(value) secret_file.write_bytes(value)
return None # we manage the files outside of the git repo return None # we manage the files outside of the git repo
def get(self, service: str, name: str, shared: bool = False) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
secret_file = self.dir / service / name secret_file = self.dir / generator.name / name
return secret_file.read_bytes() return secret_file.read_bytes()
def upload(self, output_dir: Path) -> None: def upload(self, output_dir: Path) -> None:

View File

@@ -6,8 +6,9 @@ from clan_cli.clan_uri import FlakeId
from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.get import get_var from clan_cli.vars.get import get_var
from clan_cli.vars.prompt import PromptType
from ._types import Var from .generate import Var
from .prompt import ask from .prompt import ask
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -31,7 +32,7 @@ def set_via_stdin(machine: str, var_id: str, flake: FlakeId) -> None:
_machine = Machine(name=machine, flake=flake) _machine = Machine(name=machine, flake=flake)
var = get_var(_machine, var_id) var = get_var(_machine, var_id)
if sys.stdin.isatty(): if sys.stdin.isatty():
new_value = ask(var.id, "hidden").encode("utf-8") new_value = ask(var.id, PromptType.HIDDEN).encode("utf-8")
else: else:
new_value = sys.stdin.buffer.read() new_value = sys.stdin.buffer.read()
set_var(_machine, var, new_value, flake) set_var(_machine, var, new_value, flake)

View File

@@ -0,0 +1,77 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from clan_cli.vars.generate import Generator
from ._types import StoreBase
@dataclass
class Var:
id: str
name: str
secret: bool = True
deploy: bool = False
owner: str = "root"
group: str = "root"
# TODO: those shouldn't be set here
_store: "StoreBase | None" = None
_generator: "Generator | None" = None
def store(self, store: "StoreBase") -> None:
self._store = store
def generator(self, generator: "Generator") -> None:
self._generator = generator
@property
def value(self) -> bytes:
assert self._store is not None
assert self._generator is not None
if not self._store.exists(self._generator, self.name):
msg = f"Var {self.id} has not been generated yet"
raise ValueError(msg)
# try decode the value or return <binary blob>
return self._store.get(self._generator, self.name)
@property
def printable_value(self) -> str:
try:
return self.value.decode()
except UnicodeDecodeError:
return "<binary blob>"
def set(self, value: bytes) -> None:
assert self._store is not None
assert self._generator is not None
self._store.set(self._generator, self, value)
@property
def exists(self) -> bool:
assert self._store is not None
assert self._generator is not None
return self._store.exists(self._generator, self.name)
def __str__(self) -> str:
if self._store is None or self._generator is None:
return f"{self.id}: <not initialized>"
# TODO: we don't want __str__ with side effects, this should be a separate method
if self._store.exists(self._generator, self.name):
if self.secret:
return f"{self.id}: ********"
return f"{self.id}: {self.printable_value}"
return f"{self.id}: <not set>"
@classmethod
def from_json(cls: type["Var"], generator_name: str, data: dict[str, Any]) -> "Var":
return cls(
id=f"{generator_name}/{data['name']}",
name=data["name"],
secret=data["secret"],
deploy=data["deploy"],
owner=data.get("owner", "root"),
group=data.get("group", "root"),
)

View File

@@ -1,6 +1,5 @@
import json import json
import shutil import shutil
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -10,8 +9,9 @@ from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_eval, run from clan_cli.nix import nix_eval, run
from clan_cli.vars.check import check_vars from clan_cli.vars.check import check_vars
from clan_cli.vars.generate import generate_vars_for_machine from clan_cli.vars.generate import Generator, generate_vars_for_machine
from clan_cli.vars.get import get_var from clan_cli.vars.get import get_var
from clan_cli.vars.graph import all_missing_closure, requested_closure
from clan_cli.vars.list import stringify_all_vars from clan_cli.vars.list import stringify_all_vars
from clan_cli.vars.public_modules import in_repo from clan_cli.vars.public_modules import in_repo
from clan_cli.vars.secret_modules import password_store, sops from clan_cli.vars.secret_modules import password_store, sops
@@ -48,31 +48,49 @@ def test_dependencies_as_files(temp_dir: Path) -> None:
def test_required_generators() -> None: def test_required_generators() -> None:
from clan_cli.vars.graph import all_missing_closure, requested_closure gen_1 = Generator(name="gen_1", dependencies=[])
gen_2 = Generator(name="gen_2", dependencies=["gen_1"])
@dataclass gen_2a = Generator(name="gen_2a", dependencies=["gen_2"])
class Generator: gen_2b = Generator(name="gen_2b", dependencies=["gen_2"])
dependencies: list[str]
exists: bool # result is already on disk
gen_1.exists = True
gen_2.exists = False
gen_2a.exists = False
gen_2b.exists = True
generators = { generators = {
"gen_1": Generator([], True), generator.name: generator for generator in [gen_1, gen_2, gen_2a, gen_2b]
"gen_2": Generator(["gen_1"], False),
"gen_2a": Generator(["gen_2"], False),
"gen_2b": Generator(["gen_2"], True),
} }
assert requested_closure(["gen_1"], generators) == [ def generator_names(generator: list[Generator]) -> list[str]:
return [gen.name for gen in generator]
assert generator_names(requested_closure(["gen_1"], generators)) == [
"gen_1", "gen_1",
"gen_2", "gen_2",
"gen_2a", "gen_2a",
"gen_2b", "gen_2b",
] ]
assert requested_closure(["gen_2"], generators) == ["gen_2", "gen_2a", "gen_2b"] assert generator_names(requested_closure(["gen_2"], generators)) == [
assert requested_closure(["gen_2a"], generators) == ["gen_2", "gen_2a", "gen_2b"] "gen_2",
assert requested_closure(["gen_2b"], generators) == ["gen_2", "gen_2a", "gen_2b"] "gen_2a",
"gen_2b",
]
assert generator_names(requested_closure(["gen_2a"], generators)) == [
"gen_2",
"gen_2a",
"gen_2b",
]
assert generator_names(requested_closure(["gen_2b"], generators)) == [
"gen_2",
"gen_2a",
"gen_2b",
]
assert all_missing_closure(generators) == ["gen_2", "gen_2a", "gen_2b"] assert generator_names(all_missing_closure(generators)) == [
"gen_2",
"gen_2a",
"gen_2b",
]
@pytest.mark.impure @pytest.mark.impure
@@ -96,8 +114,8 @@ def test_generate_public_var(
store = in_repo.FactStore( store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert store.exists("my_generator", "my_value") assert store.exists(Generator("my_generator"), "my_value")
assert store.get("my_generator", "my_value").decode() == "hello\n" assert store.get(Generator("my_generator"), "my_value").decode() == "hello\n"
vars_text = stringify_all_vars(machine) vars_text = stringify_all_vars(machine)
assert "my_generator/my_value: hello" in vars_text assert "my_generator/my_value: hello" in vars_text
vars_eval = run( vars_eval = run(
@@ -133,12 +151,12 @@ def test_generate_secret_var_sops(
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert not in_repo_store.exists("my_generator", "my_secret") assert not in_repo_store.exists(Generator("my_generator"), "my_secret")
sops_store = sops.SecretStore( sops_store = sops.SecretStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert sops_store.exists("my_generator", "my_secret") assert sops_store.exists(Generator("my_generator"), "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n" assert sops_store.get(Generator("my_generator"), "my_secret").decode() == "hello\n"
vars_text = stringify_all_vars(machine) vars_text = stringify_all_vars(machine)
assert "my_generator/my_secret" in vars_text assert "my_generator/my_secret" in vars_text
# test regeneration works # test regeneration works
@@ -168,12 +186,12 @@ def test_generate_secret_var_sops_with_default_group(
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert not in_repo_store.exists("my_generator", "my_secret") assert not in_repo_store.exists(Generator("my_generator"), "my_secret")
sops_store = sops.SecretStore( sops_store = sops.SecretStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert sops_store.exists("my_generator", "my_secret") assert sops_store.exists(Generator("my_generator"), "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n" assert sops_store.get(Generator("my_generator"), "my_secret").decode() == "hello\n"
# add another user and check if secret gets re-encrypted # add another user and check if secret gets re-encrypted
from clan_cli.secrets.sops import generate_private_key from clan_cli.secrets.sops import generate_private_key
@@ -197,7 +215,7 @@ def test_generate_secret_var_sops_with_default_group(
# check if new user can access the secret # check if new user can access the secret
monkeypatch.setenv("USER", "uschi") monkeypatch.setenv("USER", "uschi")
assert sops_store.user_has_access( assert sops_store.user_has_access(
"uschi", "my_generator", "my_secret", shared=False "uschi", Generator("my_generator", share=False), "my_secret"
) )
@@ -232,13 +250,17 @@ def test_generated_shared_secret_sops(
assert check_vars(machine2) assert check_vars(machine2)
m1_sops_store = sops.SecretStore(machine1) m1_sops_store = sops.SecretStore(machine1)
m2_sops_store = sops.SecretStore(machine2) m2_sops_store = sops.SecretStore(machine2)
assert m1_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True) assert m1_sops_store.exists(
assert m2_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True) Generator("my_shared_generator", share=True), "my_shared_secret"
)
assert m2_sops_store.exists(
Generator("my_shared_generator", share=True), "my_shared_secret"
)
assert m1_sops_store.machine_has_access( assert m1_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True Generator("my_shared_generator", share=True), "my_shared_secret"
) )
assert m2_sops_store.machine_has_access( assert m2_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True Generator("my_shared_generator", share=True), "my_shared_secret"
) )
@@ -277,11 +299,19 @@ def test_generate_secret_var_password_store(
store = password_store.SecretStore( store = password_store.SecretStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert store.exists("my_generator", "my_secret", shared=False) assert store.exists(Generator("my_generator", share=False, files=[]), "my_secret")
assert not store.exists("my_generator", "my_secret", shared=True) assert not store.exists(
assert store.exists("my_shared_generator", "my_shared_secret", shared=True) Generator("my_generator", share=True, files=[]), "my_secret"
assert not store.exists("my_shared_generator", "my_shared_secret", shared=False) )
assert store.get("my_generator", "my_secret", shared=False).decode() == "hello\n" assert store.exists(
Generator("my_shared_generator", share=True, files=[]), "my_shared_secret"
)
assert not store.exists(
Generator("my_shared_generator", share=False, files=[]), "my_shared_secret"
)
generator = Generator(name="my_generator", share=False, files=[])
assert store.get(generator, "my_secret").decode() == "hello\n"
vars_text = stringify_all_vars(machine) vars_text = stringify_all_vars(machine)
assert "my_generator/my_secret" in vars_text assert "my_generator/my_secret" in vars_text
@@ -321,10 +351,16 @@ def test_generate_secret_for_multiple_machines(
in_repo_store2 = in_repo.FactStore( in_repo_store2 = in_repo.FactStore(
Machine(name="machine2", flake=FlakeId(str(flake.path))) Machine(name="machine2", flake=FlakeId(str(flake.path)))
) )
assert in_repo_store1.exists("my_generator", "my_value") assert in_repo_store1.exists(Generator("my_generator"), "my_value")
assert in_repo_store2.exists("my_generator", "my_value") assert in_repo_store2.exists(Generator("my_generator"), "my_value")
assert in_repo_store1.get("my_generator", "my_value").decode() == "machine1\n" assert (
assert in_repo_store2.get("my_generator", "my_value").decode() == "machine2\n" in_repo_store1.get(Generator("my_generator"), "my_value").decode()
== "machine1\n"
)
assert (
in_repo_store2.get(Generator("my_generator"), "my_value").decode()
== "machine2\n"
)
# check if secret vars have been created correctly # check if secret vars have been created correctly
sops_store1 = sops.SecretStore( sops_store1 = sops.SecretStore(
Machine(name="machine1", flake=FlakeId(str(flake.path))) Machine(name="machine1", flake=FlakeId(str(flake.path)))
@@ -332,10 +368,14 @@ def test_generate_secret_for_multiple_machines(
sops_store2 = sops.SecretStore( sops_store2 = sops.SecretStore(
Machine(name="machine2", flake=FlakeId(str(flake.path))) Machine(name="machine2", flake=FlakeId(str(flake.path)))
) )
assert sops_store1.exists("my_generator", "my_secret") assert sops_store1.exists(Generator("my_generator"), "my_secret")
assert sops_store2.exists("my_generator", "my_secret") assert sops_store2.exists(Generator("my_generator"), "my_secret")
assert sops_store1.get("my_generator", "my_secret").decode() == "machine1\n" assert (
assert sops_store2.get("my_generator", "my_secret").decode() == "machine2\n" sops_store1.get(Generator("my_generator"), "my_secret").decode() == "machine1\n"
)
assert (
sops_store2.get(Generator("my_generator"), "my_secret").decode() == "machine2\n"
)
@pytest.mark.impure @pytest.mark.impure
@@ -357,10 +397,16 @@ def test_dependant_generators(
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert in_repo_store.exists("parent_generator", "my_value") assert in_repo_store.exists(Generator("parent_generator"), "my_value")
assert in_repo_store.get("parent_generator", "my_value").decode() == "hello\n" assert (
assert in_repo_store.exists("child_generator", "my_value") in_repo_store.get(Generator("parent_generator"), "my_value").decode()
assert in_repo_store.get("child_generator", "my_value").decode() == "hello\n" == "hello\n"
)
assert in_repo_store.exists(Generator("child_generator"), "my_value")
assert (
in_repo_store.get(Generator("child_generator"), "my_value").decode()
== "hello\n"
)
@pytest.mark.impure @pytest.mark.impure
@@ -395,8 +441,10 @@ def test_prompt(
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert in_repo_store.exists("my_generator", "my_value") assert in_repo_store.exists(Generator("my_generator"), "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == input_value assert (
in_repo_store.get(Generator("my_generator"), "my_value").decode() == input_value
)
@pytest.mark.impure @pytest.mark.impure
@@ -437,15 +485,25 @@ def test_share_flag(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
# check secrets stored correctly # check secrets stored correctly
assert sops_store.exists("shared_generator", "my_secret", shared=True) assert sops_store.exists(Generator("shared_generator", share=True), "my_secret")
assert not sops_store.exists("shared_generator", "my_secret", shared=False) assert not sops_store.exists(
assert sops_store.exists("unshared_generator", "my_secret", shared=False) Generator("shared_generator", share=False), "my_secret"
assert not sops_store.exists("unshared_generator", "my_secret", shared=True) )
assert sops_store.exists(Generator("unshared_generator", share=False), "my_secret")
assert not sops_store.exists(
Generator("unshared_generator", share=True), "my_secret"
)
# check values stored correctly # check values stored correctly
assert in_repo_store.exists("shared_generator", "my_value", shared=True) assert in_repo_store.exists(Generator("shared_generator", share=True), "my_value")
assert not in_repo_store.exists("shared_generator", "my_value", shared=False) assert not in_repo_store.exists(
assert in_repo_store.exists("unshared_generator", "my_value", shared=False) Generator("shared_generator", share=False), "my_value"
assert not in_repo_store.exists("unshared_generator", "my_value", shared=True) )
assert in_repo_store.exists(
Generator("unshared_generator", share=False), "my_value"
)
assert not in_repo_store.exists(
Generator("unshared_generator", share=True), "my_value"
)
vars_eval = run( vars_eval = run(
nix_eval( nix_eval(
[ [
@@ -505,9 +563,13 @@ def test_prompt_create_file(
sops_store = sops.SecretStore( sops_store = sops.SecretStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert sops_store.exists("my_generator", "prompt1") assert sops_store.exists(
assert not sops_store.exists("my_generator", "prompt2") Generator(name="my_generator", share=False, files=[]), "prompt1"
assert sops_store.get("my_generator", "prompt1").decode() == "input1" )
assert not sops_store.exists(Generator(name="my_generator"), "prompt2")
assert (
sops_store.get(Generator(name="my_generator"), "prompt1").decode() == "input1"
)
@pytest.mark.impure @pytest.mark.impure
@@ -558,8 +620,8 @@ def test_api_set_prompts(
], ],
) )
store = in_repo.FactStore(machine) store = in_repo.FactStore(machine)
assert store.exists("my_generator", "prompt1") assert store.exists(Generator("my_generator"), "prompt1")
assert store.get("my_generator", "prompt1").decode() == "input1" assert store.get(Generator("my_generator"), "prompt1").decode() == "input1"
set_prompts( set_prompts(
machine, machine,
[ [
@@ -569,7 +631,7 @@ def test_api_set_prompts(
) )
], ],
) )
assert store.get("my_generator", "prompt1").decode() == "input2" assert store.get(Generator("my_generator"), "prompt1").decode() == "input2"
@pytest.mark.impure @pytest.mark.impure
@@ -771,8 +833,8 @@ def test_migration_skip(
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert in_repo_store.exists("my_generator", "my_value") assert in_repo_store.exists(Generator("my_generator"), "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == "world" assert in_repo_store.get(Generator("my_generator"), "my_value").decode() == "world"
@pytest.mark.impure @pytest.mark.impure
@@ -805,10 +867,10 @@ def test_migration(
sops_store = sops.SecretStore( sops_store = sops.SecretStore(
Machine(name="my_machine", flake=FlakeId(str(flake.path))) Machine(name="my_machine", flake=FlakeId(str(flake.path)))
) )
assert in_repo_store.exists("my_generator", "my_value") assert in_repo_store.exists(Generator("my_generator"), "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == "hello" assert in_repo_store.get(Generator("my_generator"), "my_value").decode() == "hello"
assert sops_store.exists("my_generator", "my_secret") assert sops_store.exists(Generator("my_generator"), "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello" assert sops_store.get(Generator("my_generator"), "my_secret").decode() == "hello"
@pytest.mark.impure @pytest.mark.impure