feat: support age plugins

Extends how we parse the contents of `SOPS_AGE_KEY` / `SOPS_AGE_KEY_FILE` / `keys.txt`, allowing a user to prepend a comment before any `AGE-PLUGIN-` secret key entry to indicate its corresponding public key.

For example:

```
AGE-PLUGIN-FIDO2-HMAC-xxxxxxxxxxxxx
```

The comment can use any prefix (e.g. `# public key: age1xxxx`, `# recipient: age1xxx`) as we are looking directly for `age1xxxx` within the line.

This change is necessary to support `age` plugins as there is no unified mechanism to recover the public key from a plugin's secret key.

If a plugin secret key does not have a preceding public key comment, an error will be thrown when attempting to set a secret.
This commit is contained in:
Brian McGee
2025-04-09 16:06:46 +01:00
committed by Michael Hoang
parent 3800b8cc1d
commit 1bfe318865
9 changed files with 400 additions and 281 deletions

View File

@@ -155,16 +155,14 @@ def encrypt_secret(
if add_users is None:
add_users = []
keys = sops.ensure_admin_public_key(flake_dir)
admin_keys = sops.ensure_admin_public_keys(flake_dir)
if not keys:
if not admin_keys:
# todo double check the correct command to run
msg = "No keys found. Please run 'clan secrets add-key' to add a key."
raise ClanError(msg)
username = next(iter(keys)).username
recipient_keys = set()
username = next(iter(admin_keys)).username
# encrypt_secret can be called before the secret has been created
# so don't try to call sops.update_keys on a non-existent file:
@@ -203,8 +201,8 @@ def encrypt_secret(
recipient_keys = collect_keys_for_path(secret_path)
if not keys.intersection(recipient_keys):
recipient_keys.update(keys)
if not admin_keys.intersection(recipient_keys):
recipient_keys.update(admin_keys)
files_to_commit.extend(
allow_member(

View File

@@ -4,6 +4,7 @@ import io
import json
import logging
import os
import re
import shutil
import subprocess
from collections.abc import Iterable, Sequence
@@ -19,7 +20,9 @@ from clan_cli.dirs import user_config_dir
from clan_cli.errors import ClanError
from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder
from .folders import sops_users_folder
AGE_RECIPIENT_REGEX = re.compile(r"^.*((age1|ssh-(rsa|ed25519) ).*?)(\s|$)")
log = logging.getLogger(__name__)
@@ -55,14 +58,20 @@ class KeyType(enum.Enum):
def maybe_read_from_path(key_path: Path) -> None:
try:
# as in parse.go in age:
lines = Path(key_path).read_text().strip().splitlines()
for private_key in filter(lambda ln: not ln.startswith("#"), lines):
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in {key_path}: {public_key}"
)
keyring.append(public_key)
content = Path(key_path).read_text().strip()
try:
for public_key in get_public_age_keys(content):
log.info(
f"Found age public key from a private key "
f"in {key_path}: {public_key}"
)
keyring.append(public_key)
except ClanError as e:
error_msg = f"Failed to read age keys from {key_path}"
raise ClanError(error_msg) from e
except FileNotFoundError:
return
except Exception as ex:
@@ -72,13 +81,19 @@ class KeyType(enum.Enum):
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and
# reads identities line by line. See age/keysource.go in
# Sops, and age/parse.go in Age.
for private_key in keys.strip().splitlines():
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in the environment (SOPS_AGE_KEY): {public_key}"
)
keyring.append(public_key)
content = keys.strip()
try:
for public_key in get_public_age_keys(content):
log.info(
f"Found age public key from a private key "
f"in the environment (SOPS_AGE_KEY): {public_key}"
)
keyring.append(public_key)
except ClanError as e:
error_msg = "Failed to read age keys from SOPS_AGE_KEY"
raise ClanError(error_msg) from e
# Sops will try every location, see age/keysource.go
elif key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
@@ -249,7 +264,44 @@ def sops_run(
return p.returncode, p.stdout
def get_public_age_key(privkey: str) -> str:
def get_public_age_keys(contents: str) -> set[str]:
# we use a set as it's possible we may detect the same key twice, once in a `# comment` and once by recovering it
# from AGE-SECRET-KEY
keys: set[str] = set()
recipient: str | None = None
for line_number, line in enumerate(contents.splitlines()):
match = AGE_RECIPIENT_REGEX.match(line)
if match:
recipient = match[1]
keys.add(recipient)
if line.startswith("#"):
continue
if line.startswith("AGE-PLUGIN-"):
if not recipient:
msg = f"Did you forget to precede line {line_number} with it's corresponding `# recipient: age1xxxxxxxx` entry?"
raise ClanError(msg)
# reset recipient
recipient = None
if line.startswith("AGE-SECRET-KEY-"):
try:
keys.add(get_public_age_key_from_private_key(line))
except Exception as e:
msg = "Failed to get public key for age private key. Is the key malformed?"
raise ClanError(msg) from e
# reset recipient
recipient = None
return keys
def get_public_age_key_from_private_key(privkey: str) -> str:
cmd = nix_shell(["age"], ["age-keygen", "-y"])
error_msg = "Failed to get public key for age private key. Is the key malformed?"
@@ -298,23 +350,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
if folder.exists():
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type)}
return None
def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir)
if folder.exists():
@@ -329,15 +365,6 @@ def get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> set[SopsKey]:
maybe_keys = maybe_get_user_or_machine(flake_dir, key)
if maybe_keys:
return maybe_keys
msg = f"A sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
def default_admin_private_key_path() -> Path:
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
if raw_path:
@@ -346,8 +373,9 @@ def default_admin_private_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> None | SopsKey:
def maybe_get_admin_public_key() -> SopsKey | None:
keyring = SopsKey.collect_public_keys()
if len(keyring) == 0:
return None
@@ -366,12 +394,21 @@ def maybe_get_admin_public_key() -> None | SopsKey:
return keyring[0]
def ensure_admin_public_key(flake_dir: Path) -> set[SopsKey]:
def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
if not key:
msg = "No SOPS key found. Please generate one with `clan secrets key generate`."
raise ClanError(msg)
user_keys = maybe_get_user(flake_dir, key)
if not user_keys:
# todo improve error message
msg = f"We could not figure out which Clan secrets user you are with the SOPS key we found: {key.pubkey}"
raise ClanError(msg)
return user_keys
def update_keys(secret_path: Path, keys: Iterable[SopsKey]) -> list[Path]:

