Files
clan-core/pkgs/clan-cli/tests/test_secrets_cli.py
Jörg Thalheim fb70e715cf use pre-generate gpg key for tests
this is a bit faster.
2025-03-25 18:30:11 +01:00

814 lines
21 KiB
Python

import json
import logging
import os
import re
import shutil
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from age_keys import assert_secrets_file_recipients
from clan_cli.errors import ClanError
from fixtures_flakes import FlakeForTest
from gpg_keys import GpgKey
from helpers import cli
from stdout import CaptureOutput
if TYPE_CHECKING:
from age_keys import KeyPair
log = logging.getLogger(__name__)
def _test_identities(
what: str,
test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
sops_folder = test_flake.path / "sops"
what_singular = what[:-1]
test_secret_name = f"{what_singular}_secret"
# fake some admin user that's different from the identity, we are going to
# try to add/remove/update from the clan, this way we can check that keys
# are properly updated on secrets when an identity changes.
admin_age_key = age_keys[2]
cli.run(
[
"secrets",
what,
"add",
"--flake",
str(test_flake.path),
"foo",
age_keys[0].pubkey,
]
)
assert (sops_folder / what / "foo" / "key.json").exists()
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"admin",
admin_age_key.pubkey,
]
)
with pytest.raises(ClanError): # raises "foo already exists"
cli.run(
[
"secrets",
what,
"add",
"--flake",
str(test_flake.path),
"foo",
age_keys[0].pubkey,
]
)
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed")
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
f"--{what_singular}",
"foo",
test_secret_name,
]
)
assert_secrets_file_recipients(
test_flake.path,
test_secret_name,
expected_age_recipients_keypairs=[age_keys[0], admin_age_key],
)
with monkeypatch.context():
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
what,
"add",
"--flake",
str(test_flake.path),
"-f",
"foo",
age_keys[1].privkey,
]
)
assert_secrets_file_recipients(
test_flake.path,
test_secret_name,
expected_age_recipients_keypairs=[age_keys[1], admin_age_key],
)
with capture_output as output:
cli.run(
[
"secrets",
what,
"get",
"--flake",
str(test_flake.path),
"foo",
]
)
assert age_keys[1].pubkey in output.out
with capture_output as output:
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
assert "foo" in output.out
cli.run(["secrets", what, "remove", "--flake", str(test_flake.path), "foo"])
assert not (sops_folder / what / "foo" / "key.json").exists()
with pytest.raises(ClanError): # already removed
cli.run(["secrets", what, "remove", "--flake", str(test_flake.path), "foo"])
with capture_output as output:
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
assert "foo" not in output.out
user_or_machine_symlink = sops_folder / "secrets" / test_secret_name / what / "foo"
err_msg = (
f"Symlink to {what_singular} foo's key in secret "
f"{test_secret_name} was not cleaned up after "
f"{what_singular} foo was removed."
)
assert not user_or_machine_symlink.exists(follow_symlinks=False), err_msg
def test_users(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
def test_machines(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
_test_identities("machines", test_flake, capture_output, age_keys, monkeypatch)
def test_groups(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None:
with capture_output as output:
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
assert output.out == ""
machine1_age_key = age_keys[0]
user1_age_key = age_keys[1]
admin_age_key = age_keys[2]
with pytest.raises(ClanError): # machine does not exist yet
cli.run(
[
"secrets",
"groups",
"add-machine",
"--flake",
str(test_flake.path),
"group1",
"machine1",
]
)
with pytest.raises(ClanError): # user does not exist yet
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"groupb1",
"user1",
]
)
cli.run(
[
"secrets",
"machines",
"add",
"--flake",
str(test_flake.path),
"machine1",
machine1_age_key.pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-machine",
"--flake",
str(test_flake.path),
"group1",
"machine1",
]
)
# Should this fail?
cli.run(
[
"secrets",
"groups",
"add-machine",
"--flake",
str(test_flake.path),
"group1",
"machine1",
]
)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"user1",
user1_age_key.pubkey,
]
)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"admin",
admin_age_key.pubkey,
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"group1",
"user1",
]
)
with capture_output as output:
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
out = output.out
assert "user1" in out
assert "machine1" in out
secret_name = "foo"
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef")
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--group",
"group1",
secret_name,
]
)
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[
machine1_age_key,
user1_age_key,
admin_age_key,
],
err_msg=(
f"The secret `{secret_name}` owned by group1 was not encrypted "
f"with all members of the group."
),
)
cli.run(
[
"secrets",
"groups",
"remove-user",
"--flake",
str(test_flake.path),
"group1",
"user1",
]
)
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted for "
f"`user1` even though this user has been removed from the group."
),
)
# re-add the user to the group
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"group1",
"user1",
]
)
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[
machine1_age_key,
user1_age_key,
admin_age_key,
],
)
# and instead of removing the user from the group, remove the
# user instead, it should also remove it from the group:
cli.run(
[
"secrets",
"users",
"remove",
"--flake",
str(test_flake.path),
"user1",
]
)
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted "
f"for `user1` even though this user has been deleted."
),
)
cli.run(
[
"secrets",
"groups",
"remove-machine",
"--flake",
str(test_flake.path),
"group1",
"machine1",
]
)
assert_secrets_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted for "
f"`machine1` even though this machine has been removed from the group."
),
)
first_group = next((test_flake.path / "sops" / "groups").iterdir(), None)
assert first_group is None
# Check if the symlink to the group was removed from our foo test secret:
group_symlink = test_flake.path / "sops/secrets/foo/groups/group1"
err_msg = (
"Symlink to group1's key in foo secret "
"was not cleaned up after group1 was removed"
)
assert not group_symlink.exists(follow_symlinks=False), err_msg
@contextmanager
def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key = os.environ["SOPS_AGE_KEY_FILE"]
monkeypatch.delenv("SOPS_AGE_KEY_FILE")
monkeypatch.setenv("SOPS_AGE_KEY", key)
try:
yield
finally:
monkeypatch.delenv("SOPS_AGE_KEY")
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key)
@contextmanager
def use_gpg_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
old_key_file = os.environ.get("SOPS_AGE_KEY_FILE")
old_key = os.environ.get("SOPS_AGE_KEY")
monkeypatch.delenv("SOPS_AGE_KEY_FILE", raising=False)
monkeypatch.delenv("SOPS_AGE_KEY", raising=False)
monkeypatch.setenv("SOPS_PGP_FP", key)
try:
yield
finally:
monkeypatch.delenv("SOPS_PGP_FP")
if old_key_file is not None:
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key_file)
if old_key is not None:
monkeypatch.setenv("SOPS_AGE_KEY", old_key)
@pytest.fixture
def gpg_key(
temp_dir: Path,
monkeypatch: pytest.MonkeyPatch,
test_root: Path,
) -> str:
gpg_home = temp_dir / "gnupghome"
shutil.copytree(test_root / "data" / "gnupg-home", gpg_home)
monkeypatch.setenv("GNUPGHOME", str(gpg_home))
return "9A9B2741C8062D3D3DF1302D8B049E262A5CA255"
def test_secrets(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch,
gpg_key: str,
age_keys: list["KeyPair"],
) -> None:
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == ""
# Generate a new key for the clan
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
with capture_output as output:
cli.run(["secrets", "key", "generate", "--flake", str(test_flake.path)])
assert "age private key" in output.out
# Read the key that was generated
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = json.loads(output.out)["publickey"]
assert key.startswith("age1")
# Add testuser with the key that was generated for the clan
cli.run(
["secrets", "users", "add", "--flake", str(test_flake.path), "testuser", key]
)
with pytest.raises(ClanError): # does not exist yet
cli.run(["secrets", "get", "--flake", str(test_flake.path), "nonexisting"])
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "initialkey"])
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "initialkey"])
assert output.out == "foo"
with capture_output as output:
cli.run(["secrets", "users", "list", "--flake", str(test_flake.path)])
users = output.out.rstrip().split("\n")
assert len(users) == 1, f"users: {users}"
owner = users[0]
monkeypatch.setenv("EDITOR", "cat")
cli.run(["secrets", "set", "--edit", "--flake", str(test_flake.path), "initialkey"])
monkeypatch.delenv("EDITOR")
cli.run(["secrets", "rename", "--flake", str(test_flake.path), "initialkey", "key"])
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == "key\n"
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path), "nonexisting"])
assert output.out == ""
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path), "key"])
assert output.out == "key\n"
# using the `age_keys` KeyPair, add a machine and rotate its key
cli.run(
[
"secrets",
"machines",
"add",
"--flake",
str(test_flake.path),
"machine1",
age_keys[1].pubkey,
]
)
cli.run(
[
"secrets",
"machines",
"add-secret",
"--flake",
str(test_flake.path),
"machine1",
"key",
]
)
with capture_output as output:
cli.run(["secrets", "machines", "list", "--flake", str(test_flake.path)])
assert output.out == "machine1\n"
with use_age_key(age_keys[1].privkey, monkeypatch):
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
# rotate machines key
cli.run(
[
"secrets",
"machines",
"add",
"--flake",
str(test_flake.path),
"-f",
"machine1",
age_keys[0].privkey,
]
)
# should also rotate the encrypted secret
with use_age_key(age_keys[0].privkey, monkeypatch):
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
cli.run(
[
"secrets",
"machines",
"remove-secret",
"--flake",
str(test_flake.path),
"machine1",
"key",
]
)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"user1",
age_keys[1].pubkey,
]
)
cli.run(
[
"secrets",
"users",
"add-secret",
"--flake",
str(test_flake.path),
"user1",
"key",
]
)
with capture_output as output, use_age_key(age_keys[1].privkey, monkeypatch):
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
cli.run(
[
"secrets",
"users",
"remove-secret",
"--flake",
str(test_flake.path),
"user1",
"key",
]
)
with pytest.raises(ClanError): # does not exist yet
cli.run(
[
"secrets",
"groups",
"add-secret",
"--flake",
str(test_flake.path),
"admin-group",
"key",
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"admin-group",
"user1",
]
)
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"admin-group",
owner,
]
)
cli.run(
[
"secrets",
"groups",
"add-secret",
"--flake",
str(test_flake.path),
"admin-group",
"key",
]
)
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--group",
"admin-group",
"key2",
]
)
with use_age_key(age_keys[1].privkey, monkeypatch):
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
# Add an user with a GPG key
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"--pgp-key",
gpg_key,
"user2",
]
)
# Extend group will update secrets
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"admin-group",
"user2",
]
)
with use_gpg_key(gpg_key, monkeypatch): # user2
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
assert output.out == "foo"
cli.run(
[
"secrets",
"groups",
"remove-user",
"--flake",
str(test_flake.path),
"admin-group",
"user2",
]
)
with (
pytest.raises(ClanError),
use_gpg_key(gpg_key, monkeypatch),
capture_output as output,
):
# user2 is not in the group anymore
cli.run(["secrets", "get", "--flake", str(test_flake.path), "key"])
print(output.out)
cli.run(
[
"secrets",
"groups",
"remove-secret",
"--flake",
str(test_flake.path),
"admin-group",
"key",
]
)
cli.run(["secrets", "remove", "--flake", str(test_flake.path), "key"])
cli.run(["secrets", "remove", "--flake", str(test_flake.path), "key2"])
with capture_output as output:
cli.run(["secrets", "list", "--flake", str(test_flake.path)])
assert output.out == ""
def test_secrets_key_generate_gpg(
test_flake: FlakeForTest,
capture_output: CaptureOutput,
monkeypatch: pytest.MonkeyPatch,
gpg_key: str,
) -> None:
with use_gpg_key(gpg_key, monkeypatch):
# Make sure clan secrets key generate recognizes
# the PGP key and does nothing:
with capture_output as output:
cli.run(
[
"secrets",
"key",
"generate",
"--flake",
str(test_flake.path),
]
)
assert "age private key" not in output.out
assert re.match(r"PGP key.+is already set", output.out) is not None
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake.path)])
key = json.loads(output.out)
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key
# Add testuser with the key that was (not) generated for the clan:
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"--pgp-key",
gpg_key,
"testuser",
]
)
with capture_output as output:
cli.run(
[
"secrets",
"users",
"get",
"--flake",
str(test_flake.path),
"testuser",
]
)
key = json.loads(output.out)
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key
monkeypatch.setenv("SOPS_NIX_SECRET", "secret-value")
cli.run(["secrets", "set", "--flake", str(test_flake.path), "secret-name"])
with capture_output as output:
cli.run(["secrets", "get", "--flake", str(test_flake.path), "secret-name"])
assert output.out == "secret-value"