Merge pull request 'vars/sops: improve shared secrets, switching backend' (#2151) from DavHau/clan-core:DavHau-vars-migration into main

This commit is contained in:
clan-bot
2024-09-20 13:18:01 +00:00
6 changed files with 181 additions and 37 deletions

View File

@@ -1,9 +1,9 @@
import logging
import shutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from clan_cli.errors import ClanError
from clan_cli.machines import machines
log = logging.getLogger(__name__)
@@ -113,6 +113,13 @@ class StoreBase(ABC):
def is_secret_store(self) -> bool:
pass
def backend_collision_error(self, folder: Path) -> None:
msg = (
f"Var folder {folder} exists but doesn't look like a {self.store_name} secret."
"Potentially a leftover from another backend. Please delete it manually."
)
raise ClanError(msg)
def rel_dir(self, generator_name: str, var_name: str, shared: bool = False) -> Path:
if shared:
return Path(f"shared/{generator_name}/{var_name}")
@@ -145,12 +152,6 @@ class StoreBase(ABC):
else:
old_val = None
old_val_str = "<not set>"
directory = self.directory(generator_name, var_name, shared)
# delete directory
if directory.exists():
shutil.rmtree(directory)
# re-create directory
directory.mkdir(parents=True, exist_ok=True)
new_file = self._set(generator_name, var_name, value, shared, deployed)
if self.is_secret_store:
print(f"Updated secret var {generator_name}/{var_name}\n")

View File

@@ -1,3 +1,4 @@
import shutil
from pathlib import Path
from clan_cli.errors import ClanError
@@ -23,14 +24,22 @@ class FactStore(FactStoreBase):
shared: bool = False,
deployed: bool = True,
) -> Path | None:
if self.machine.flake.is_local():
file_path = self.directory(generator_name, name, shared) / "value"
if not self.machine.flake.is_local():
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
folder = self.directory(generator_name, name, shared)
if folder.exists():
if not (folder / "value").exists():
# another backend has used that folder before -> error out
self.backend_collision_error(folder)
shutil.rmtree(folder)
# re-create directory
file_path = folder / "value"
folder.mkdir(parents=True, exist_ok=True)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch()
file_path.write_bytes(value)
return file_path
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
# get a single fact
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:

View File

@@ -1,7 +1,8 @@
import json
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.folders import sops_machines_folder, sops_secrets_folder
from clan_cli.secrets.machines import add_machine, add_secret, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
@@ -40,6 +41,18 @@ class SecretStore(SecretStoreBase):
def store_name(self) -> str:
return "sops"
def machine_has_access(
self, generator_name: str, secret_name: str, shared: bool
) -> bool:
secret_path = self.secret_path(generator_name, secret_name, shared)
secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in secret["sops"]["age"]]
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
machine_pubkey = json.loads(
(machines_folder_path / self.machine.name / "key.json").read_text()
)["publickey"]
return machine_pubkey in recipients
def secret_path(
self, generator_name: str, secret_name: str, shared: bool = False
) -> Path:
@@ -53,16 +66,28 @@ class SecretStore(SecretStoreBase):
shared: bool = False,
deployed: bool = True,
) -> Path | None:
path = self.secret_path(generator_name, name, shared)
secret_folder = self.secret_path(generator_name, name, shared)
# delete directory
if secret_folder.exists() and not (secret_folder / "secret").exists():
# another backend has used that folder before -> error out
self.backend_collision_error(secret_folder)
# create directory if it doesn't exist
secret_folder.mkdir(parents=True, exist_ok=True)
if shared and self.exists_shared(generator_name, name):
# secret exists, but this machine doesn't have access -> add machine
# add_secret will be a no-op if the machine is already added
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
else:
# initialize the secret
encrypt_secret(
self.machine.flake_dir,
path,
secret_folder,
value,
add_machines=[self.machine.name],
add_machines=[self.machine.name] if deployed else [],
add_groups=self.machine.deployment["sops"]["defaultGroups"],
git_commit=False,
)
return path
return secret_folder
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
return decrypt_secret(
@@ -80,10 +105,14 @@ class SecretStore(SecretStoreBase):
)
(output_dir / "key.txt").write_text(key)
def exists_shared(self, generator_name: str, name: str) -> bool:
secret_folder = self.secret_path(generator_name, name, shared=True)
return (secret_folder / "secret").exists()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
secret_folder = self.secret_path(generator_name, name, shared)
if not (secret_folder / "secret").exists():
return False
# add_secret will be a no-op if the machine is already added
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
if not shared:
return True
return self.machine_has_access(generator_name, name, shared)

