vars/sops: improve shared secrets, switching backend

When a second machine checks for a shared secret, now the exists() call returns negative and only when updating the secrets for that machine, the machine is added to the sops receivers.

Also throw proper errors when the user switches backends without cleaning the files first.
This commit is contained in:
DavHau
2024-09-20 15:06:32 +02:00
parent 77ac85df57
commit e49d1f0127
6 changed files with 181 additions and 37 deletions

View File

@@ -1,9 +1,9 @@
import logging
import shutil
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from clan_cli.errors import ClanError
from clan_cli.machines import machines
log = logging.getLogger(__name__)
@@ -113,6 +113,13 @@ class StoreBase(ABC):
def is_secret_store(self) -> bool:
pass
def backend_collision_error(self, folder: Path) -> None:
msg = (
f"Var folder {folder} exists but doesn't look like a {self.store_name} secret."
"Potentially a leftover from another backend. Please delete it manually."
)
raise ClanError(msg)
def rel_dir(self, generator_name: str, var_name: str, shared: bool = False) -> Path:
if shared:
return Path(f"shared/{generator_name}/{var_name}")
@@ -145,12 +152,6 @@ class StoreBase(ABC):
else:
old_val = None
old_val_str = "<not set>"
directory = self.directory(generator_name, var_name, shared)
# delete directory
if directory.exists():
shutil.rmtree(directory)
# re-create directory
directory.mkdir(parents=True, exist_ok=True)
new_file = self._set(generator_name, var_name, value, shared, deployed)
if self.is_secret_store:
print(f"Updated secret var {generator_name}/{var_name}\n")

View File

@@ -1,3 +1,4 @@
import shutil
from pathlib import Path
from clan_cli.errors import ClanError
@@ -23,14 +24,22 @@ class FactStore(FactStoreBase):
shared: bool = False,
deployed: bool = True,
) -> Path | None:
if self.machine.flake.is_local():
file_path = self.directory(generator_name, name, shared) / "value"
if not self.machine.flake.is_local():
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
folder = self.directory(generator_name, name, shared)
if folder.exists():
if not (folder / "value").exists():
# another backend has used that folder before -> error out
self.backend_collision_error(folder)
shutil.rmtree(folder)
# re-create directory
file_path = folder / "value"
folder.mkdir(parents=True, exist_ok=True)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch()
file_path.write_bytes(value)
return file_path
msg = f"in_flake fact storage is only supported for local flakes: {self.machine.flake}"
raise ClanError(msg)
# get a single fact
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:

View File

@@ -1,7 +1,8 @@
import json
from pathlib import Path
from clan_cli.machines.machines import Machine
from clan_cli.secrets.folders import sops_secrets_folder
from clan_cli.secrets.folders import sops_machines_folder, sops_secrets_folder
from clan_cli.secrets.machines import add_machine, add_secret, has_machine
from clan_cli.secrets.secrets import decrypt_secret, encrypt_secret, has_secret
from clan_cli.secrets.sops import generate_private_key
@@ -40,6 +41,18 @@ class SecretStore(SecretStoreBase):
def store_name(self) -> str:
return "sops"
def machine_has_access(
self, generator_name: str, secret_name: str, shared: bool
) -> bool:
secret_path = self.secret_path(generator_name, secret_name, shared)
secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in secret["sops"]["age"]]
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
machine_pubkey = json.loads(
(machines_folder_path / self.machine.name / "key.json").read_text()
)["publickey"]
return machine_pubkey in recipients
def secret_path(
self, generator_name: str, secret_name: str, shared: bool = False
) -> Path:
@@ -53,16 +66,28 @@ class SecretStore(SecretStoreBase):
shared: bool = False,
deployed: bool = True,
) -> Path | None:
path = self.secret_path(generator_name, name, shared)
secret_folder = self.secret_path(generator_name, name, shared)
# delete directory
if secret_folder.exists() and not (secret_folder / "secret").exists():
# another backend has used that folder before -> error out
self.backend_collision_error(secret_folder)
# create directory if it doesn't exist
secret_folder.mkdir(parents=True, exist_ok=True)
if shared and self.exists_shared(generator_name, name):
# secret exists, but this machine doesn't have access -> add machine
# add_secret will be a no-op if the machine is already added
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
else:
# initialize the secret
encrypt_secret(
self.machine.flake_dir,
path,
secret_folder,
value,
add_machines=[self.machine.name],
add_machines=[self.machine.name] if deployed else [],
add_groups=self.machine.deployment["sops"]["defaultGroups"],
git_commit=False,
)
return path
return secret_folder
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
return decrypt_secret(
@@ -80,10 +105,14 @@ class SecretStore(SecretStoreBase):
)
(output_dir / "key.txt").write_text(key)
def exists_shared(self, generator_name: str, name: str) -> bool:
secret_folder = self.secret_path(generator_name, name, shared=True)
return (secret_folder / "secret").exists()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
secret_folder = self.secret_path(generator_name, name, shared)
if not (secret_folder / "secret").exists():
return False
# add_secret will be a no-op if the machine is already added
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
if not shared:
return True
return self.machine_has_access(generator_name, name, shared)

