feat: configure age plugins for SOPS in buildClan
This commit is contained in:
committed by
Michael Hoang
parent
d3e1c0b4e4
commit
a438fe77a7
@@ -18,7 +18,7 @@ from clan_lib.api import API
|
||||
from clan_cli.cmd import Log, RunOpts, run
|
||||
from clan_cli.dirs import user_config_dir
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.nix import nix_eval, nix_shell
|
||||
|
||||
from .folders import sops_users_folder
|
||||
|
||||
@@ -191,7 +191,41 @@ class Operation(enum.StrEnum):
|
||||
UPDATE_KEYS = "updatekeys"
|
||||
|
||||
|
||||
def load_age_plugins(flake_dir: str | Path) -> list[str]:
|
||||
if not flake_dir:
|
||||
msg = "Missing flake directory"
|
||||
raise ClanError(msg)
|
||||
|
||||
cmd = nix_eval(
|
||||
[
|
||||
f"{flake_dir}#clanInternals.secrets.age.plugins",
|
||||
"--json",
|
||||
]
|
||||
)
|
||||
|
||||
try:
|
||||
result = run(cmd)
|
||||
except Exception as e:
|
||||
msg = f"Failed to load age plugins {flake_dir}"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
json_str = result.stdout.strip()
|
||||
|
||||
try:
|
||||
plugins = json.loads(json_str)
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Failed to decode '{json_str}': {e}"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
if isinstance(plugins, list):
|
||||
return plugins
|
||||
|
||||
msg = f"Expected a list of age plugins but {type(plugins)!r} was provided"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def sops_run(
|
||||
flake_dir: str | Path,
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[SopsKey],
|
||||
@@ -249,7 +283,9 @@ def sops_run(
|
||||
raise ClanError(msg)
|
||||
sops_cmd.append(str(secret_path))
|
||||
|
||||
cmd = nix_shell(["sops", "gnupg"], sops_cmd)
|
||||
age_plugins = load_age_plugins(flake_dir)
|
||||
|
||||
cmd = nix_shell(["sops", "gnupg", *age_plugins], sops_cmd)
|
||||
opts = (
|
||||
dataclasses.replace(run_opts, env=environ)
|
||||
if run_opts
|
||||
@@ -411,11 +447,14 @@ def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
|
||||
return user_keys
|
||||
|
||||
|
||||
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:
|
||||
def update_keys(
|
||||
flake_dir: str | Path, secret_path: Path, keys: Iterable[SopsKey]
|
||||
) -> list[Path]:
|
||||
secret_path = secret_path / "secret"
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
|
||||
rc, _ = sops_run(
|
||||
flake_dir,
|
||||
Operation.UPDATE_KEYS,
|
||||
secret_path,
|
||||
keys,
|
||||
@@ -426,6 +465,7 @@ def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:
|
||||
|
||||
|
||||
def encrypt_file(
|
||||
flake_dir: str | Path,
|
||||
secret_path: Path,
|
||||
content: str | IO[bytes] | bytes | None,
|
||||
pubkeys: list[SopsKey],
|
||||
@@ -436,6 +476,7 @@ def encrypt_file(
|
||||
if not content:
|
||||
# This will spawn an editor to edit the file.
|
||||
rc, _ = sops_run(
|
||||
flake_dir,
|
||||
Operation.EDIT,
|
||||
secret_path,
|
||||
pubkeys,
|
||||
@@ -474,6 +515,7 @@ def encrypt_file(
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
sops_run(
|
||||
flake_dir,
|
||||
Operation.ENCRYPT,
|
||||
Path(source.name),
|
||||
pubkeys,
|
||||
@@ -488,11 +530,12 @@ def encrypt_file(
|
||||
Path(source.name).unlink()
|
||||
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
def decrypt_file(flake_dir: str | Path, secret_path: Path) -> str:
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[SopsKey] = []
|
||||
|
||||
_, stdout = sops_run(
|
||||
flake_dir,
|
||||
Operation.DECRYPT,
|
||||
secret_path,
|
||||
no_public_keys_needed,
|
||||
|
||||
Reference in New Issue
Block a user