clan-cli: add delete and delete_store to StoreBase

- `delete` lets you delete a specific var under a specific generator;
- `delete_store` deletes an entire store.

The `delete` method could be useful to "garbage-collect" unused vars as
a machine's configuration changes.

The `delete_store` method can be used to delete all the vars for a
machine when the machine is deleted. The current behavior is to leave
everything behind.

Important point:

- `delete_store` needs to be idempotent because public and
  "private"/"secret" vars for a machine can share the same physical
  store (directory), and deleting either type of store (public or
  private) will delete both.
This commit is contained in:
Louis Opter
2025-02-07 11:20:51 +00:00
committed by Mic92
parent 9780463e6a
commit ef5ad09b2d
6 changed files with 126 additions and 36 deletions

View File

@@ -1,5 +1,6 @@
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@@ -122,6 +123,29 @@ class StoreBase(ABC):
) )
return new_file return new_file
@abstractmethod
def delete(self, generator: "Generator", name: str) -> Iterable[Path]:
"""Remove a var from the store.
:return: An iterable of affected paths in the git repository. This
may be empty if the store is outside of the repository.
"""
@abstractmethod
def delete_store(self) -> Iterable[Path]:
"""Delete the store (all vars) for this machine.
.. note::
This does not make the distinction between public and private vars.
Since the public and private store of a machine can be co-located
under the same directory, this method's implementation has to be
idempotent.
:return: An iterable of affected paths in the git repository. This
may be empty if the store was outside of the repository.
"""
def get_validation(self, generator: "Generator") -> str | None: def get_validation(self, generator: "Generator") -> str | None:
""" """
Return the invalidation hash that indicates if a generator needs to be re-run Return the invalidation hash that indicates if a generator needs to be re-run

View File

@@ -1,4 +1,5 @@
import shutil import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -30,15 +31,14 @@ class FactStore(StoreBase):
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}" msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg) raise ClanError(msg)
folder = self.directory(generator, var.name) folder = self.directory(generator, var.name)
file_path = folder / "value"
if folder.exists(): if folder.exists():
if not (folder / "value").exists(): if not file_path.exists():
# another backend has used that folder before -> error out # another backend has used that folder before -> error out
self.backend_collision_error(folder) self.backend_collision_error(folder)
shutil.rmtree(folder) shutil.rmtree(folder)
# re-create directory # re-create directory
file_path = folder / "value"
folder.mkdir(parents=True, exist_ok=True) folder.mkdir(parents=True, exist_ok=True)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch() file_path.touch()
file_path.write_bytes(value) file_path.write_bytes(value)
return file_path return file_path
@@ -50,6 +50,24 @@ class FactStore(StoreBase):
def exists(self, generator: Generator, name: str) -> bool: def exists(self, generator: Generator, name: str) -> bool:
return (self.directory(generator, name) / "value").exists() return (self.directory(generator, name) / "value").exists()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
fact_folder = self.directory(generator, name)
fact_file = fact_folder / "value"
fact_file.unlink()
empty = None
if next(fact_folder.iterdir(), empty) is not empty:
return [fact_file]
fact_folder.rmdir()
return [fact_folder]
def delete_store(self) -> Iterable[Path]:
flake_root = Path(self.machine.flake_dir)
store_folder = flake_root / "vars/per-machine" / self.machine.name
if not store_folder.exists():
return []
shutil.rmtree(store_folder)
return [store_folder]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores" msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg) raise NotImplementedError(msg)

View File

@@ -1,4 +1,6 @@
import logging import logging
import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
@@ -48,6 +50,21 @@ class FactStore(StoreBase):
msg = f"Fact {name} for service {generator.name} not found" msg = f"Fact {name} for service {generator.name} not found"
raise ClanError(msg) raise ClanError(msg)
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
fact_dir = self.dir / generator.name
fact_file = fact_dir / name
fact_file.unlink()
empty = None
if next(fact_dir.iterdir(), empty) is empty:
fact_dir.rmdir()
return [fact_file]
def delete_store(self) -> Iterable[Path]:
if not self.dir.exists():
return []
shutil.rmtree(self.dir)
return [self.dir]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
msg = "populate_dir is not implemented for public vars stores" msg = "populate_dir is not implemented for public vars stores"
raise NotImplementedError(msg) raise NotImplementedError(msg)

View File