View File

@@ -54,6 +54,16 @@ class FlakeForTest(NamedTuple):
from age_keys import KEYS, KeyPair
def set_machine_settings(
flake: Path,
machine_name: str,
machine_settings: dict,
) -> None:
settings_path = flake / "machines" / machine_name / "settings.json"
settings_path.parent.mkdir(parents=True, exist_ok=True)
settings_path.write_text(json.dumps(machine_settings, indent=2))
def generate_flake(
temporary_home: Path,
flake_template: Path,
@@ -117,9 +127,7 @@ def generate_flake(
# generate machines from machineConfigs
for machine_name, machine_config in machine_configs.items():
settings_path = flake / "machines" / machine_name / "settings.json"
settings_path.parent.mkdir(parents=True, exist_ok=True)
settings_path.write_text(json.dumps(machine_config, indent=2))
set_machine_settings(flake, machine_name, machine_config)
if "/tmp" not in str(os.environ.get("HOME")):
log.warning(

View File

@@ -8,14 +8,16 @@ from tempfile import TemporaryDirectory
import pytest
from age_keys import SopsSetup
from clan_cli.clan_uri import FlakeId
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_eval, nix_shell, run
from clan_cli.vars.check import check_vars
from clan_cli.vars.generate import generate_vars_for_machine
from clan_cli.vars.list import stringify_all_vars
from clan_cli.vars.public_modules import in_repo
from clan_cli.vars.secret_modules import password_store, sops
from clan_cli.vars.set import set_var
from fixtures_flakes import generate_flake
from fixtures_flakes import generate_flake, set_machine_settings
from helpers import cli
from helpers.nixos_config import nested_dict
from root import CLAN_CORE
@@ -152,6 +154,10 @@ def test_generate_secret_var_sops(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
vars_text = stringify_all_vars(machine)
assert "my_generator/my_secret" in vars_text
# test regeneration works
cli.run(
["vars", "generate", "--flake", str(flake.path), "my_machine", "--regenerate"]
)
@pytest.mark.impure
@@ -186,6 +192,50 @@ def test_generate_secret_var_sops_with_default_group(
assert sops_store.get("my_generator", "my_secret").decode() == "hello\n"
@pytest.mark.impure
def test_generated_shared_secret_sops(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
m1_config = nested_dict()
shared_generator = m1_config["clan"]["core"]["vars"]["generators"][
"my_shared_generator"
]
shared_generator["share"] = True
shared_generator["files"]["my_shared_secret"]["secret"] = True
shared_generator["script"] = "echo hello > $out/my_shared_secret"
m2_config = nested_dict()
m2_config["clan"]["core"]["vars"]["generators"]["my_shared_generator"] = (
shared_generator.copy()
)
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"machine1": m1_config, "machine2": m2_config},
monkeypatch=monkeypatch,
)
monkeypatch.chdir(flake.path)
sops_setup.init()
machine1 = Machine(name="machine1", flake=FlakeId(str(flake.path)))
machine2 = Machine(name="machine2", flake=FlakeId(str(flake.path)))
cli.run(["vars", "generate", "--flake", str(flake.path), "machine1"])
assert check_vars(machine1)
cli.run(["vars", "generate", "--flake", str(flake.path), "machine2"])
assert check_vars(machine2)
assert check_vars(machine2)
m1_sops_store = sops.SecretStore(machine1)
m2_sops_store = sops.SecretStore(machine2)
assert m1_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True)
assert m2_sops_store.exists("my_shared_generator", "my_shared_secret", shared=True)
assert m1_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True
)
assert m2_sops_store.machine_has_access(
"my_shared_generator", "my_shared_secret", shared=True
)
@pytest.mark.impure
def test_generate_secret_var_password_store(
monkeypatch: pytest.MonkeyPatch,
@@ -383,21 +433,21 @@ def test_share_flag(
) -> None:
config = nested_dict()
shared_generator = config["clan"]["core"]["vars"]["generators"]["shared_generator"]
shared_generator["share"] = True
shared_generator["files"]["my_secret"]["secret"] = True
shared_generator["files"]["my_value"]["secret"] = False
shared_generator["script"] = (
"echo hello > $out/my_secret && echo hello > $out/my_value"
)
shared_generator["share"] = True
unshared_generator = config["clan"]["core"]["vars"]["generators"][
"unshared_generator"
]
unshared_generator["share"] = False
unshared_generator["files"]["my_secret"]["secret"] = True
unshared_generator["files"]["my_value"]["secret"] = False
unshared_generator["script"] = (
"echo hello > $out/my_secret && echo hello > $out/my_value"
)
unshared_generator["share"] = False
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
@@ -775,3 +825,47 @@ def test_migration(
)
assert in_repo_store.exists("my_generator", "my_value")
assert in_repo_store.get("my_generator", "my_value").decode() == "hello"
@pytest.mark.impure
def test_fails_when_files_are_left_from_other_backend(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
sops_setup: SopsSetup,
) -> None:
config = nested_dict()
my_secret_generator = config["clan"]["core"]["vars"]["generators"][
"my_secret_generator"
]
my_secret_generator["files"]["my_secret"]["secret"] = True
my_secret_generator["script"] = "echo hello > $out/my_secret"
my_value_generator = config["clan"]["core"]["vars"]["generators"][
"my_value_generator"
]
my_value_generator["files"]["my_value"]["secret"] = False
my_value_generator["script"] = "echo hello > $out/my_value"
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"my_machine": config},
monkeypatch=monkeypatch,
)
sops_setup.init()
monkeypatch.chdir(flake.path)
for generator in ["my_secret_generator", "my_value_generator"]:
generate_vars_for_machine(
Machine(name="my_machine", flake=FlakeId(str(flake.path))),
generator,
regenerate=False,
)
my_secret_generator["files"]["my_secret"]["secret"] = False
my_value_generator["files"]["my_value"]["secret"] = True
set_machine_settings(flake.path, "my_machine", config)
monkeypatch.chdir(flake.path)
for generator in ["my_secret_generator", "my_value_generator"]:
with pytest.raises(ClanError):
generate_vars_for_machine(
Machine(name="my_machine", flake=FlakeId(str(flake.path))),
generator,
regenerate=False,
)

View File

@@ -24,6 +24,9 @@ def test_vm_deployment(
machine1_config["services"]["getty"]["autologinUser"] = "root"
machine1_config["services"]["openssh"]["enable"] = True
machine1_config["networking"]["firewall"]["enable"] = False
machine1_config["users"]["users"]["root"]["openssh"]["authorizedKeys"]["keys"] = [
# put your key here when debugging and pass ssh_port in run_vm_in_thread call below
]
m1_generator = machine1_config["clan"]["core"]["vars"]["generators"]["m1_generator"]
m1_generator["files"]["my_secret"]["secret"] = True
m1_generator["script"] = """
@@ -46,7 +49,7 @@ def test_vm_deployment(
machine2_config["services"]["getty"]["autologinUser"] = "root"
machine2_config["services"]["openssh"]["enable"] = True
machine2_config["users"]["users"]["root"]["openssh"]["authorizedKeys"]["keys"] = [
"ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIDuhpzDHBPvn8nv8RH1MRomDOaXyP4GziQm7r3MZ1Syk grmpf"
# put your key here when debugging and pass ssh_port in run_vm_in_thread call below
]
machine2_config["networking"]["firewall"]["enable"] = False
machine2_config["clan"]["core"]["vars"]["generators"]["my_shared_generator"] = (
@@ -94,7 +97,7 @@ def test_vm_deployment(
# run nix flake lock
cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine", ssh_port=2222)
vm_m2 = run_vm_in_thread("m2_machine")
qga_m1 = qga_connect("m1_machine", vm_m1)
qga_m2 = qga_connect("m2_machine", vm_m2)
# check my_secret is deployed