Merge pull request 'clan-cli: machines delete: delete the machine's vars and secrets' (#2994) from lopter/clan-core:lo-machines-delete into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2994
This commit is contained in:
Luis Hebendanz
2025-03-19 12:25:48 +00:00
17 changed files with 390 additions and 153 deletions

View File

@@ -208,16 +208,11 @@ def complete_secrets(
Provides completion functionality for clan secrets Provides completion functionality for clan secrets
""" """
from .clan_uri import Flake from .clan_uri import Flake
from .secrets.secrets import ListSecretsOptions, list_secrets from .secrets.secrets import list_secrets
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "." flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
options = ListSecretsOptions( secrets = list_secrets(Flake(flake).path)
flake=Flake(flake),
pattern=None,
)
secrets = list_secrets(options.flake.path, options.pattern)
secrets_dict = dict.fromkeys(secrets, "secret") secrets_dict = dict.fromkeys(secrets, "secret")
return secrets_dict return secrets_dict

View File

@@ -431,7 +431,7 @@ def delete_by_path(d: dict[str, Any], path: str) -> Any:
""" """
if not path: if not path:
msg = "Cannot delete. Path is empty." msg = "Cannot delete. Path is empty."
raise ClanError(msg) raise KeyError(msg)
keys = path.split(".") keys = path.split(".")
current = d current = d
@@ -439,17 +439,17 @@ def delete_by_path(d: dict[str, Any], path: str) -> Any:
# Navigate to the parent dictionary of the final key # Navigate to the parent dictionary of the final key
for key in keys[:-1]: for key in keys[:-1]:
if key not in current or not isinstance(current[key], dict): if key not in current or not isinstance(current[key], dict):
msg = f"Cannot delete. Key '{path}' not found or not a dictionary '{d}'." msg = f"Cannot delete. Key '{path}' not found or not a dictionary '{d}'"
raise ClanError(msg) raise KeyError(msg)
current = current[key] current = current[key]
# Attempt to pop the final key # Attempt to pop the final key
last_key = keys[-1] last_key = keys[-1]
try: try:
value = current.pop(last_key) value = current.pop(last_key)
except KeyError: except KeyError as exc:
msg = f"Cannot delete. Path '{path}' not found in data '{d}'. " msg = f"Cannot delete. Path '{path}' not found in data '{d}'"
raise ClanError(msg) from KeyError raise KeyError(msg) from exc
else: else:
return {last_key: value} return {last_key: value}

View File

@@ -1,25 +1,66 @@
import argparse import argparse
import logging import logging
import shutil import shutil
from pathlib import Path
from clan_cli import inventory
from clan_cli.api import API from clan_cli.api import API
from clan_cli.clan_uri import Flake from clan_cli.clan_uri import Flake
from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.dirs import specific_machine_dir from clan_cli.dirs import specific_machine_dir
from clan_cli.inventory import delete from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.machines import has_machine as secrets_has_machine
from clan_cli.secrets.machines import remove_machine as secrets_machine_remove
from clan_cli.secrets.secrets import (
list_secrets,
)
from clan_cli.vars.list import (
public_store as vars_public_store,
)
from clan_cli.vars.list import (
secret_store as vars_secret_store,
)
from .machines import Machine
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@API.register @API.register
def delete_machine(flake: Flake, name: str) -> None: def delete_machine(flake: Flake, name: str) -> None:
delete(str(flake.path), {f"machines.{name}"}) try:
inventory.delete(str(flake.path), {f"machines.{name}"})
except KeyError as exc:
# louis@(2025-03-09): test infrastructure does not seem to set the
# inventory properly, but more importantly only one machine in my
# personal clan ended up in the inventory for some reason, so I think
# it makes sense to eat the exception here.
log.warning(
f"{name} was missing or already deleted from the machines inventory: {exc}"
)
changed_paths: list[Path] = []
# Remove the machine directory
folder = specific_machine_dir(flake.path, name) folder = specific_machine_dir(flake.path, name)
if folder.exists(): if folder.exists():
changed_paths.append(folder)
shutil.rmtree(folder) shutil.rmtree(folder)
# louis@(2025-02-04): clean-up legacy (pre-vars) secrets:
sops_folder = sops_secrets_folder(flake.path)
filter_fn = lambda secret_name: secret_name.startswith(f"{name}-")
for secret_name in list_secrets(flake.path, filter_fn):
secret_path = sops_folder / secret_name
changed_paths.append(secret_path)
shutil.rmtree(secret_path)
machine = Machine(name, flake)
changed_paths.extend(vars_public_store(machine).delete_store())
changed_paths.extend(vars_secret_store(machine).delete_store())
# Remove the machine's key, and update secrets & vars that referenced it:
if secrets_has_machine(flake.path, name):
secrets_machine_remove(flake.path, name)
def delete_command(args: argparse.Namespace) -> None: def delete_command(args: argparse.Namespace) -> None:
delete_machine(args.flake, args.name) delete_machine(args.flake, args.name)

View File

@@ -6,7 +6,6 @@ import os
import shutil import shutil
import sys import sys
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import IO from typing import IO
@@ -18,7 +17,6 @@ from clan_cli.completions import (
complete_users, complete_users,
) )
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.flake import Flake
from clan_cli.git import commit_files from clan_cli.git import commit_files
from . import sops from . import sops
@@ -325,31 +323,26 @@ def has_secret(secret_path: Path) -> bool:
return (secret_path / "secret").exists() return (secret_path / "secret").exists()
def list_secrets(flake_dir: Path, pattern: str | None = None) -> list[str]: def list_secrets(
flake_dir: Path, filter_fn: Callable[[str], bool] | None = None
) -> list[str]:
path = sops_secrets_folder(flake_dir) path = sops_secrets_folder(flake_dir)
def validate(name: str) -> bool: def validate(name: str) -> bool:
return ( return (
VALID_SECRET_NAME.match(name) is not None VALID_SECRET_NAME.match(name) is not None
and has_secret(sops_secrets_folder(flake_dir) / name) and has_secret(sops_secrets_folder(flake_dir) / name)
and (pattern is None or pattern in name) and (filter_fn is None or filter_fn(name) is True)
) )
return list_objects(path, validate) return list_objects(path, validate)
@dataclass
class ListSecretsOptions:
flake: Flake
pattern: str | None
def list_command(args: argparse.Namespace) -> None: def list_command(args: argparse.Namespace) -> None:
options = ListSecretsOptions( def filter_fn(name: str) -> bool:
flake=args.flake, return args.pattern in name
pattern=args.pattern,
) lst = list_secrets(args.flake.path, filter_fn if args.pattern else None)
lst = list_secrets(options.flake.path, options.pattern)
if len(lst) > 0: if len(lst) > 0:
print("\n".join(lst)) print("\n".join(lst))

