vars: introduce ensure_machine_has_access method for sops

this should help avoiding overriding existing shared secrets by not
triggering vars regeneration if a machine has no access.

wip
This commit is contained in:
Jörg Thalheim
2024-11-14 13:17:50 +01:00
committed by Mic92
parent 8f1e5ed1eb
commit c98055c781
3 changed files with 28 additions and 22 deletions

View File

@@ -337,8 +337,12 @@ def ensure_consistent_state(
for name, file in machine.vars_generators[generator_name]["files"].items(): for name, file in machine.vars_generators[generator_name]["files"].items():
shared = machine.vars_generators[generator_name]["share"] shared = machine.vars_generators[generator_name]["share"]
if file["secret"] and machine.secret_vars_store.exists( if file["secret"] and machine.secret_vars_store.exists(
generator_name, name generator_name, name, shared=shared
): ):
if file["deploy"]:
machine.secret_vars_store.ensure_machine_has_access(
generator_name, name, shared=shared
)
needs_update, msg = machine.secret_vars_store.needs_fix( needs_update, msg = machine.secret_vars_store.needs_fix(
generator_name, name, shared=shared generator_name, name, shared=shared
) )

View File

@@ -12,6 +12,11 @@ class SecretStoreBase(StoreBase):
def needs_upload(self) -> bool: def needs_upload(self) -> bool:
return True return True
def ensure_machine_has_access(
self, generator_name: str, name: str, shared: bool = False
) -> None:
pass
def needs_fix( def needs_fix(
self, self,
generator_name: str, generator_name: str,

View File

@@ -110,20 +110,15 @@ class SecretStore(SecretStoreBase):
self.backend_collision_error(secret_folder) self.backend_collision_error(secret_folder)
# create directory if it doesn't exist # create directory if it doesn't exist
secret_folder.mkdir(parents=True, exist_ok=True) secret_folder.mkdir(parents=True, exist_ok=True)
if shared and self.exists_shared(generator_name, name): # initialize the secret
# secret exists, but this machine doesn't have access -> add machine encrypt_secret(
# add_secret will be a no-op if the machine is already added self.machine.flake_dir,
add_secret(self.machine.flake_dir, self.machine.name, secret_folder) secret_folder,
else: value,
# initialize the secret add_machines=[self.machine.name] if deployed else [],
encrypt_secret( add_groups=self.machine.deployment["sops"]["defaultGroups"],
self.machine.flake_dir, git_commit=False,
secret_folder, )
value,
add_machines=[self.machine.name] if deployed else [],
add_groups=self.machine.deployment["sops"]["defaultGroups"],
git_commit=False,
)
return secret_folder return secret_folder
def get(self, generator_name: str, name: str, shared: bool = False) -> bytes: def get(self, generator_name: str, name: str, shared: bool = False) -> bytes:
@@ -142,15 +137,17 @@ class SecretStore(SecretStoreBase):
) )
(output_dir / "key.txt").write_text(key) (output_dir / "key.txt").write_text(key)
def exists_shared(self, generator_name: str, name: str) -> bool:
secret_folder = self.secret_path(generator_name, name, shared=True)
return (secret_folder / "secret").exists()
def exists(self, generator_name: str, name: str, shared: bool = False) -> bool: def exists(self, generator_name: str, name: str, shared: bool = False) -> bool:
secret_folder = self.secret_path(generator_name, name, shared) secret_folder = self.secret_path(generator_name, name, shared)
if not (secret_folder / "secret").exists(): return (secret_folder / "secret").exists()
return False
return not shared or self.machine_has_access(generator_name, name, shared) def ensure_machine_has_access(
self, generator_name: str, name: str, shared: bool = False
) -> None:
if self.machine_has_access(generator_name, name, shared):
return
secret_folder = self.secret_path(generator_name, name, shared)
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]: def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]:
from clan_cli.secrets.secrets import ( from clan_cli.secrets.secrets import (