From 5bd20fcf2c8b3ecb78243ca9db881705464ec651 Mon Sep 17 00:00:00 2001 From: DavHau Date: Tue, 23 Jul 2024 14:54:18 +0700 Subject: [PATCH] vars: add support for password-store --- nixosModules/clanCore/vars/default.nix | 2 +- .../clanCore/vars/secret/password-store.nix | 12 ++ .../vars/secret_modules/password_store.py | 24 +++- pkgs/clan-cli/tests/test_vars.py | 104 +++++++++++++++++- 4 files changed, 130 insertions(+), 12 deletions(-) create mode 100644 nixosModules/clanCore/vars/secret/password-store.nix diff --git a/nixosModules/clanCore/vars/default.nix b/nixosModules/clanCore/vars/default.nix index 0afedaa12..db5d55c9b 100644 --- a/nixosModules/clanCore/vars/default.nix +++ b/nixosModules/clanCore/vars/default.nix @@ -17,7 +17,7 @@ in imports = [ ./public/in_repo.nix # ./public/vm.nix - # ./secret/password-store.nix + ./secret/password-store.nix ./secret/sops.nix # ./secret/vm.nix ]; diff --git a/nixosModules/clanCore/vars/secret/password-store.nix b/nixosModules/clanCore/vars/secret/password-store.nix new file mode 100644 index 000000000..900737c76 --- /dev/null +++ b/nixosModules/clanCore/vars/secret/password-store.nix @@ -0,0 +1,12 @@ +{ config, lib, ... }: +{ + config.clan.core.vars.settings = + lib.mkIf (config.clan.core.vars.settings.secretStore == "password-store") + { + fileModule = file: { + path = lib.mkIf file.secret "${config.clan.core.password-store.targetDirectory}/${config.clan.core.machineName}-${file.config.generatorName}-${file.config.name}"; + }; + secretUploadDirectory = lib.mkDefault "/etc/secrets"; + secretModule = "clan_cli.vars.secret_modules.password_store"; + }; +} diff --git a/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py b/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py index c804456f3..2bb36f87d 100644 --- a/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py +++ b/pkgs/clan-cli/clan_cli/vars/secret_modules/password_store.py @@ -13,33 +13,45 @@ class SecretStore(SecretStoreBase): self.machine = machine def set( - self, service: str, name: str, value: bytes, groups: list[str] + self, generator_name: str, name: str, value: bytes, groups: list[str] ) -> Path | None: subprocess.run( nix_shell( ["nixpkgs#pass"], - ["pass", "insert", "-m", f"machines/{self.machine.name}/{name}"], + [ + "pass", + "insert", + "-m", + f"machines/{self.machine.name}/{generator_name}/{name}", + ], ), input=value, check=True, ) return None # we manage the files outside of the git repo - def get(self, service: str, name: str) -> bytes: + def get(self, generator_name: str, name: str) -> bytes: return subprocess.run( nix_shell( ["nixpkgs#pass"], - ["pass", "show", f"machines/{self.machine.name}/{name}"], + [ + "pass", + "show", + f"machines/{self.machine.name}/{generator_name}/{name}", + ], ), check=True, stdout=subprocess.PIPE, ).stdout - def exists(self, service: str, name: str) -> bool: + def exists(self, generator_name: str, name: str) -> bool: password_store = os.environ.get( "PASSWORD_STORE_DIR", f"{os.environ['HOME']}/.password-store" ) - secret_path = Path(password_store) / f"machines/{self.machine.name}/{name}.gpg" + secret_path = ( + Path(password_store) + / f"machines/{self.machine.name}/{generator_name}/{name}.gpg" + ) return secret_path.exists() def generate_hash(self) -> bytes: diff --git a/pkgs/clan-cli/tests/test_vars.py b/pkgs/clan-cli/tests/test_vars.py index 66e82ab7f..bcd82e263 100644 --- a/pkgs/clan-cli/tests/test_vars.py +++ b/pkgs/clan-cli/tests/test_vars.py @@ -1,4 +1,5 @@ import os +import subprocess from collections import defaultdict from collections.abc import Callable from io import StringIO @@ -14,7 +15,8 @@ from root import CLAN_CORE from clan_cli.clan_uri import FlakeId from clan_cli.machines.machines import Machine -from clan_cli.vars.secret_modules.sops import SecretStore +from clan_cli.nix import nix_shell +from clan_cli.vars.secret_modules import password_store, sops def def_value() -> defaultdict: @@ -95,7 +97,45 @@ def test_generate_public_var( @pytest.mark.impure -def test_generate_secret_var_with_default_group( +def test_generate_secret_var_sops( + monkeypatch: pytest.MonkeyPatch, + temporary_home: Path, + sops_setup: SopsSetup, +) -> None: + user = os.environ.get("USER", "user") + config = nested_dict() + my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] + my_generator["files"]["my_secret"]["secret"] = True + my_generator["script"] = "echo hello > $out/my_secret" + flake = generate_flake( + temporary_home, + flake_template=CLAN_CORE / "templates" / "minimal", + machine_configs=dict(my_machine=config), + ) + monkeypatch.chdir(flake.path) + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(flake.path), + user, + sops_setup.keys[0].pubkey, + ] + ) + cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) + var_file_path = ( + flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret" + ) + assert not var_file_path.is_file() + sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path))) + assert sops_store.exists("my_generator", "my_secret") + assert sops_store.get("my_generator", "my_secret").decode() == "hello\n" + + +@pytest.mark.impure +def test_generate_secret_var_sops_with_default_group( monkeypatch: pytest.MonkeyPatch, temporary_home: Path, sops_setup: SopsSetup, @@ -128,7 +168,7 @@ def test_generate_secret_var_with_default_group( assert not ( flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret" ).is_file() - sops_store = SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path))) + sops_store = sops.SecretStore(Machine(name="my_machine", flake=FlakeId(flake.path))) assert sops_store.exists("my_generator", "my_secret") assert ( flake.path @@ -141,6 +181,60 @@ def test_generate_secret_var_with_default_group( assert sops_store.get("my_generator", "my_secret").decode() == "hello\n" +@pytest.mark.impure +def test_generate_secret_var_password_store( + monkeypatch: pytest.MonkeyPatch, + temporary_home: Path, +) -> None: + config = nested_dict() + my_generator = config["clan"]["core"]["vars"]["settings"]["secretStore"] = ( + "password-store" + ) + my_generator = config["clan"]["core"]["vars"]["generators"]["my_generator"] + my_generator["files"]["my_secret"]["secret"] = True + my_generator["script"] = "echo hello > $out/my_secret" + flake = generate_flake( + temporary_home, + flake_template=CLAN_CORE / "templates" / "minimal", + machine_configs=dict(my_machine=config), + ) + monkeypatch.chdir(flake.path) + gnupghome = temporary_home / "gpg" + gnupghome.mkdir(mode=0o700) + monkeypatch.setenv("GNUPGHOME", str(gnupghome)) + monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_home / "pass")) + gpg_key_spec = temporary_home / "gpg_key_spec" + gpg_key_spec.write_text( + """ + Key-Type: 1 + Key-Length: 1024 + Name-Real: Root Superuser + Name-Email: test@local + Expire-Date: 0 + %no-protection + """ + ) + subprocess.run( + nix_shell( + ["nixpkgs#gnupg"], ["gpg", "--batch", "--gen-key", str(gpg_key_spec)] + ), + check=True, + ) + subprocess.run( + nix_shell(["nixpkgs#pass"], ["pass", "init", "test@local"]), check=True + ) + cli.run(["vars", "generate", "--flake", str(flake.path), "my_machine"]) + var_file_path = ( + flake.path / "machines" / "my_machine" / "vars" / "my_generator" / "my_secret" + ) + assert not var_file_path.is_file() + store = password_store.SecretStore( + Machine(name="my_machine", flake=FlakeId(flake.path)) + ) + assert store.exists("my_generator", "my_secret") + assert store.get("my_generator", "my_secret").decode() == "hello\n" + + @pytest.mark.impure def test_generate_secret_for_multiple_machines( monkeypatch: pytest.MonkeyPatch, @@ -196,8 +290,8 @@ def test_generate_secret_for_multiple_machines( assert machine2_var_file_path.is_file() assert machine2_var_file_path.read_text() == "machine2\n" # check if secret vars have been created correctly - sops_store1 = SecretStore(Machine(name="machine1", flake=FlakeId(flake.path))) - sops_store2 = SecretStore(Machine(name="machine2", flake=FlakeId(flake.path))) + sops_store1 = sops.SecretStore(Machine(name="machine1", flake=FlakeId(flake.path))) + sops_store2 = sops.SecretStore(Machine(name="machine2", flake=FlakeId(flake.path))) assert sops_store1.exists("my_generator", "my_secret") assert sops_store2.exists("my_generator", "my_secret") assert sops_store1.get("my_generator", "my_secret").decode() == "machine1\n"