View File

@@ -1,5 +1,6 @@
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@@ -122,6 +123,29 @@ class StoreBase(ABC):
) )
return new_file return new_file
@abstractmethod
def delete(self, generator: "Generator", name: str) -> Iterable[Path]:
"""Remove a var from the store.
:return: An iterable of affected paths in the git repository. This
may be empty if the store is outside of the repository.
"""
@abstractmethod
def delete_store(self) -> Iterable[Path]:
"""Delete the store (all vars) for this machine.
.. note::
This does not make the distinction between public and private vars.
Since the public and private store of a machine can be co-located
under the same directory, this method's implementation has to be
idempotent.
:return: An iterable of affected paths in the git repository. This
may be empty if the store was outside of the repository.
"""
def get_validation(self, generator: "Generator") -> str | None: def get_validation(self, generator: "Generator") -> str | None:
""" """
Return the invalidation hash that indicates if a generator needs to be re-run Return the invalidation hash that indicates if a generator needs to be re-run

View File

@@ -1,4 +1,5 @@
import shutil import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -30,15 +31,14 @@ class FactStore(StoreBase):
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}" msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg) raise ClanError(msg)
folder = self.directory(generator, var.name) folder = self.directory(generator, var.name)
file_path = folder / "value"
if folder.exists(): if folder.exists():
if not (folder / "value").exists(): if not file_path.exists():
# another backend has used that folder before -> error out # another backend has used that folder before -> error out
self.backend_collision_error(folder) self.backend_collision_error(folder)
shutil.rmtree(folder) shutil.rmtree(folder)
# re-create directory # re-create directory
file_path = folder / "value"
folder.mkdir(parents=True, exist_ok=True) folder.mkdir(parents=True, exist_ok=True)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch() file_path.touch()
file_path.write_bytes(value) file_path.write_bytes(value)
return file_path return file_path
@@ -50,6 +50,24 @@ class FactStore(StoreBase):
def exists(self, generator: Generator, name: str) -> bool: def exists(self, generator: Generator, name: str) -> bool:
return (self.directory(generator, name) / "value").exists() return (self.directory(generator, name) / "value").exists()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
fact_folder = self.directory(generator, name)
fact_file = fact_folder / "value"
fact_file.unlink()
empty = None
if next(fact_folder.iterdir(), empty) is not empty:
return [fact_file]
fact_folder.rmdir()
return [fact_folder]
def delete_store(self) -> Iterable[Path]:
flake_root = Path(self.machine.flake_dir)
store_folder = flake_root / "vars/per-machine" / self.machine.name
if not store_folder.exists():
return []
shutil.rmtree(store_folder)
return [store_folder]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores" msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg) raise NotImplementedError(msg)

View File

@@ -1,4 +1,6 @@
import logging import logging
import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
@@ -48,6 +50,21 @@ class FactStore(StoreBase):
msg = f"Fact {name} for service {generator.name} not found" msg = f"Fact {name} for service {generator.name} not found"
raise ClanError(msg) raise ClanError(msg)
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
fact_dir = self.dir / generator.name
fact_file = fact_dir / name
fact_file.unlink()
empty = None
if next(fact_dir.iterdir(), empty) is empty:
fact_dir.rmdir()
return [fact_file]
def delete_store(self) -> Iterable[Path]:
if not self.dir.exists():
return []
shutil.rmtree(self.dir)
return [self.dir]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores" msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg) raise NotImplementedError(msg)

View File

