feat: support age plugins

Extends how we parse the contents of `SOPS_AGE_KEY` / `SOPS_AGE_KEY_FILE` / `keys.txt`, allowing a user to prepend a comment before any `AGE-PLUGIN-` secret key entry to indicate its corresponding public key.

For example:

```
AGE-PLUGIN-FIDO2-HMAC-xxxxxxxxxxxxx
```

The comment can use any prefix (e.g. `# public key: age1xxxx`, `# recipient: age1xxx`) as we are looking directly for `age1xxxx` within the line.

This change is necessary to support `age` plugins as there is no unified mechanism to recover the public key from a plugin's secret key.

If a plugin secret key does not have a preceding public key comment, an error will be thrown when attempting to set a secret.
This commit is contained in:
Brian McGee
2025-04-09 16:06:46 +01:00
committed by Michael Hoang
parent 852fdc2846
commit 1694a977f1
9 changed files with 400 additions and 281 deletions

View File

@@ -4,6 +4,7 @@ import io
import json
import logging
import os
import re
import shutil
import subprocess
from collections.abc import Iterable, Sequence
@@ -19,7 +20,9 @@ from clan_cli.dirs import user_config_dir
from clan_cli.errors import ClanError
from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder
from .folders import sops_users_folder
AGE_RECIPIENT_REGEX = re.compile(r"^.*((age1|ssh-(rsa|ed25519) ).*?)(\s|$)")
log = logging.getLogger(__name__)
@@ -55,14 +58,20 @@ class KeyType(enum.Enum):
def maybe_read_from_path(key_path: Path) -> None:
try:
# as in parse.go in age:
lines = Path(key_path).read_text().strip().splitlines()
for private_key in filter(lambda ln: not ln.startswith("#"), lines):
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in {key_path}: {public_key}"
)
keyring.append(public_key)
content = Path(key_path).read_text().strip()
try:
for public_key in get_public_age_keys(content):
log.info(
f"Found age public key from a private key "
f"in {key_path}: {public_key}"
)
keyring.append(public_key)
except ClanError as e:
error_msg = f"Failed to read age keys from {key_path}"
raise ClanError(error_msg) from e
except FileNotFoundError:
return
except Exception as ex:
@@ -72,13 +81,19 @@ class KeyType(enum.Enum):
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and
# reads identities line by line. See age/keysource.go in
# Sops, and age/parse.go in Age.
for private_key in keys.strip().splitlines():
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in the environment (SOPS_AGE_KEY): {public_key}"
)
keyring.append(public_key)
content = keys.strip()
try:
for public_key in get_public_age_keys(content):
log.info(
f"Found age public key from a private key "
f"in the environment (SOPS_AGE_KEY): {public_key}"
)
keyring.append(public_key)
except ClanError as e:
error_msg = "Failed to read age keys from SOPS_AGE_KEY"
raise ClanError(error_msg) from e
# Sops will try every location, see age/keysource.go
elif key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
@@ -249,7 +264,44 @@ def sops_run(
return p.returncode, p.stdout
def get_public_age_key(privkey: str) -> str:
def get_public_age_keys(contents: str) -> set[str]:
# we use a set as it's possible we may detect the same key twice, once in a `# comment` and once by recovering it
# from AGE-SECRET-KEY
keys: set[str] = set()
recipient: str | None = None
for line_number, line in enumerate(contents.splitlines()):
match = AGE_RECIPIENT_REGEX.match(line)
if match:
recipient = match[1]
keys.add(recipient)
if line.startswith("#"):
continue
if line.startswith("AGE-PLUGIN-"):
if not recipient:
msg = f"Did you forget to precede line {line_number} with it's corresponding `# recipient: age1xxxxxxxx` entry?"
raise ClanError(msg)
# reset recipient
recipient = None
if line.startswith("AGE-SECRET-KEY-"):
try:
keys.add(get_public_age_key_from_private_key(line))
except Exception as e:
msg = "Failed to get public key for age private key. Is the key malformed?"
raise ClanError(msg) from e
# reset recipient
recipient = None
return keys
def get_public_age_key_from_private_key(privkey: str) -> str:
cmd = nix_shell(["age"], ["age-keygen", "-y"])
error_msg = "Failed to get public key for age private key. Is the key malformed?"
@@ -298,23 +350,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
if folder.exists():
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir)
if folder.exists():
@@ -329,15 +365,6 @@ def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]:
maybe_keys = maybe_get_user_or_machine(flake_dir, key)
if maybe_keys:
return maybe_keys
msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
def default_admin_private_key_path() -> Path:
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
if raw_path:
@@ -346,8 +373,9 @@ def default_admin_private_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> None | SopsKey:
def maybe_get_admin_public_key() -> SopsKey | None:
keyring = SopsKey.collect_public_keys()
if len(keyring) == 0:
return None
@@ -366,12 +394,21 @@ def maybe_get_admin_public_key() -> None | SopsKey:
return keyring[0]
def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]:
def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
if not key:
msg = "No SOPS key found. Please generate one with `clan secrets key generate`."
raise ClanError(msg)
user_keys = maybe_get_user(flake_dir, key)
if not user_keys:
# todo improve error message
msg = f"We could not figure out which Clan secrets user you are with the SOPS key we found: {key.pubkey}"
raise ClanError(msg)
return user_keys
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]: