vars: add support for password-store

This commit is contained in:
DavHau
2024-07-23 14:54:18 +07:00
parent fd8834103d
commit 5105ff14e0
4 changed files with 130 additions and 12 deletions

View File

@@ -17,7 +17,7 @@ in
imports = [ imports = [
./public/in_repo.nix ./public/in_repo.nix
# ./public/vm.nix # ./public/vm.nix
# ./secret/password-store.nix ./secret/password-store.nix
./secret/sops.nix ./secret/sops.nix
# ./secret/vm.nix # ./secret/vm.nix
]; ];

View File

@@ -0,0 +1,12 @@
{ config, lib, ... }:
{
config.clan.core.vars.settings =
lib.mkIf (config.clan.core.vars.settings.secretStore == "password-store")
{
fileModule = file: {
path = lib.mkIf file.secret "${config.clan.core.password-store.targetDirectory}/${config.clan.core.machineName}-${file.config.generatorName}-${file.config.name}";
};
secretUploadDirectory = lib.mkDefault "/etc/secrets";
secretModule = "clan_cli.vars.secret_modules.password_store";
};
}

View File

@@ -13,33 +13,45 @@ class SecretStore(SecretStoreBase):
self.machine = machine self.machine = machine
def set( def set(
self, service: str, name: str, value: bytes, groups: list[str] self, generator_name: str, name: str, value: bytes, groups: list[str]
) -> Path | None: ) -> Path | None:
subprocess.run( subprocess.run(
nix_shell( nix_shell(
["nixpkgs#pass"], ["nixpkgs#pass"],
["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"], [
"pass",
"insert",
"-m",
f"machines/{self.machine.name}/{generator_name}/{name}",
],
), ),
input=value, input=value,
check=True, check=True,
) )
return None # we manage the files outside of the git repo return None # we manage the files outside of the git repo
def get(self, service: str, name: str) -> bytes: def get(self, generator_name: str, name: str) -> bytes:
return subprocess.run( return subprocess.run(
nix_shell( nix_shell(
["nixpkgs#pass"], ["nixpkgs#pass"],
["pass", "show", f"machines/{self.machine.name}/{name}"], [
"pass",
"show",
f"machines/{self.machine.name}/{generator_name}/{name}",
],
), ),
check=True, check=True,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
).stdout ).stdout
def exists(self, service: str, name: str) -> bool: def exists(self, generator_name: str, name: str) -> bool:
password_store = os.environ.get( password_store = os.environ.get(
"PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store" "PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store"
) )
secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg" secret_path = (
Path(password_store)
/ f"machines/{self.machine.name}/{generator_name}/{name}.gpg"
)
return secret_path.exists() return secret_path.exists()
def generate_hash(self) -> bytes: def generate_hash(self) -> bytes:

View File

@@ -1,4 +1,5 @@
import os import os
import subprocess
from collections import defaultdict from collections import defaultdict
from collections.abc import Callable from collections.abc import Callable
from io import StringIO from io import StringIO
@@ -14,7 +15,8 @@ from root import CLAN_CORE
from clan_cli.clan_uri import FlakeId from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.vars.secret_modules.sops import SecretStore from clan_cli.nix import nix_shell
from clan_cli.vars.secret_modules import password_store, sops
def def_value() -> defaultdict: def def_value() -> defaultdict:
@@ -95,7 +97,45 @@ def test_generate_public_var(
@pytest.mark.impure @pytest.mark.impure
def test_generate_secret_var_with_default_group( def test_generate_secret_var_sops(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
user = os.environ.get("USER", "user")
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = "echo hello > $out/my_secret"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
user,
sops_setup.keys[0].pubkey,
]
)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
)
assert not var_file_path.is_file()
sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret")
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generate_secret_var_sops_with_default_group(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
temporary_home: Path, temporary_home: Path,
sops_setup: SopsSetup, sops_setup: SopsSetup,
@@ -128,7 +168,7 @@ def test_generate_secret_var_with_default_group(
assert not ( assert not (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret" flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
).is_file() ).is_file()
sops_store = SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path))) sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path)))
assert sops_store.exists("my_generator", "my_secret") assert sops_store.exists("my_generator", "my_secret")
assert ( assert (
flake.path flake.path
@@ -141,6 +181,60 @@ def test_generate_secret_var_with_default_group(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n" assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generate_secret_var_password_store(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
) -> None:
config = nested_dict()
my_generator = config["clan"]["core"]["vars"]["settings"]["secretStore"] = (
"password-store"
)
my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"]
my_generator["files"]["my_secret"]["secret"] = True
my_generator["script"] = "echo hello > $out/my_secret"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs=dict(my_machine=config),
)
monkeypatch.chdir(flake.path)
gnupghome = temporary_home / "gpg"
gnupghome.mkdir(mode=0o700)
monkeypatch.setenv("GNUPGHOME", str(gnupghome))
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_home / "pass"))
gpg_key_spec = temporary_home / "gpg_key_spec"
gpg_key_spec.write_text(
"""
Key-Type: 1
Key-Length: 1024
Name-Real: Root Superuser
Name-Email: test@local
Expire-Date: 0
%no-protection
"""
)
subprocess.run(
nix_shell(
["nixpkgs#gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)]
),
check=True,
)
subprocess.run(
nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True
)
cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"])
var_file_path = (
flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret"
)
assert not var_file_path.is_file()
store = password_store.SecretStore(
Machine(name="my_machine", flake=FlakeId(flake.path))
)
assert store.exists("my_generator", "my_secret")
assert store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure @pytest.mark.impure
def test_generate_secret_for_multiple_machines( def test_generate_secret_for_multiple_machines(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
@@ -196,8 +290,8 @@ def test_generate_secret_for_multiple_machines(
assert machine2_var_file_path.is_file() assert machine2_var_file_path.is_file()
assert machine2_var_file_path.read_text() == "machine2\n" assert machine2_var_file_path.read_text() == "machine2\n"
# check if secret vars have been created correctly # check if secret vars have been created correctly
sops_store1 = SecretStore(Machine(name="machine1", flake=FlakeId(flake.path))) sops_store1 = sops.SecretStore(Machine(name="machine1", flake=FlakeId(flake.path)))
sops_store2 = SecretStore(Machine(name="machine2", flake=FlakeId(flake.path))) sops_store2 = sops.SecretStore(Machine(name="machine2", flake=FlakeId(flake.path)))
assert sops_store1.exists("my_generator", "my_secret") assert sops_store1.exists("my_generator", "my_secret")
assert sops_store2.exists("my_generator", "my_secret") assert sops_store2.exists("my_generator", "my_secret")
assert sops_store1.get("my_generator", "my_secret").decode() == "machine1\n" assert sops_store1.get("my_generator", "my_secret").decode() == "machine1\n"