@@ -2,11 +2,12 @@ import io
import logging import logging
import os import os
import tarfile import tarfile
from collections.abc import Iterable
from itertools import chain from itertools import chain
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, RunOpts, run from clan_cli.cmd import CmdOut, Log, RunOpts, run
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from clan_cli.ssh.upload import upload from clan_cli.ssh.upload import upload
@@ -35,52 +36,55 @@ class SecretStore(StoreBase):
return backend return backend
@property @property
def _password_store_dir(self) -> str: def _password_store_dir(self) -> Path:
if self._store_backend == "passage": if self._store_backend == "passage":
return os.environ.get("PASSAGE_DIR", f"{os.environ['HOME']}/.passage/store") lookup = os.environ.get("PASSAGE_DIR")
return os.environ.get( default = Path.home() / ".passage/store"
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store" else:
) lookup = os.environ.get("PASSWORD_STORE_DIR")
default = Path.home() / ".password-store"
return Path(lookup) if lookup else default
def entry_dir(self, generator: Generator, name: str) -> Path: def entry_dir(self, generator: Generator, name: str) -> Path:
return Path(self.entry_prefix) / self.rel_dir(generator, name) return Path(self.entry_prefix) / self.rel_dir(generator, name)
def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut:
cmd = nix_shell(packages=["nixpkgs#pass"], cmd=[self._store_backend, *args])
return run(cmd, options)
def _set( def _set(
self, self,
generator: Generator, generator: Generator,
var: Var, var: Var,
value: bytes, value: bytes,
) -> Path | None: ) -> Path | None:
run( pass_call = ["insert", "-m", str(self.entry_dir(generator, var.name))]
nix_shell( self._run_pass(*pass_call, options=RunOpts(input=value, check=True))
[f"nixpkgs#{self._store_backend}"],
[
f"{self._store_backend}",
"insert",
"-m",
str(self.entry_dir(generator, var.name)),
],
),
RunOpts(input=value, check=True),
)
return None # we manage the files outside of the git repo return None # we manage the files outside of the git repo
def get(self, generator: Generator, name: str) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
return run( pass_name = str(self.entry_dir(generator, name))
nix_shell( return self._run_pass("show", pass_name).stdout.encode()
[f"nixpkgs#{self._store_backend}"],
[
f"{self._store_backend}",
"show",
str(self.entry_dir(generator, name)),
],
),
).stdout.encode()
def exists(self, generator: Generator, name: str) -> bool: def exists(self, generator: Generator, name: str) -> bool:
extension = "age" if self._store_backend == "passage" else "gpg" extension = "age" if self._store_backend == "passage" else "gpg"
filename = f"{self.entry_dir(generator, name)}.{extension}" filename = f"{self.entry_dir(generator, name)}.{extension}"
return (Path(self._password_store_dir) / filename).exists() return (self._password_store_dir / filename).exists()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
pass_name = str(self.entry_dir(generator, name))
self._run_pass("rm", "--force", pass_name, options=RunOpts(check=True))
return []
def delete_store(self) -> Iterable[Path]:
machine_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name
if not (self._password_store_dir / machine_dir).exists():
# The directory may not exist if the machine
# has no vars, or they have been deleted already.
return []
pass_call = ["rm", "--force", "--recursive", str(machine_dir)]
self._run_pass(*pass_call, options=RunOpts(check=True))
return []
def generate_hash(self) -> bytes: def generate_hash(self) -> bytes:
hashes = [] hashes = []
@@ -91,7 +95,7 @@ class SecretStore(StoreBase):
[ [
"git", "git",
"-C", "-C",
self._password_store_dir, str(self._password_store_dir),
"log", "log",
"-1", "-1",
"--format=%H", "--format=%H",
@@ -103,9 +107,9 @@ class SecretStore(StoreBase):
.stdout.strip() .stdout.strip()
.encode() .encode()
) )
shared_dir = Path(self._password_store_dir) / self.entry_prefix / "shared" shared_dir = self._password_store_dir / self.entry_prefix / "shared"
machine_dir = ( machine_dir = (
Path(self._password_store_dir) self._password_store_dir
/ self.entry_prefix / self.entry_prefix
/ "per-machine" / "per-machine"
/ self.machine.name / self.machine.name
@@ -119,7 +123,7 @@ class SecretStore(StoreBase):
[ [
"git", "git",
"-C", "-C",
self._password_store_dir, str(self._password_store_dir),
"log", "log",
"-1", "-1",
"--format=%H", "--format=%H",

View File

@@ -1,3 +1,5 @@
import shutil
from collections.abc import Iterable
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -162,6 +164,19 @@ class SecretStore(StoreBase):
self.machine.flake_dir, self.secret_path(generator, name) self.machine.flake_dir, self.secret_path(generator, name)
).encode("utf-8") ).encode("utf-8")
def delete(self, generator: "Generator", name: str) -> Iterable[Path]:
secret_dir = self.directory(generator, name)
shutil.rmtree(secret_dir)
return [secret_dir]
def delete_store(self) -> Iterable[Path]:
flake_root = Path(self.machine.flake_dir)
store_folder = flake_root / "vars/per-machine" / self.machine.name
if not store_folder.exists():
return []
shutil.rmtree(store_folder)
return [store_folder]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if "users" in phases or "services" in phases: if "users" in phases or "services" in phases:
key_name = f"{self.machine.name}-age.key" key_name = f"{self.machine.name}-age.key"

View File

@@ -1,4 +1,5 @@
import shutil import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
@@ -39,6 +40,21 @@ class SecretStore(StoreBase):
secret_file = self.dir / generator.name / name secret_file = self.dir / generator.name / name
return secret_file.read_bytes() return secret_file.read_bytes()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
secret_dir = self.dir / generator.name
secret_file = secret_dir / name
secret_file.unlink()
empty = None
if next(secret_dir.iterdir(), empty) is empty:
secret_dir.rmdir()
return [secret_file]
def delete_store(self) -> Iterable[Path]:
if not self.dir.exists():
return []
shutil.rmtree(self.dir)
return [self.dir]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if output_dir.exists(): if output_dir.exists():
shutil.rmtree(output_dir) shutil.rmtree(output_dir)

View File

@@ -34,6 +34,11 @@ log_level = "DEBUG"
log_format = "%(message)s" log_format = "%(message)s"
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first -W error -n auto" # Add --pdb for debugging addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first -W error -n auto" # Add --pdb for debugging
norecursedirs = ["clan_cli/tests/helpers", "clan_cli/nixpkgs"] norecursedirs = ["clan_cli/tests/helpers", "clan_cli/nixpkgs"]
# All tests which evaluate any nix library code from clan-core need to use the
# `with_core` marker, so basically all tests which evaluate a flake with
# machines. In the CI pipeline we run these tests in a separate derivation
# depending on clan-core. All other tests do not need to depend on clan-core
# and can be cached more effectively.
markers = ["impure", "with_core"] markers = ["impure", "with_core"]
filterwarnings = "default::ResourceWarning" filterwarnings = "default::ResourceWarning"

View File

@@ -1,7 +1,9 @@
import json
import os import os
from pathlib import Path from pathlib import Path
import pytest import pytest
from clan_cli.secrets.folders import sops_secrets_folder
from helpers import cli from helpers import cli
@@ -12,13 +14,19 @@ class KeyPair:
class SopsSetup: class SopsSetup:
"""Hold a list of three key pairs and create an "admin" user in the clan.
The first key in the list is used as the admin key and
the private part of the key is exposed in the
`SOPS_AGE_KEY` environment variable, the two others can
be used to add machines or other users.
"""
def __init__(self, keys: list[KeyPair]) -> None: def __init__(self, keys: list[KeyPair]) -> None:
self.keys = keys self.keys = keys
self.user = os.environ.get("USER", "admin")
def init(self, flake_path: Path | None = None) -> None: def init(self, flake_path: Path) -> None:
if flake_path is None:
flake_path = Path.cwd()
self.user = os.environ.get("USER", "user")
cli.run( cli.run(
[ [
"vars", "vars",
@@ -49,9 +57,6 @@ KEYS = [
@pytest.fixture @pytest.fixture
def age_keys() -> list[KeyPair]: def age_keys() -> list[KeyPair]:
"""
Root directory of the tests
"""
return KEYS return KEYS
@@ -59,8 +64,33 @@ def age_keys() -> list[KeyPair]:
def sops_setup( def sops_setup(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
) -> SopsSetup: ) -> SopsSetup:
"""
Root directory of the tests
"""
monkeypatch.setenv("SOPS_AGE_KEY", KEYS[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", KEYS[0].privkey)
return SopsSetup(KEYS) return SopsSetup(KEYS)
# louis@(2025-03-10): right now this is specific to the `sops/secrets` folder,
# but we could make it generic to any sops file if the need arises.
def assert_secrets_file_recipients(
flake_path: Path,
secret_name: str,
expected_age_recipients_keypairs: list["KeyPair"],
err_msg: str | None = None,
) -> None:
"""Checks that the recipients of a secrets file matches expectations.
This looks up the `secret` file for `secret_name` in the `sops` directory
under `flake_path`.
:param err_msg: in case of failure, if you gave an error message then it
will be displayed, otherwise pytest will display the two different sets
of recipients.
"""
sops_file = sops_secrets_folder(flake_path) / secret_name / "secret"
with sops_file.open("rb") as fp:
sops_data = json.load(fp)
age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]}
expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs}
if not err_msg:
assert age_recipients == expected_age_recipients
return
assert age_recipients == expected_age_recipients, err_msg

View File

@@ -9,6 +9,7 @@ from collections.abc import Callable, Iterator
from pathlib import Path from pathlib import Path
from typing import Any, NamedTuple from typing import Any, NamedTuple
import age_keys
import pytest import pytest
from clan_cli.dirs import TemplateType, clan_templates, nixpkgs_source from clan_cli.dirs import TemplateType, clan_templates, nixpkgs_source
from clan_cli.locked_open import locked_open from clan_cli.locked_open import locked_open
@@ -229,6 +230,15 @@ def flake(
return minimal_flake_template.copy(temporary_home, monkeypatch) return minimal_flake_template.copy(temporary_home, monkeypatch)
@pytest.fixture
def flake_with_sops(
flake: ClanFlake,
sops_setup: age_keys.SopsSetup,
) -> ClanFlake:
sops_setup.init(flake.path)
return flake
def create_flake( def create_flake(
temporary_home: Path, temporary_home: Path,
flake_template: str | Path, flake_template: str | Path,

View File

@@ -1,13 +1,15 @@
import fixtures_flakes
import pytest import pytest
from age_keys import SopsSetup, assert_secrets_file_recipients
from clan_cli.inventory import load_inventory_json from clan_cli.inventory import load_inventory_json
from fixtures_flakes import FlakeForTest from clan_cli.secrets.folders import sops_machines_folder
from helpers import cli from helpers import cli
from stdout import CaptureOutput from stdout import CaptureOutput
@pytest.mark.impure @pytest.mark.impure
def test_machine_subcommands( def test_machine_subcommands(
test_flake_with_core: FlakeForTest, test_flake_with_core: fixtures_flakes.FlakeForTest,
capture_output: CaptureOutput, capture_output: CaptureOutput,
) -> None: ) -> None:
cli.run( cli.run(
@@ -47,3 +49,76 @@ def test_machine_subcommands(
assert "machine1" not in output.out assert "machine1" not in output.out
assert "vm1" in output.out assert "vm1" in output.out
assert "vm2" in output.out assert "vm2" in output.out
@pytest.mark.with_core
def test_machine_delete(
monkeypatch: pytest.MonkeyPatch,
flake_with_sops: fixtures_flakes.ClanFlake,
sops_setup: SopsSetup,
) -> None:
flake = flake_with_sops
admin_key, machine_key, machine2_key = sops_setup.keys
# create a couple machines with their keys
for name, key in (("my-machine", machine_key), ("my-machine2", machine2_key)):
cli.run(["machines", "create", f"--flake={flake.path}", name])
add_machine_key = [
"secrets",
"machines",
"add",
f"--flake={flake.path}",
name,
key.pubkey,
]
cli.run(add_machine_key)
# create a secret shared by both machines
shared_secret_name = "shared_secret"
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "secret_value")
set_shared_secret = [
"secrets",
"set",
f"--flake={flake.path}",
"--machine=my-machine",
"--machine=my-machine2",
shared_secret_name,
]
cli.run(set_shared_secret)
my_machine_sops_folder = sops_machines_folder(flake.path) / "my-machine"
assert my_machine_sops_folder.is_dir(), (
"A sops folder for `my-machine` should have been created with its public key"
)
# define some vars generator for `my-machine`:
config = flake.machines["my-machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_value"]["secret"] = False
my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = (
"echo -n public > $out/my_value;"
"echo -n secret > $out/my_secret;"
"echo -n non-default > $out/value_with_default"
)
flake.refresh() # saves "my_generator"
monkeypatch.chdir(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path), "my-machine"])
my_machine_vars_store = flake.path / "vars/per-machine" / "my-machine"
assert my_machine_vars_store.is_dir(), (
"A vars directory should have been created for `my-machine`"
)
cli.run(["machines", "delete", "--flake", str(flake.path), "my-machine"])
assert not my_machine_vars_store.exists(), (
"The vars directory for `my-machine` should have been deleted"
)
assert not my_machine_sops_folder.exists(), (
"The sops folder holding the public key for `my-machine` should have been deleted"
)
expected_recipients = [admin_key, machine2_key]
assert_secrets_file_recipients(flake.path, shared_secret_name, expected_recipients)

View File

@@ -557,7 +557,7 @@ def test_delete_top_level() -> None:
def test_delete_key_not_found() -> None: def test_delete_key_not_found() -> None:
data = {"foo": {"bar": 1}} data = {"foo": {"bar": 1}}
# Trying to delete a non-existing key "foo.baz" # Trying to delete a non-existing key "foo.baz"
with pytest.raises(ClanError) as excinfo: with pytest.raises(KeyError) as excinfo:
delete_by_path(data, "foo.baz") delete_by_path(data, "foo.baz")
assert "Cannot delete. Path 'foo.baz'" in str(excinfo.value) assert "Cannot delete. Path 'foo.baz'" in str(excinfo.value)
# Data should remain unchanged # Data should remain unchanged
@@ -567,7 +567,7 @@ def test_delete_key_not_found() -> None:
def test_delete_intermediate_not_dict() -> None: def test_delete_intermediate_not_dict() -> None:
data = {"foo": "not a dict"} data = {"foo": "not a dict"}
# Trying to go deeper into a non-dict value # Trying to go deeper into a non-dict value
with pytest.raises(ClanError) as excinfo: with pytest.raises(KeyError) as excinfo:
delete_by_path(data, "foo.bar") delete_by_path(data, "foo.bar")
assert "not found or not a dictionary" in str(excinfo.value) assert "not found or not a dictionary" in str(excinfo.value)
# Data should remain unchanged # Data should remain unchanged
@@ -577,7 +577,7 @@ def test_delete_intermediate_not_dict() -> None:
def test_delete_empty_path() -> None: def test_delete_empty_path() -> None:
data = {"foo": {"bar": 1}} data = {"foo": {"bar": 1}}
# Attempting to delete with an empty path # Attempting to delete with an empty path
with pytest.raises(ClanError) as excinfo: with pytest.raises(KeyError) as excinfo:
delete_by_path(data, "") delete_by_path(data, "")
# Depending on how you handle empty paths, you might raise an error or handle it differently. # Depending on how you handle empty paths, you might raise an error or handle it differently.
# If you do raise an error, check the message. # If you do raise an error, check the message.
@@ -588,7 +588,7 @@ def test_delete_empty_path() -> None:
def test_delete_non_existent_path_deep() -> None: def test_delete_non_existent_path_deep() -> None:
data = {"foo": {"bar": {"baz": 123}}} data = {"foo": {"bar": {"baz": 123}}}
# non-existent deep path # non-existent deep path
with pytest.raises(ClanError) as excinfo: with pytest.raises(KeyError) as excinfo:
delete_by_path(data, "foo.bar.qux") delete_by_path(data, "foo.bar.qux")
assert "not found" in str(excinfo.value) assert "not found" in str(excinfo.value)
# Data remains unchanged # Data remains unchanged

View File

@@ -10,8 +10,8 @@ from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import pytest import pytest
from age_keys import assert_secrets_file_recipients
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.secrets.folders import sops_secrets_folder
from fixtures_flakes import FlakeForTest from fixtures_flakes import FlakeForTest
from helpers import cli from helpers import cli
from stdout import CaptureOutput from stdout import CaptureOutput
@@ -91,7 +91,7 @@ def _test_identities(
test_secret_name, test_secret_name,
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
test_secret_name, test_secret_name,
expected_age_recipients_keypairs=[age_keys[0], admin_age_key], expected_age_recipients_keypairs=[age_keys[0], admin_age_key],
@@ -111,7 +111,7 @@ def _test_identities(
age_keys[1].privkey, age_keys[1].privkey,
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
test_secret_name, test_secret_name,
expected_age_recipients_keypairs=[age_keys[1], admin_age_key], expected_age_recipients_keypairs=[age_keys[1], admin_age_key],
@@ -302,7 +302,7 @@ def test_groups(
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
secret_name, secret_name,
expected_age_recipients_keypairs=[ expected_age_recipients_keypairs=[
@@ -327,7 +327,7 @@ def test_groups(
"user1", "user1",
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
secret_name, secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
@@ -349,7 +349,7 @@ def test_groups(
"user1", "user1",
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
secret_name, secret_name,
expected_age_recipients_keypairs=[ expected_age_recipients_keypairs=[
@@ -370,7 +370,7 @@ def test_groups(
"user1", "user1",
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
secret_name, secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
@@ -391,7 +391,7 @@ def test_groups(
"machine1", "machine1",
] ]
) )
assert_sops_file_recipients( assert_secrets_file_recipients(
test_flake.path, test_flake.path,
secret_name, secret_name,
expected_age_recipients_keypairs=[admin_age_key], expected_age_recipients_keypairs=[admin_age_key],
@@ -413,29 +413,6 @@ def test_groups(
assert not group_symlink.exists(follow_symlinks=False), err_msg assert not group_symlink.exists(follow_symlinks=False), err_msg
def assert_sops_file_recipients(
flake_path: Path,
secret_name: str,
expected_age_recipients_keypairs: list["KeyPair"],
err_msg: str | None = None,
) -> None:
"""Checks that the recipients of a SOPS file matches expectations.
:param err_msg: in case of failure, if you gave an error message then it
will be displayed, otherwise pytest will display the two different sets
of recipients.
"""
sops_file = sops_secrets_folder(flake_path) / secret_name / "secret"
with sops_file.open("rb") as fp:
sops_data = json.load(fp)
age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]}
expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs}
if not err_msg:
assert age_recipients == expected_age_recipients
return
assert age_recipients == expected_age_recipients, err_msg
@contextmanager @contextmanager
def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key = os.environ["SOPS_AGE_KEY_FILE"] old_key = os.environ["SOPS_AGE_KEY_FILE"]

View File

@@ -2,7 +2,6 @@ import json
import logging import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING
import pytest import pytest
from age_keys import SopsSetup from age_keys import SopsSetup
@@ -21,9 +20,6 @@ from clan_cli.vars.set import set_var
from fixtures_flakes import ClanFlake from fixtures_flakes import ClanFlake
from helpers import cli from helpers import cli
if TYPE_CHECKING:
from age_keys import KeyPair
def test_dependencies_as_files(temp_dir: Path) -> None: def test_dependencies_as_files(temp_dir: Path) -> None:
from clan_cli.vars.generate import dependencies_as_dir from clan_cli.vars.generate import dependencies_as_dir
@@ -100,9 +96,10 @@ def test_required_generators() -> None:
@pytest.mark.with_core @pytest.mark.with_core
def test_generate_public_and_secret_vars( def test_generate_public_and_secret_vars(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -136,7 +133,6 @@ def test_generate_public_and_secret_vars(
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
machine = Machine(name="my_machine", flake=Flake(str(flake.path))) machine = Machine(name="my_machine", flake=Flake(str(flake.path)))
assert not check_vars(machine) assert not check_vars(machine)
@@ -227,10 +223,11 @@ def test_generate_public_and_secret_vars(
@pytest.mark.with_core @pytest.mark.with_core
def test_generate_secret_var_sops_with_default_group( def test_generate_secret_var_sops_with_default_group(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup, sops_setup: SopsSetup,
age_keys: list["KeyPair"],
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
config["clan"]["core"]["sops"]["defaultGroups"] = ["my_group"] config["clan"]["core"]["sops"]["defaultGroups"] = ["my_group"]
@@ -248,7 +245,6 @@ def test_generate_secret_var_sops_with_default_group(
) )
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["secrets", "groups", "add-user", "my_group", sops_setup.user]) cli.run(["secrets", "groups", "add-user", "my_group", sops_setup.user])
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
in_repo_store = in_repo.FactStore( in_repo_store = in_repo.FactStore(
@@ -268,7 +264,7 @@ def test_generate_secret_var_sops_with_default_group(
) )
# add another user to the group and check if secret gets re-encrypted # add another user to the group and check if secret gets re-encrypted
pubkey_user2 = age_keys[1] pubkey_user2 = sops_setup.keys[1]
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -291,7 +287,7 @@ def test_generate_secret_var_sops_with_default_group(
) )
# Rotate key of a user # Rotate key of a user
pubkey_user3 = age_keys[2] pubkey_user3 = sops_setup.keys[2]
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -316,9 +312,10 @@ def test_generate_secret_var_sops_with_default_group(
@pytest.mark.with_core @pytest.mark.with_core
def test_generated_shared_secret_sops( def test_generated_shared_secret_sops(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
flake = flake_with_sops
m1_config = flake.machines["machine1"] m1_config = flake.machines["machine1"]
m1_config["nixpkgs"]["hostPlatform"] = "x86_64-linux" m1_config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
shared_generator = m1_config["clan"]["core"]["vars"]["generators"][ shared_generator = m1_config["clan"]["core"]["vars"]["generators"][
@@ -334,7 +331,6 @@ def test_generated_shared_secret_sops(
) )
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
machine1 = Machine(name="machine1", flake=Flake(str(flake.path))) machine1 = Machine(name="machine1", flake=Flake(str(flake.path)))
machine2 = Machine(name="machine2", flake=Flake(str(flake.path))) machine2 = Machine(name="machine2", flake=Flake(str(flake.path)))
cli.run(["vars", "generate", "--flake", str(flake.path), "machine1"]) cli.run(["vars", "generate", "--flake", str(flake.path), "machine1"])
@@ -366,13 +362,17 @@ def test_generate_secret_var_password_store(
) -> None: ) -> None:
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
config["clan"]["core"]["vars"]["settings"]["secretStore"] = "password-store" clan_vars = config["clan"]["core"]["vars"]
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] clan_vars["settings"]["secretStore"] = "password-store"
# Create a second secret so that when we delete the first one,
# we still have the second one to test `delete_store`:
my_generator = clan_vars["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = "echo hello > $out/my_secret" my_generator["script"] = "echo hello > $out/my_secret"
my_shared_generator = config["clan"]["core"]["vars"]["generators"][ my_generator2 = clan_vars["generators"]["my_generator2"]
"my_shared_generator" my_generator2["files"]["my_secret2"]["secret"] = True
] my_generator2["script"] = "echo world > $out/my_secret2"
my_shared_generator = clan_vars["generators"]["my_shared_generator"]
my_shared_generator["share"] = True my_shared_generator["share"] = True
my_shared_generator["files"]["my_shared_secret"]["secret"] = True my_shared_generator["files"]["my_shared_secret"]["secret"] = True
my_shared_generator["script"] = "echo hello > $out/my_shared_secret" my_shared_generator["script"] = "echo hello > $out/my_shared_secret"
@@ -384,7 +384,7 @@ def test_generate_secret_var_password_store(
password_store_dir = flake.path / "pass" password_store_dir = flake.path / "pass"
shutil.copytree(test_root / "data" / "password-store", password_store_dir) shutil.copytree(test_root / "data" / "password-store", password_store_dir)
monkeypatch.setenv("PASSWORD_STORE_DIR", str(flake.path / "pass")) monkeypatch.setenv("PASSWORD_STORE_DIR", str(password_store_dir))
machine = Machine(name="my_machine", flake=Flake(str(flake.path))) machine = Machine(name="my_machine", flake=Flake(str(flake.path)))
assert not check_vars(machine) assert not check_vars(machine)
@@ -409,13 +409,31 @@ def test_generate_secret_var_password_store(
vars_text = stringify_all_vars(machine) vars_text = stringify_all_vars(machine)
assert "my_generator/my_secret" in vars_text assert "my_generator/my_secret" in vars_text
my_generator = Generator("my_generator", share=False, files=[])
var_name = "my_secret"
store.delete(my_generator, var_name)
assert not store.exists(my_generator, var_name)
store.delete_store()
store.delete_store() # check idempotency
my_generator2 = Generator("my_generator2", share=False, files=[])
var_name = "my_secret2"
assert not store.exists(my_generator2, var_name)
# The shared secret should still be there,
# not sure if we can delete those automatically:
my_shared_generator = Generator("my_shared_generator", share=True, files=[])
var_name = "my_shared_secret"
assert store.exists(my_shared_generator, var_name)
@pytest.mark.with_core @pytest.mark.with_core
def test_generate_secret_for_multiple_machines( def test_generate_secret_for_multiple_machines(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
flake = flake_with_sops
from clan_cli.nix import nix_config from clan_cli.nix import nix_config
local_system = nix_config()["system"] local_system = nix_config()["system"]
@@ -446,7 +464,6 @@ def test_generate_secret_for_multiple_machines(
) )
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["vars", "generate", "--flake", str(flake.path)]) cli.run(["vars", "generate", "--flake", str(flake.path)])
# check if public vars have been created correctly # check if public vars have been created correctly
in_repo_store1 = in_repo.FactStore( in_repo_store1 = in_repo.FactStore(
@@ -485,9 +502,10 @@ def test_generate_secret_for_multiple_machines(
@pytest.mark.with_core @pytest.mark.with_core
def test_prompt( def test_prompt(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -509,7 +527,6 @@ def test_prompt(
) )
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
monkeypatch.setattr( monkeypatch.setattr(
"clan_cli.vars.prompt.MOCK_PROMPT_RESPONSE", "clan_cli.vars.prompt.MOCK_PROMPT_RESPONSE",
iter(["line input", "my\nmultiline\ninput\n", "prompt_persist"]), iter(["line input", "my\nmultiline\ninput\n", "prompt_persist"]),
@@ -544,8 +561,7 @@ def test_prompt(
@pytest.mark.with_core @pytest.mark.with_core
def test_multi_machine_shared_vars( def test_multi_machine_shared_vars(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
""" """
Ensure that shared vars are regenerated only when they should, and also can be Ensure that shared vars are regenerated only when they should, and also can be
@@ -555,6 +571,8 @@ def test_multi_machine_shared_vars(
- make sure shared wars are not regenerated when a second machines is added - make sure shared wars are not regenerated when a second machines is added
- make sure vars can still be accessed by all machines, after they are regenerated - make sure vars can still be accessed by all machines, after they are regenerated
""" """
flake = flake_with_sops
machine1_config = flake.machines["machine1"] machine1_config = flake.machines["machine1"]
machine1_config["nixpkgs"]["hostPlatform"] = "x86_64-linux" machine1_config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
shared_generator = machine1_config["clan"]["core"]["vars"]["generators"][ shared_generator = machine1_config["clan"]["core"]["vars"]["generators"][
@@ -570,7 +588,6 @@ def test_multi_machine_shared_vars(
flake.machines["machine2"] = machine1_config flake.machines["machine2"] = machine1_config
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
machine1 = Machine(name="machine1", flake=Flake(str(flake.path))) machine1 = Machine(name="machine1", flake=Flake(str(flake.path)))
machine2 = Machine(name="machine2", flake=Flake(str(flake.path))) machine2 = Machine(name="machine2", flake=Flake(str(flake.path)))
sops_store_1 = sops.SecretStore(machine1) sops_store_1 = sops.SecretStore(machine1)
@@ -659,10 +676,11 @@ def test_api_set_prompts(
@pytest.mark.with_core @pytest.mark.with_core
def test_stdout_of_generate( def test_stdout_of_generate(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
caplog: pytest.LogCaptureFixture, caplog: pytest.LogCaptureFixture,
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
@@ -675,7 +693,6 @@ def test_stdout_of_generate(
my_secret_generator["script"] = "echo -n hello > $out/my_secret" my_secret_generator["script"] = "echo -n hello > $out/my_secret"
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
from clan_cli.vars.generate import generate_vars_for_machine from clan_cli.vars.generate import generate_vars_for_machine
# with capture_output as output: # with capture_output as output:
@@ -742,10 +759,11 @@ def test_stdout_of_generate(
@pytest.mark.with_core @pytest.mark.with_core
def test_migration( def test_migration(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
caplog: pytest.LogCaptureFixture, caplog: pytest.LogCaptureFixture,
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_service = config["clan"]["core"]["facts"]["services"]["my_service"] my_service = config["clan"]["core"]["facts"]["services"]["my_service"]
@@ -771,7 +789,6 @@ def test_migration(
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"]) cli.run(["facts", "generate", "--flake", str(flake.path), "my_machine"])
with caplog.at_level(logging.INFO): with caplog.at_level(logging.INFO):
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
@@ -798,9 +815,10 @@ def test_migration(
@pytest.mark.with_core @pytest.mark.with_core
def test_fails_when_files_are_left_from_other_backend( def test_fails_when_files_are_left_from_other_backend(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake: ClanFlake, flake_with_sops: ClanFlake,
sops_setup: SopsSetup,
) -> None: ) -> None:
flake = flake_with_sops
config = flake.machines["my_machine"] config = flake.machines["my_machine"]
config["nixpkgs"]["hostPlatform"] = "x86_64-linux" config["nixpkgs"]["hostPlatform"] = "x86_64-linux"
my_secret_generator = config["clan"]["core"]["vars"]["generators"][ my_secret_generator = config["clan"]["core"]["vars"]["generators"][
@@ -815,7 +833,6 @@ def test_fails_when_files_are_left_from_other_backend(
my_value_generator["script"] = "echo hello > $out/my_value" my_value_generator["script"] = "echo hello > $out/my_value"
flake.refresh() flake.refresh()
monkeypatch.chdir(flake.path) monkeypatch.chdir(flake.path)
sops_setup.init()
for generator in ["my_secret_generator", "my_value_generator"]: for generator in ["my_secret_generator", "my_value_generator"]:
generate_vars_for_machine( generate_vars_for_machine(
Machine(name="my_machine", flake=Flake(str(flake.path))), Machine(name="my_machine", flake=Flake(str(flake.path))),