vars/password-store: replace passBackend option with passPackage

The `clan.core.vars.settings.passBackend` option has been replaced with
`clan.vars.password-store.passPackage` to provide better type safety and
clearer configuration.

Changes:
- Remove problematic mkRemovedOptionModule that caused circular dependency
- Add proper option definition with assertion-based migration
- Users setting the old option get clear migration instructions
- Normal evaluation continues to work for users not using the old option

Migration: Replace `clan.core.vars.settings.passBackend = "passage"`
with `clan.vars.password-store.passPackage = pkgs.passage`
This commit is contained in:
lassulus
2025-06-29 10:51:18 +02:00
parent d3cd481600
commit 2f0f9a9fba
8 changed files with 129 additions and 98 deletions

View File

@@ -40,6 +40,18 @@ in
}; };
config = { config = {
# Check for removed passBackend option usage
assertions = [
{
assertion = config.clan.core.vars.settings.passBackend == null;
message = ''
The option `clan.core.vars.settings.passBackend' has been removed.
Use clan.vars.password-store.passPackage instead.
Set it to pkgs.pass for GPG or pkgs.passage for age encryption.
'';
}
];
# check all that all non-secret files have no owner/group/mode set # check all that all non-secret files have no owner/group/mode set
warnings = lib.foldl' ( warnings = lib.foldl' (
warnings: generator: warnings: generator:

View File

@@ -62,6 +62,13 @@ in
location where the tarball with the password-store secrets will be uploaded to and the manifest location where the tarball with the password-store secrets will be uploaded to and the manifest
''; '';
}; };
passPackage = lib.mkOption {
type = lib.types.package;
default = pkgs.pass;
description = ''
Password store package to use. Can be pkgs.pass for GPG-based storage or pkgs.passage for age-based storage.
'';
};
}; };
config = { config = {
clan.core.vars.settings = clan.core.vars.settings =
@@ -76,7 +83,7 @@ in
else if file.config.neededFor == "services" then else if file.config.neededFor == "services" then
"/run/secrets/${file.config.generatorName}/${file.config.name}" "/run/secrets/${file.config.generatorName}/${file.config.name}"
else if file.config.neededFor == "activation" then else if file.config.neededFor == "activation" then
"${config.clan.password-store.secretLocation}/activation/${file.config.generatorName}/${file.config.name}" "${config.clan.vars.password-store.secretLocation}/activation/${file.config.generatorName}/${file.config.name}"
else if file.config.neededFor == "partitioning" then else if file.config.neededFor == "partitioning" then
"/run/partitioning-secrets/${file.config.generatorName}/${file.config.name}" "/run/partitioning-secrets/${file.config.generatorName}/${file.config.name}"
else else

View File

@@ -15,17 +15,6 @@
''; '';
}; };
passBackend = lib.mkOption {
type = lib.types.enum [
"passage"
"pass"
];
default = "pass";
description = ''
password-store backend to use. Valid options are `pass` and `passage`
'';
};
secretModule = lib.mkOption { secretModule = lib.mkOption {
type = lib.types.str; type = lib.types.str;
internal = true; internal = true;
@@ -65,4 +54,15 @@
the python import path to the public module the python import path to the public module
''; '';
}; };
# Legacy option that guides migration
passBackend = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
visible = false;
description = ''
DEPRECATED: This option has been removed. Use clan.vars.password-store.passPackage instead.
Set it to pkgs.pass for GPG or pkgs.passage for age encryption.
'';
};
} }

View File

@@ -124,8 +124,6 @@ def update_command(args: argparse.Namespace) -> None:
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}", f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.{{share,dependencies,migrateFact,prompts}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}", f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.generators.*.files.*.{{secret,deploy,owner,group,mode,neededFor}}",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory", f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.facts.secretUploadDirectory",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.vars.password-store.secretLocation",
f"clanInternals.machines.{system}.{{{','.join(machine_names)}}}.config.clan.core.vars.settings.passBackend",
] ]
) )

View File

@@ -403,6 +403,19 @@ def test_generate_secret_var_password_store(
shutil.copytree(test_root / "data" / "password-store", password_store_dir) shutil.copytree(test_root / "data" / "password-store", password_store_dir)
monkeypatch.setenv("PASSWORD_STORE_DIR", str(password_store_dir)) monkeypatch.setenv("PASSWORD_STORE_DIR", str(password_store_dir))
# Initialize password store as a git repository
import subprocess
subprocess.run(["git", "init"], cwd=password_store_dir, check=True)
subprocess.run(
["git", "config", "user.email", "test@example.com"],
cwd=password_store_dir,
check=True,
)
subprocess.run(
["git", "config", "user.name", "Test User"], cwd=password_store_dir, check=True
)
machine = Machine(name="my_machine", flake=Flake(str(flake.path))) machine = Machine(name="my_machine", flake=Flake(str(flake.path)))
assert not check_vars(machine.name, machine.flake) assert not check_vars(machine.name, machine.flake)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])

View File

@@ -82,11 +82,6 @@ class Generator:
files = [] files = []
gen_files = files_data.get(gen_name, {}) gen_files = files_data.get(gen_name, {})
for file_name, file_data in gen_files.items(): for file_name, file_data in gen_files.items():
# Handle mode conversion properly
mode = file_data["mode"]
if isinstance(mode, str):
mode = int(mode, 8)
var = Var( var = Var(
id=f"{gen_name}/{file_name}", id=f"{gen_name}/{file_name}",
name=file_name, name=file_name,
@@ -94,7 +89,9 @@ class Generator:
deploy=file_data["deploy"], deploy=file_data["deploy"],
owner=file_data["owner"], owner=file_data["owner"],
group=file_data["group"], group=file_data["group"],
mode=mode, mode=file_data["mode"]
if isinstance(file_data["mode"], int)
else int(file_data["mode"], 8),
needed_for=file_data["neededFor"], needed_for=file_data["neededFor"],
) )
files.append(var) files.append(var)

View File

@@ -1,9 +1,7 @@
import io import io
import logging import logging
import os
import tarfile import tarfile
from collections.abc import Iterable from collections.abc import Iterable
from itertools import chain
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -12,7 +10,6 @@ from clan_cli.vars._types import StoreBase
from clan_cli.vars.generate import Generator, Var from clan_cli.vars.generate import Generator, Var
from clan_lib.cmd import CmdOut, Log, RunOpts, run from clan_lib.cmd import CmdOut, Log, RunOpts, run
from clan_lib.machines.machines import Machine from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_lib.ssh.remote import Remote from clan_lib.ssh.remote import Remote
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -26,31 +23,64 @@ class SecretStore(StoreBase):
def __init__(self, machine: Machine) -> None: def __init__(self, machine: Machine) -> None:
self.machine = machine self.machine = machine
self.entry_prefix = "clan-vars" self.entry_prefix = "clan-vars"
self._store_dir: Path | None = None
@property @property
def store_name(self) -> str: def store_name(self) -> str:
return "password_store" return "password_store"
@property @property
def _store_backend(self) -> str: def store_dir(self) -> Path:
backend = self.machine.select("config.clan.core.vars.settings.passBackend") """Get the password store directory, cached after first access."""
return backend if self._store_dir is None:
result = self._run_pass(
"git", "rev-parse", "--show-toplevel", options=RunOpts(check=False)
)
if result.returncode != 0:
msg = "Password store must be a git repository"
raise ValueError(msg)
self._store_dir = Path(result.stdout.strip())
return self._store_dir
@property @property
def _password_store_dir(self) -> Path: def _pass_command(self) -> str:
if self._store_backend == "passage": out_path = self.machine.select(
lookup = os.environ.get("PASSAGE_DIR") "config.clan.vars.password-store.passPackage.outPath"
default = Path.home() / ".passage/store" )
else: main_program = (
lookup = os.environ.get("PASSWORD_STORE_DIR") self.machine.select(
default = Path.home() / ".password-store" "config.clan.vars.password-store.passPackage.?meta.?mainProgram"
return Path(lookup) if lookup else default )
.get("meta", {})
.get("mainProgram")
)
if main_program:
binary_path = Path(out_path) / "bin" / main_program
if binary_path.exists():
return str(binary_path)
# Look for common password store binaries
bin_dir = Path(out_path) / "bin"
if bin_dir.exists():
for binary in ["pass", "passage"]:
binary_path = bin_dir / binary
if binary_path.exists():
return str(binary_path)
# If only one binary exists, use it
binaries = [f for f in bin_dir.iterdir() if f.is_file()]
if len(binaries) == 1:
return str(binaries[0])
msg = "Could not find password store binary in package"
raise ValueError(msg)
def entry_dir(self, generator: Generator, name: str) -> Path: def entry_dir(self, generator: Generator, name: str) -> Path:
return Path(self.entry_prefix) / self.rel_dir(generator, name) return Path(self.entry_prefix) / self.rel_dir(generator, name)
def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut: def _run_pass(self, *args: str, options: RunOpts | None = None) -> CmdOut:
cmd = nix_shell(packages=["pass"], cmd=[self._store_backend, *args]) cmd = [self._pass_command, *args]
return run(cmd, options) return run(cmd, options)
def _set( def _set(
@@ -68,9 +98,11 @@ class SecretStore(StoreBase):
return self._run_pass("show", pass_name).stdout.encode() return self._run_pass("show", pass_name).stdout.encode()
def exists(self, generator: Generator, name: str) -> bool: def exists(self, generator: Generator, name: str) -> bool:
extension = "age" if self._store_backend == "passage" else "gpg" pass_name = str(self.entry_dir(generator, name))
filename = f"{self.entry_dir(generator, name)}.{extension}" # Check if the file exists with either .age or .gpg extension
return (self._password_store_dir / filename).exists() age_file = self.store_dir / f"{pass_name}.age"
gpg_file = self.store_dir / f"{pass_name}.gpg"
return age_file.exists() or gpg_file.exists()
def delete(self, generator: Generator, name: str) -> Iterable[Path]: def delete(self, generator: Generator, name: str) -> Iterable[Path]:
pass_name = str(self.entry_dir(generator, name)) pass_name = str(self.entry_dir(generator, name))
@@ -79,66 +111,31 @@ class SecretStore(StoreBase):
def delete_store(self) -> Iterable[Path]: def delete_store(self) -> Iterable[Path]:
machine_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name machine_dir = Path(self.entry_prefix) / "per-machine" / self.machine.name
if not (self._password_store_dir / machine_dir).exists(): # Check if the directory exists in the password store before trying to delete
# The directory may not exist if the machine result = self._run_pass("ls", str(machine_dir), options=RunOpts(check=False))
# has no vars, or they have been deleted already. if result.returncode == 0:
return [] self._run_pass(
pass_call = ["rm", "--force", "--recursive", str(machine_dir)] "rm",
self._run_pass(*pass_call, options=RunOpts(check=True)) "--force",
"--recursive",
str(machine_dir),
options=RunOpts(check=True),
)
return [] return []
def generate_hash(self) -> bytes: def generate_hash(self) -> bytes:
hashes = [] result = self._run_pass(
hashes.append(
run(
nix_shell(
["git"],
[
"git", "git",
"-C",
str(self._password_store_dir),
"log", "log",
"-1", "-1",
"--format=%H", "--format=%H",
self.entry_prefix, self.entry_prefix,
], options=RunOpts(check=False),
),
RunOpts(check=False),
)
.stdout.strip()
.encode()
)
shared_dir = self._password_store_dir / self.entry_prefix / "shared"
machine_dir = (
self._password_store_dir
/ self.entry_prefix
/ "per-machine"
/ self.machine.name
)
for symlink in chain(shared_dir.glob("**/*"), machine_dir.glob("**/*")):
if symlink.is_symlink():
hashes.append(
run(
nix_shell(
["git"],
[
"git",
"-C",
str(self._password_store_dir),
"log",
"-1",
"--format=%H",
str(symlink),
],
),
RunOpts(check=False),
)
.stdout.strip()
.encode()
) )
git_hash = result.stdout.strip().encode()
# we sort the hashes to make sure that the order is always the same if not git_hash:
hashes.sort() return b""
from clan_cli.vars.generate import Generator from clan_cli.vars.generate import Generator
@@ -149,22 +146,24 @@ class SecretStore(StoreBase):
for generator in generators: for generator in generators:
for file in generator.files: for file in generator.files:
manifest.append(f"{generator.name}/{file.name}".encode()) manifest.append(f"{generator.name}/{file.name}".encode())
manifest += hashes
manifest.append(git_hash)
return b"\n".join(manifest) return b"\n".join(manifest)
def needs_upload(self, host: Remote) -> bool: def needs_upload(self, host: Remote) -> bool:
local_hash = self.generate_hash() local_hash = self.generate_hash()
if not local_hash:
return True
remote_hash = host.run( remote_hash = host.run(
# TODO get the path to the secrets from the machine
[ [
"cat", "cat",
f"{self.machine.select('config.clan.vars.password-store.secretLocation')}/.{self._store_backend}_info", f"{self.machine.select('config.clan.vars.password-store.secretLocation')}/.pass_info",
], ],
RunOpts(log=Log.STDERR, check=False), RunOpts(log=Log.STDERR, check=False),
).stdout.strip() ).stdout.strip()
if not remote_hash: if not remote_hash:
print("remote hash is empty")
return True return True
return local_hash.decode() != remote_hash return local_hash.decode() != remote_hash
@@ -233,7 +232,9 @@ class SecretStore(StoreBase):
out_file.parent.mkdir(parents=True, exist_ok=True) out_file.parent.mkdir(parents=True, exist_ok=True)
out_file.write_bytes(self.get(generator, file.name)) out_file.write_bytes(self.get(generator, file.name))
(output_dir / f".{self._store_backend}_info").write_bytes(self.generate_hash()) hash_data = self.generate_hash()
if hash_data:
(output_dir / ".pass_info").write_bytes(hash_data)
def upload(self, host: Remote, phases: list[str]) -> None: def upload(self, host: Remote, phases: list[str]) -> None:
if "partitioning" in phases: if "partitioning" in phases:

View File

@@ -2,6 +2,7 @@
# callPackage args # callPackage args
gnupg, gnupg,
installShellFiles, installShellFiles,
pass,
jq, jq,
lib, lib,
nix, nix,
@@ -58,6 +59,7 @@ let
testDependencies = testRuntimeDependencies ++ [ testDependencies = testRuntimeDependencies ++ [
gnupg gnupg
pass
stdenv.cc # Compiler used for certain native extensions stdenv.cc # Compiler used for certain native extensions
(pythonRuntime.withPackages pyTestDeps) (pythonRuntime.withPackages pyTestDeps)
]; ];
@@ -213,6 +215,7 @@ pythonRuntime.pkgs.buildPythonApplication {
pkgs.shellcheck-minimal pkgs.shellcheck-minimal
pkgs.mkpasswd pkgs.mkpasswd
pkgs.xkcdpass pkgs.xkcdpass
pkgs.pass
nix-select nix-select
]; ];
}; };