View File

@@ -54,6 +54,16 @@ class FlakeForTest(NamedTuple):
from age_keys import KEYS, KeyPair
def set_machine_settings(
flake: Path,
machine_name: str,
machine_settings: dict,
) -> None:
settings_path = flake / "machines" / machine_name / "settings.json"
settings_path.parent.mkdir(parents=True, exist_ok=True)
settings_path.write_text(json.dumps(machine_settings, indent=2))
def generate_flake(
temporary_home: Path,
flake_template: Path,
@@ -117,9 +127,7 @@ def generate_flake(
# generate machines from machineConfigs
for machine_name, machine_config in machine_configs.items():
settings_path = flake / "machines" / machine_name / "settings.json"
settings_path.parent.mkdir(parents=True, exist_ok=True)
settings_path.write_text(json.dumps(machine_config, indent=2))
set_machine_settings(flake, machine_name, machine_config)
if "/tmp" not in str(os.environ.get("HOME")):
log.warning(

View File

@@ -8,14 +8,16 @@ from tempfile import TemporaryDirectory
import pytest
from age_keys import SopsSetup
from clan_cli.clan_uri import FlakeId
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_eval, nix_shell, run
from clan_cli.vars.check import check_vars
from clan_cli.vars.generate import generate_vars_for_machine
from clan_cli.vars.list import stringify_all_vars
from clan_cli.vars.public_modules import in_repo
from clan_cli.vars.secret_modules import password_store, sops
from clan_cli.vars.set import set_var
from fixtures_flakes import generate_flake
from fixtures_flakes import generate_flake, set_machine_settings
from helpers import cli
from helpers.nixos_config import nested_dict
from root import CLAN_CORE
@@ -152,6 +154,10 @@ def test_generate_secret_var_sops(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
vars_text = stringify_all_vars(machine)
assert "my_generator/my_secret" in vars_text
# test regeneration works
cli.run(
["vars", "generate", "--flake", str(flake.path), "my_machine", "--regenerate"]
)
@pytest.mark.impure
@@ -186,6 +192,50 @@ def test_generate_secret_var_sops_with_default_group(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generated_shared_secret_sops(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
m1_config = nested_dict()
shared_generator = m1_config["clan"]["core"]["vars"]["generators"][
"my_shared_generator"
]
shared_generator["share"] = True
shared_generator["files"]["my_shared_secret"]["secret"] = True
shared_generator["script"] = "echo hello > $out/my_shared_secret"
m2_config = nested_dict()
m2_config["clan"]["core"]["vars"]["generators"]["my_shared_generator"] = (
shared_generator.copy()
)
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"machine1": m1_config, "machine2": m2_config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
machine1 = Machine(name="machine1", flake=FlakeId(str(flake.path)))
machine2 = Machine(name="machine2", flake=FlakeId(str(flake.path)))
cli.run(["vars", "generate", "--flake", str(flake.path), "machine1"])
assert check_vars(machine1)
cli.run(["vars", "generate", "--flake", str(flake.path), "machine2"])
assert check_vars(machine2)
assert check_vars(machine2)
m1_sops_store = sops.SecretStore(machine1)
m2_sops_store = sops.SecretStore(machine2)
assert m1_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True)
assert m2_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True)
assert m1_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True
)
assert m2_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True
)
@pytest.mark.impure
def test_generate_secret_var_password_store(
monkeypatch: pytest.MonkeyPatch,
@@ -383,21 +433,21 @@ def test_share_flag(
) -> None:
config = nested_dict()
shared_generator = config["clan"]["core"]["vars"]["generators"]["shared_generator"]
shared_generator["share"] = True
shared_generator["files"]["my_secret"]["secret"] = True
shared_generator["files"]["my_value"]["secret"] = False
shared_generator["script"] = (
"echo hello > $out/my_secret && echo hello > $out/my_value"
)
shared_generator["share"] = True
unshared_generator = config["clan"]["core"]["vars"]["generators"][
"unshared_generator"
]
unshared_generator["share"] = False
unshared_generator["files"]["my_secret"]["secret"] = True
unshared_generator["files"]["my_value"]["secret"] = False
unshared_generator["script"] = (
"echo hello > $out/my_secret && echo hello > $out/my_value"
)
unshared_generator["share"] = False
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
@@ -775,3 +825,47 @@ def test_migration(
)
assert in_repo_store.exists("my_generator", "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == "hello"
@pytest.mark.impure
def test_fails_when_files_are_left_from_other_backend(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_secret_generator = config["clan"]["core"]["vars"]["generators"][
"my_secret_generator"
]
my_secret_generator["files"]["my_secret"]["secret"] = True
my_secret_generator["script"] = "echo hello > $out/my_secret"
my_value_generator = config["clan"]["core"]["vars"]["generators"][
"my_value_generator"
]
my_value_generator["files"]["my_value"]["secret"] = False
my_value_generator["script"] = "echo hello > $out/my_value"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
sops_setup.init()
monkeypatch.chdir(flake.path)
for generator in ["my_secret_generator", "my_value_generator"]:
generate_vars_for_machine(
Machine(name="my_machine", flake=FlakeId(str(flake.path))),
generator,
regenerate=False,
)
my_secret_generator["files"]["my_secret"]["secret"] = False
my_value_generator["files"]["my_value"]["secret"] = True
set_machine_settings(flake.path, "my_machine", config)
monkeypatch.chdir(flake.path)
for generator in ["my_secret_generator", "my_value_generator"]:
with pytest.raises(ClanError):
generate_vars_for_machine(
Machine(name="my_machine", flake=FlakeId(str(flake.path))),
generator,
regenerate=False,
)

View File

@@ -24,6 +24,9 @@ def test_vm_deployment(
machine1_config["services"]["getty"]["autologinUser"] = "root"
machine1_config["services"]["openssh"]["enable"] = True
machine1_config["networking"]["firewall"]["enable"] = False
machine1_config["users"]["users"]["root"]["openssh"]["authorizedKeys"]["keys"] = [
# put your key here when debugging and pass ssh_port in run_vm_in_thread call below
]
m1_generator = machine1_config["clan"]["core"]["vars"]["generators"]["m1_generator"]
m1_generator["files"]["my_secret"]["secret"] = True
m1_generator["script"] = """
@@ -46,7 +49,7 @@ def test_vm_deployment(
machine2_config["services"]["getty"]["autologinUser"] = "root"
machine2_config["services"]["openssh"]["enable"] = True
machine2_config["users"]["users"]["root"]["openssh"]["authorizedKeys"]["keys"] = [
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDuhpzDHBPvn8nv8RH1MRomDOaXyP4GziQm7r3MZ1Syk grmpf"
# put your key here when debugging and pass ssh_port in run_vm_in_thread call below
]
machine2_config["networking"]["firewall"]["enable"] = False
machine2_config["clan"]["core"]["vars"]["generators"]["my_shared_generator"] = (
@@ -94,7 +97,7 @@ def test_vm_deployment(
# run nix flake lock
cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine", ssh_port=2222)
vm_m2 = run_vm_in_thread("m2_machine")
qga_m1 = qga_connect("m1_machine", vm_m1)
qga_m2 = qga_connect("m2_machine", vm_m2)
# check my_secret is deployed