View File

@@ -5,7 +5,7 @@ from pathlib import Path
from clan_cli.errors import ClanError
from .sops import get_public_age_key
from .sops import get_public_age_keys
VALID_SECRET_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")
VALID_USER_NAME = re.compile(r"^[a-z_]([a-z0-9_-]{0,31})?$")
@@ -21,15 +21,20 @@ def secret_name_type(arg_value: str) -> str:
def public_or_private_age_key_type(arg_value: str) -> str:
if Path(arg_value).is_file():
arg_value = Path(arg_value).read_text().strip()
for line in arg_value.splitlines():
if line.startswith("#"):
continue
if line.startswith("age1"):
return line.strip()
if line.startswith("AGE-SECRET-KEY-"):
return get_public_age_key(line)
msg = f"Please provide an age public key starting with age1 or an age private key AGE-SECRET-KEY-, got: '{arg_value}'"
raise ClanError(msg)
public_keys = get_public_age_keys(arg_value)
match len(public_keys):
case 0:
msg = f"Please provide an age public key starting with age1 or an age private key starting with AGE-SECRET-KEY- or AGE-PLUGIN-, got: '{arg_value}'"
raise ClanError(msg)
case 1:
return next(iter(public_keys))
case n:
msg = f"{n} age keys were provided, please provide only 1: '{arg_value}'"
raise ClanError(msg)
def group_or_user_name_type(what: str) -> Callable[[str], str]:

View File