@@ -2,11 +2,12 @@ import io
import logging import logging
import os import os
import tarfile import tarfile
from collections.abc import Iterable
from itertools import chain from itertools import chain
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, RunOpts, run from clan_cli.cmd import CmdOut, Log, RunOpts, run
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from clan_cli.ssh.upload import upload from clan_cli.ssh.upload import upload
@@ -35,52 +36,51 @@ class SecretStore(StoreBase):
return backend return backend
@property @property
def _password_store_dir(self) -> str: def _password_store_dir(self) -> Path:
if self._store_backend == "passage": if self._store_backend == "passage":
return os.environ.get("PASSAGE_DIR", f"{os.environ['HOME']}/.passage/store") lookup = os.environ.get("PASSAGE_DIR")
return os.environ.get( default = Path.home() / ".passage/store"
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store" else:
) lookup = os.environ.get("PASSWORD_STORE_DIR")
default = Path.home() / ".password-store"
return Path(lookup) if lookup else default
def entry_dir(self, generator: Generator, name: str) -> Path: def entry_dir(self, generator: Generator, name: str) -> Path:
return Path(self.entry_prefix) / self.rel_dir(generator, name) return Path(self.entry_prefix) / self.rel_dir(generator, name)
def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut:
cmd = nix_shell(packages=["nixpkgs#pass"], cmd=[self._store_backend, *args])
return run(cmd, options)
def _set( def _set(
self, self,
generator: Generator, generator: Generator,
var: Var, var: Var,
value: bytes, value: bytes,
) -> Path | None: ) -> Path | None:
run( pass_call = ["insert", "-m", str(self.entry_dir(generator, var.name))]
nix_shell( self._run_pass(*pass_call, options=RunOpts(input=value, check=True))
[f"nixpkgs#{self._store_backend}"],
[
f"{self._store_backend}",
"insert",
"-m",
str(self.entry_dir(generator, var.name)),
],
),
RunOpts(input=value, check=True),
)
return None # we manage the files outside of the git repo return None # we manage the files outside of the git repo
def get(self, generator: Generator, name: str) -> bytes: def get(self, generator: Generator, name: str) -> bytes:
return run( pass_name = str(self.entry_dir(generator, name))
nix_shell( return self._run_pass("show", pass_name).stdout.encode()
[f"nixpkgs#{self._store_backend}"],
[
f"{self._store_backend}",
"show",
str(self.entry_dir(generator, name)),
],
),
).stdout.encode()
def exists(self, generator: Generator, name: str) -> bool: def exists(self, generator: Generator, name: str) -> bool:
extension = "age" if self._store_backend == "passage" else "gpg" extension = "age" if self._store_backend == "passage" else "gpg"
filename = f"{self.entry_dir(generator, name)}.{extension}" filename = f"{self.entry_dir(generator, name)}.{extension}"
return (Path(self._password_store_dir) / filename).exists() return (self._password_store_dir / filename).exists()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
pass_name = str(self.entry_dir(generator, name))
self._run_pass("rm", "--force", pass_name, options=RunOpts(check=True))
return []
def delete_store(self) -> Iterable[Path]:
machine_pass_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name
pass_call = ["rm", "--force", "--recursive", str(machine_pass_dir)]
self._run_pass(*pass_call, options=RunOpts(check=True))
return []
def generate_hash(self) -> bytes: def generate_hash(self) -> bytes:
hashes = [] hashes = []
@@ -91,7 +91,7 @@ class SecretStore(StoreBase):
[ [
"git", "git",
"-C", "-C",
self._password_store_dir, str(self._password_store_dir),
"log", "log",
"-1", "-1",
"--format=%H", "--format=%H",
@@ -103,9 +103,9 @@ class SecretStore(StoreBase):
.stdout.strip() .stdout.strip()
.encode() .encode()
) )
shared_dir = Path(self._password_store_dir) / self.entry_prefix / "shared" shared_dir = self._password_store_dir / self.entry_prefix / "shared"
machine_dir = ( machine_dir = (
Path(self._password_store_dir) self._password_store_dir
/ self.entry_prefix / self.entry_prefix
/ "per-machine" / "per-machine"
/ self.machine.name / self.machine.name
@@ -119,7 +119,7 @@ class SecretStore(StoreBase):
[ [
"git", "git",
"-C", "-C",
self._password_store_dir, str(self._password_store_dir),
"log", "log",
"-1", "-1",
"--format=%H", "--format=%H",

View File

@@ -1,3 +1,5 @@
import shutil
from collections.abc import Iterable
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -162,6 +164,19 @@ class SecretStore(StoreBase):
self.machine.flake_dir, self.secret_path(generator, name) self.machine.flake_dir, self.secret_path(generator, name)
).encode("utf-8") ).encode("utf-8")
def delete(self, generator: "Generator", name: str) -> Iterable[Path]:
secret_dir = self.directory(generator, name)
shutil.rmtree(secret_dir)
return [secret_dir]
def delete_store(self) -> Iterable[Path]:
flake_root = Path(self.machine.flake_dir)
store_folder = flake_root / "vars/per-machine" / self.machine.name
if not store_folder.exists():
return []
shutil.rmtree(store_folder)
return [store_folder]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if "users" in phases or "services" in phases: if "users" in phases or "services" in phases:
key_name = f"{self.machine.name}-age.key" key_name = f"{self.machine.name}-age.key"

View File

@@ -1,4 +1,5 @@
import shutil import shutil
from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from clan_cli.dirs import vm_state_dir from clan_cli.dirs import vm_state_dir
@@ -39,6 +40,21 @@ class SecretStore(StoreBase):
secret_file = self.dir / generator.name / name secret_file = self.dir / generator.name / name
return secret_file.read_bytes() return secret_file.read_bytes()
def delete(self, generator: Generator, name: str) -> Iterable[Path]:
secret_dir = self.dir / generator.name
secret_file = secret_dir / name
secret_file.unlink()
empty = None
if next(secret_dir.iterdir(), empty) is empty:
secret_dir.rmdir()
return [secret_file]
def delete_store(self) -> Iterable[Path]:
if not self.dir.exists():
return []
shutil.rmtree(self.dir)
return [self.dir]
def populate_dir(self, output_dir: Path, phases: list[str]) -> None: def populate_dir(self, output_dir: Path, phases: list[str]) -> None:
if output_dir.exists(): if output_dir.exists():
shutil.rmtree(output_dir) shutil.rmtree(output_dir)