sops: initialize age_plugins early
This avoids re-initializing the Flake object deep in the tree, which in turn leads to issue when overriding the Flake for testing, eg the URl would reset.
This commit is contained in:
@@ -191,12 +191,7 @@ class Operation(enum.StrEnum):
|
||||
UPDATE_KEYS = "updatekeys"
|
||||
|
||||
|
||||
def load_age_plugins(flake_dir: str | Path) -> list[str]:
|
||||
if not flake_dir:
|
||||
msg = "Missing flake directory"
|
||||
raise ClanError(msg)
|
||||
|
||||
flake = Flake(str(flake_dir))
|
||||
def load_age_plugins(flake: Flake) -> list[str]:
|
||||
result = flake.select("clanInternals.?secrets.?age.?plugins")
|
||||
plugins = result["secrets"]["age"]["plugins"]
|
||||
if plugins == {}:
|
||||
@@ -210,10 +205,10 @@ def load_age_plugins(flake_dir: str | Path) -> list[str]:
|
||||
|
||||
|
||||
def sops_run(
|
||||
flake_dir: str | Path,
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[SopsKey],
|
||||
age_plugins: list[str] | None,
|
||||
run_opts: RunOpts | None = None,
|
||||
) -> tuple[int, str]:
|
||||
"""Call the sops binary for the given operation."""
|
||||
@@ -221,6 +216,8 @@ def sops_run(
|
||||
# one place because calling into sops needs to be done with a carefully
|
||||
# setup context, and I don't feel good about the idea of having that logic
|
||||
# exist in multiple places.
|
||||
if age_plugins is None:
|
||||
age_plugins = []
|
||||
sops_cmd = ["sops"]
|
||||
environ = os.environ.copy()
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
@@ -268,8 +265,6 @@ def sops_run(
|
||||
raise ClanError(msg)
|
||||
sops_cmd.append(str(secret_path))
|
||||
|
||||
age_plugins = load_age_plugins(flake_dir)
|
||||
|
||||
cmd = nix_shell(["sops", "gnupg", *age_plugins], sops_cmd)
|
||||
opts = (
|
||||
dataclasses.replace(run_opts, env=environ)
|
||||
@@ -440,27 +435,27 @@ def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
|
||||
|
||||
|
||||
def update_keys(
|
||||
flake_dir: str | Path, secret_path: Path, keys: Iterable[SopsKey]
|
||||
secret_path: Path, keys: Iterable[SopsKey], age_plugins: list[str] | None = None
|
||||
) -> list[Path]:
|
||||
secret_path = secret_path / "secret"
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
|
||||
rc, _ = sops_run(
|
||||
flake_dir,
|
||||
Operation.UPDATE_KEYS,
|
||||
secret_path,
|
||||
keys,
|
||||
RunOpts(log=Log.BOTH, error_msg=error_msg),
|
||||
run_opts=RunOpts(log=Log.BOTH, error_msg=error_msg),
|
||||
age_plugins=age_plugins,
|
||||
)
|
||||
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
|
||||
return [secret_path] if was_modified else []
|
||||
|
||||
|
||||
def encrypt_file(
|
||||
flake_dir: str | Path,
|
||||
secret_path: Path,
|
||||
content: str | IO[bytes] | bytes | None,
|
||||
pubkeys: list[SopsKey],
|
||||
age_plugins: list[str] | None = None,
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
@@ -468,11 +463,11 @@ def encrypt_file(
|
||||
if not content:
|
||||
# This will spawn an editor to edit the file.
|
||||
rc, _ = sops_run(
|
||||
flake_dir,
|
||||
Operation.EDIT,
|
||||
secret_path,
|
||||
pubkeys,
|
||||
RunOpts(),
|
||||
run_opts=RunOpts(),
|
||||
age_plugins=age_plugins,
|
||||
)
|
||||
status = ExitStatus.parse(rc)
|
||||
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
|
||||
@@ -507,11 +502,11 @@ def encrypt_file(
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
sops_run(
|
||||
flake_dir,
|
||||
Operation.ENCRYPT,
|
||||
Path(source.name),
|
||||
pubkeys,
|
||||
RunOpts(log=Log.BOTH),
|
||||
run_opts=RunOpts(log=Log.BOTH),
|
||||
age_plugins=age_plugins,
|
||||
)
|
||||
# atomic copy of the encrypted file
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as dest:
|
||||
@@ -522,16 +517,16 @@ def encrypt_file(
|
||||
Path(source.name).unlink()
|
||||
|
||||
|
||||
def decrypt_file(flake_dir: str | Path, secret_path: Path) -> str:
|
||||
def decrypt_file(secret_path: Path, age_plugins: list[str] | None = None) -> str:
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[SopsKey] = []
|
||||
|
||||
_, stdout = sops_run(
|
||||
flake_dir,
|
||||
Operation.DECRYPT,
|
||||
secret_path,
|
||||
no_public_keys_needed,
|
||||
RunOpts(error_msg=f"Could not decrypt {secret_path}"),
|
||||
run_opts=RunOpts(error_msg=f"Could not decrypt {secret_path}"),
|
||||
age_plugins=age_plugins,
|
||||
)
|
||||
return stdout
|
||||
|
||||
|
||||
Reference in New Issue
Block a user