@@ -91,6 +91,7 @@ def test_import_sops(
cli.run(cmd)
with capture_output as output:
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = sorted(output.out.rstrip().split())
assert users == ["user1", "user2"]

View File

@@ -40,79 +40,89 @@ def test_add_module_to_inventory(
age_keys: list["KeyPair"],
) -> None:
base_path = test_flake_with_core.path
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake_with_core.path),
"user1",
age_keys[0].pubkey,
]
)
opts = CreateOptions(
clan_dir=Flake(str(base_path)),
machine=Machine(name="machine1", tags=[], deploy=MachineDeploy()),
)
with monkeypatch.context():
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
create_machine(opts)
(test_flake_with_core.path / "machines" / "machine1" / "facter.json").write_text(
json.dumps(
{
"version": 1,
"system": "x86_64-linux",
}
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake_with_core.path),
"user1",
age_keys[0].pubkey,
]
)
opts = CreateOptions(
clan_dir=Flake(str(base_path)),
machine=Machine(name="machine1", tags=[], deploy=MachineDeploy()),
)
)
subprocess.run(["git", "add", "."], cwd=test_flake_with_core.path, check=True)
inventory: Inventory = {}
create_machine(opts)
(
test_flake_with_core.path / "machines" / "machine1" / "facter.json"
).write_text(
json.dumps(
{
"version": 1,
"system": "x86_64-linux",
}
)
)
subprocess.run(["git", "add", "."], cwd=test_flake_with_core.path, check=True)
inventory["services"] = {
"borgbackup": {
"borg1": {
"meta": {"name": "borg1"},
"roles": {
"client": {"machines": ["machine1"]},
"server": {"machines": ["machine1"]},
},
inventory: Inventory = {}
inventory["services"] = {
"borgbackup": {
"borg1": {
"meta": {"name": "borg1"},
"roles": {
"client": {"machines": ["machine1"]},
"server": {"machines": ["machine1"]},
},
}
}
}
}
set_inventory(inventory, base_path, "Add borgbackup service")
set_inventory(inventory, base_path, "Add borgbackup service")
# cmd = ["facts", "generate", "--flake", str(test_flake_with_core.path), "machine1"]
cmd = ["vars", "generate", "--flake", str(test_flake_with_core.path), "machine1"]
cli.run(cmd)
machine = MachineMachine(
name="machine1", flake=Flake(str(test_flake_with_core.path))
)
generator = None
for gen in machine.vars_generators:
if gen.name == "borgbackup":
generator = gen
break
assert generator
ssh_key = machine.public_vars_store.get(generator, "borgbackup.ssh.pub")
cmd = nix_eval(
[
f"{base_path}#nixosConfigurations.machine1.config.services.borgbackup.repos",
"--json",
# cmd = ["facts", "generate", "--flake", str(test_flake_with_core.path), "machine1"]
cmd = [
"vars",
"generate",
"--flake",
str(test_flake_with_core.path),
"machine1",
]
)
proc = run_no_stdout(cmd)
res = json.loads(proc.stdout.strip())
assert res["machine1"]["authorizedKeys"] == [ssh_key.decode()]
cli.run(cmd)
machine = MachineMachine(
name="machine1", flake=Flake(str(test_flake_with_core.path))
)
generator = None
for gen in machine.vars_generators:
if gen.name == "borgbackup":
generator = gen
break
assert generator
ssh_key = machine.public_vars_store.get(generator, "borgbackup.ssh.pub")
cmd = nix_eval(
[
f"{base_path}#nixosConfigurations.machine1.config.services.borgbackup.repos",
"--json",
]
)
proc = run_no_stdout(cmd)
res = json.loads(proc.stdout.strip())
assert res["machine1"]["authorizedKeys"] == [ssh_key.decode()]

View File

@@ -75,9 +75,10 @@ def _test_identities(
]
)
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed")
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
with monkeypatch.context() as m:
m.setenv("SOPS_NIX_SECRET", "deadfeed")
m.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
@@ -89,6 +90,7 @@ def _test_identities(
test_secret_name,
]
)
assert_secrets_file_recipients(
test_flake.path,
test_secret_name,
@@ -157,87 +159,92 @@ def test_users(
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
with monkeypatch.context():
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
# some additional user-specific tests
# some additional user-specific tests
admin_key = age_keys[2]
sops_folder = test_flake.path / "sops"
admin_key = age_keys[2]
sops_folder = test_flake.path / "sops"
user_keys = {
"bob": [age_keys[0], age_keys[1]],
"alice": [age_keys[2]],
"charlie": [age_keys[3], age_keys[4]],
}
user_keys = {
"bob": [age_keys[0], age_keys[1]],
"alice": [age_keys[2]],
"charlie": [age_keys[3], age_keys[4]],
}
for user, keys in user_keys.items():
key_args = [f"--age-key={key.pubkey}" for key in keys]
monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed")
# add the user keys
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
user,
*key_args,
]
)
assert (sops_folder / "users" / user / "key.json").exists()
for user, keys in user_keys.items():
key_args = [f"--age-key={key.pubkey}" for key in keys]
# check they are returned in get
with capture_output as output:
cli.run(["secrets", "users", "get", "--flake", str(test_flake.path), user])
# add the user keys
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
user,
*key_args,
]
)
assert (sops_folder / "users" / user / "key.json").exists()
for key in keys:
assert key.pubkey in output.out
# check they are returned in get
with capture_output as output:
cli.run(
["secrets", "users", "get", "--flake", str(test_flake.path), user]
)
# set a secret
secret_name = f"{user}_secret"
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--user",
user,
for key in keys:
assert key.pubkey in output.out
# set a secret
secret_name = f"{user}_secret"
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--user",
user,
secret_name,
]
)
# check the secret has each of our user's keys as a recipient
# in addition the admin key should be there
assert_secrets_file_recipients(
test_flake.path,
secret_name,
]
)
expected_age_recipients_keypairs=[admin_key, *keys],
)
# check the secret has each of our user's keys as a recipient
# in addition the admin key should be there
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_key, *keys],
)
if len(keys) == 1:
continue
if len(keys) == 1:
continue
# remove one of the keys
cli.run(
[
"secrets",
"users",
"remove-key",
"--flake",
str(test_flake.path),
user,
keys[0].pubkey,
]
)
# remove one of the keys
cli.run(
[
"secrets",
"users",
"remove-key",
"--flake",
str(test_flake.path),
user,
keys[0].pubkey,
]
)
# check the secret has been updated
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_key, *keys[1:]],
)
# check the secret has been updated
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_key, *keys[1:]],
)
def test_machines(
@@ -368,6 +375,7 @@ def test_groups(
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef")
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
@@ -539,11 +547,20 @@ def test_secrets(
# Read the key that was generated
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = json.loads(output.out)["publickey"]
assert key.startswith("age1")
key = json.loads(output.out)
assert key["publickey"].startswith("age1")
# Add testuser with the key that was generated for the clan
cli.run(
["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key]
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"testuser",
key["publickey"],
]
)
with pytest.raises(ClanError): # does not exist yet
@@ -855,6 +872,7 @@ def test_secrets_key_generate_gpg(
"testuser",
]
)
with capture_output as output:
cli.run(
[
@@ -873,8 +891,12 @@ def test_secrets_key_generate_gpg(
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key.fingerprint
monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"])
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"])
assert output.out == "secret-value"
with monkeypatch.context() as m:
m.setenv("SOPS_NIX_SECRET", "secret-value")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"])
with capture_output as output:
cli.run(
["secrets", "get", "--flake", str(test_flake.path), "secret-name"]
)
assert output.out == "secret-value"

View File

@@ -44,50 +44,55 @@ def test_secrets_upload(
config["clan"]["networking"]["targetHost"] = addr
config["clan"]["core"]["facts"]["secretUploadDirectory"] = str(flake.path / "facts")
flake.refresh()
monkeypatch.chdir(str(flake.path))
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
sops_dir = flake.path / "facts"
with monkeypatch.context():
monkeypatch.chdir(str(flake.path))
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
# the flake defines this path as the location where the sops key should be installed
sops_key = sops_dir / "key.txt"
sops_key2 = sops_dir / "key2.txt"
sops_dir = flake.path / "facts"
# Create old state, which should be cleaned up
sops_dir.mkdir()
sops_key.write_text("OLD STATE")
sops_key2.write_text("OLD STATE2")
# the flake defines this path as the location where the sops key should be installed
sops_key = sops_dir / "key.txt"
sops_key2 = sops_dir / "key2.txt"
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
"user1",
age_keys[0].pubkey,
]
)
# Create old state, which should be cleaned up
sops_dir.mkdir()
sops_key.write_text("OLD STATE")
sops_key2.write_text("OLD STATE2")
cli.run(
[
"secrets",
"machines",
"add",
"--flake",
str(flake.path),
"vm1",
age_keys[1].pubkey,
]
)
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
cli.run(["secrets", "set", "--flake", str(flake.path), "vm1-age.key"])
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(flake.path),
"user1",
age_keys[0].pubkey,
]
)
flake_path = flake.path.joinpath("flake.nix")
cli.run(
[
"secrets",
"machines",
"add",
"--flake",
str(flake.path),
"vm1",
age_keys[1].pubkey,
]
)
cli.run(["facts", "upload", "--flake", str(flake_path), "vm1"])
with monkeypatch.context() as m:
m.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
assert sops_key.exists()
assert sops_key.read_text() == age_keys[0].privkey
assert not sops_key2.exists()
cli.run(["secrets", "set", "--flake", str(flake.path), "vm1-age.key"])
flake_path = flake.path.joinpath("flake.nix")
cli.run(["facts", "upload", "--flake", str(flake_path), "vm1"])
assert sops_key.exists()
assert sops_key.read_text() == age_keys[0].privkey
assert not sops_key2.exists()

View File

@@ -32,27 +32,29 @@ def test_run(
test_flake_with_core: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli.run(
[
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"admins",
"user1",
]
)
cli.run(["vms", "run", "--no-block", "vm1", "-c", "shutdown", "-h", "now"])
with monkeypatch.context():
monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli.run(
[
"secrets",
"users",
"add",
"user1",
age_keys[0].pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"admins",
"user1",
]
)
cli.run(["vms", "run", "--no-block", "vm1", "-c", "shutdown", "-h", "now"])
@pytest.mark.skipif(no_kvm, reason="Requires KVM")