Merge pull request 'vars/sops: store secrets in /sops/vars' (#1802) from DavHau/clan-core:DavHau-vars into main

This commit is contained in:
clan-bot
2024-07-24 11:48:31 +00:00
4 changed files with 44 additions and 25 deletions

View File

@@ -48,19 +48,22 @@ class SecretStore(SecretStoreBase):
def get(self, service: str, name: str) -> bytes:
return decrypt_secret(
self.machine.flake_dir, f"{self.machine.name}-{name}"
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
).encode("utf-8")
def exists(self, service: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"{self.machine.name}-{name}",
sops_secrets_folder(self.machine.flake_dir) / f"{self.machine.name}-{name}",
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
key = decrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / key_name,
)
(output_dir / "key.txt").write_text(key)

View File

@@ -245,8 +245,8 @@ def disallow_member(group_folder: Path, name: str) -> list[Path]:
)
def has_secret(flake_dir: Path, secret: str) -> bool:
return (sops_secrets_folder(flake_dir) / secret / "secret").exists()
def has_secret(secret_path: Path) -> bool:
return (secret_path / "secret").exists()
def list_secrets(flake_dir: Path, pattern: str | None = None) -> list[str]:
@@ -255,7 +255,7 @@ def list_secrets(flake_dir: Path, pattern: str | None = None) -> list[str]:
def validate(name: str) -> bool:
return (
VALID_SECRET_NAME.match(name) is not None
and has_secret(flake_dir, name)
and has_secret(sops_secrets_folder(flake_dir) / name)
and (pattern is None or pattern in name)
)
@@ -278,16 +278,21 @@ def list_command(args: argparse.Namespace) -> None:
print("\n".join(lst))
def decrypt_secret(flake_dir: Path, secret: str) -> str:
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
ensure_sops_key(flake_dir)
secret_path = sops_secrets_folder(flake_dir) / secret / "secret"
if not secret_path.exists():
raise ClanError(f"Secret '{secret}' does not exist")
return decrypt_file(secret_path)
path = secret_path / "secret"
if not path.exists():
raise ClanError(f"Secret '{secret_path!s}' does not exist")
return decrypt_file(path)
def get_command(args: argparse.Namespace) -> None:
print(decrypt_secret(args.flake.path, args.secret), end="")
print(
decrypt_secret(
args.flake.path, sops_secrets_folder(args.flake.path) / args.secret
),
end="",
)
def set_command(args: argparse.Namespace) -> None:

View File

@@ -36,13 +36,20 @@ class SecretStore(SecretStoreBase):
)
add_machine(self.machine.flake_dir, self.machine.name, pub_key, False)
def secret_path(self, generator_name: str, secret_name: str) -> Path:
return (
self.machine.flake_dir
/ "sops"
/ "vars"
/ self.machine.name
/ generator_name
/ secret_name
)
def set(
self, generator_name: str, name: str, value: bytes, groups: list[str]
) -> Path | None:
path = (
sops_secrets_folder(self.machine.flake_dir)
/ f"vars-{self.machine.name}-{generator_name}-{name}"
)
path = self.secret_path(generator_name, name)
encrypt_secret(
self.machine.flake_dir,
path,
@@ -54,19 +61,21 @@ class SecretStore(SecretStoreBase):
def get(self, generator_name: str, name: str) -> bytes:
return decrypt_secret(
self.machine.flake_dir, f"vars-{self.machine.name}-{generator_name}-{name}"
self.machine.flake_dir, self.secret_path(generator_name, name)
).encode("utf-8")
def exists(self, generator_name: str, name: str) -> bool:
return has_secret(
self.machine.flake_dir,
f"vars-{self.machine.name}-{generator_name}-{name}",
self.secret_path(generator_name, name),
)
def upload(self, output_dir: Path) -> None:
key_name = f"{self.machine.name}-age.key"
if not has_secret(self.machine.flake_dir, key_name):
if not has_secret(sops_secrets_folder(self.machine.flake_dir) / key_name):
# skip uploading the secret, not managed by us
return
key = decrypt_secret(self.machine.flake_dir, key_name)
key = decrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) / key_name,
)
(output_dir / "key.txt").write_text(key)

View File

@@ -173,8 +173,10 @@ def test_generate_secret_var_sops_with_default_group(
assert (
flake.path
/ "sops"
/ "secrets"
/ "vars-my_machine-my_generator-my_secret"
/ "vars"
/ "my_machine"
/ "my_generator"
/ "my_secret"
/ "groups"
/ "my_group"
).exists()