clan-cli: improve tests on clan secrets …

When users or groups are updated :

- Check that keys are properly updated on sops secrets;
- Check that no dangling symlinks are left behind in sops secrets.

And when an user is removed from the clan, check that it is removed from
the groups it belonged to.

This doesn't check this works for vars explicitly, since they share the
same logic, see `secret_paths.extend(list_vars_secrets(flake_dir))` in
commit 7466445653.

Those improvements allow us to validate that #2659 is indeed fixed, and
tell us that we need to make the same kind of fixes for machines and
groups. For groups this is straightforward, and for machines, when one
is deleted, I wanna discuss first whether we want to delete all its
secrets as well.
This commit is contained in:
Louis Opter
2025-02-01 22:41:05 +00:00
committed by Mic92
parent 947095ad13
commit c99296aae8

View File

@@ -11,6 +11,7 @@ from typing import TYPE_CHECKING
import pytest import pytest
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.secrets.folders import sops_secrets_folder
from fixtures_flakes import FlakeForTest from fixtures_flakes import FlakeForTest
from helpers import cli from helpers import cli
from stdout import CaptureOutput from stdout import CaptureOutput
@@ -26,9 +27,18 @@ def _test_identities(
test_flake: FlakeForTest, test_flake: FlakeForTest,
capture_output: CaptureOutput, capture_output: CaptureOutput,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
sops_folder = test_flake.path / "sops" sops_folder = test_flake.path / "sops"
what_singular = what[:-1]
test_secret_name = f"{what_singular}_secret"
# fake some admin user that's different from the identity, we are going to
# try to add/remove/update from the clan, this way we can check that keys
# are properly updated on secrets when an identity changes.
admin_age_key = age_keys[2]
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -42,6 +52,18 @@ def _test_identities(
) )
assert (sops_folder / what / "foo" / "key.json").exists() assert (sops_folder / what / "foo" / "key.json").exists()
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"admin",
admin_age_key.pubkey,
]
)
with pytest.raises(ClanError): # raises "foo already exists" with pytest.raises(ClanError): # raises "foo already exists"
cli.run( cli.run(
[ [
@@ -55,18 +77,44 @@ def _test_identities(
] ]
) )
# rotate the key with monkeypatch.context():
cli.run( monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed")
[ monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
"secrets", cli.run(
what, [
"add", "secrets",
"--flake", "set",
str(test_flake.path), "--flake",
"-f", str(test_flake.path),
"foo", f"--{what_singular}",
age_keys[1].privkey, "foo",
] test_secret_name,
]
)
assert_sops_file_recipients(
test_flake.path,
test_secret_name,
expected_age_recipients_keypairs=[age_keys[0], admin_age_key],
)
with monkeypatch.context():
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
what,
"add",
"--flake",
str(test_flake.path),
"-f",
"foo",
age_keys[1].privkey,
]
)
assert_sops_file_recipients(
test_flake.path,
test_secret_name,
expected_age_recipients_keypairs=[age_keys[1], admin_age_key],
) )
with capture_output as output: with capture_output as output:
@@ -96,26 +144,52 @@ def _test_identities(
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)]) cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
assert "foo" not in output.out assert "foo" not in output.out
if what == "machines":
# lopter@(2025-02-02): Let's address #2659 first and then figure out
# what we wanna do about secrets when a machine is deleted.
return
user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo"
err_msg = (
f"Symlink to {what_singular} foo's key in secret "
f"{test_secret_name} was not cleaned up after "
f"{what_singular} foo was removed."
)
assert not user_symlink.exists(follow_symlinks=False), err_msg
def test_users( def test_users(
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
_test_identities("users", test_flake, capture_output, age_keys) _test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
def test_machines( def test_machines(
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
_test_identities("machines", test_flake, capture_output, age_keys) _test_identities("machines", test_flake, capture_output, age_keys, monkeypatch)
def test_groups( def test_groups(
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] test_flake: FlakeForTest,
capture_output: CaptureOutput,
age_keys: list["KeyPair"],
monkeypatch: pytest.MonkeyPatch,
) -> None: ) -> None:
with capture_output as output: with capture_output as output:
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)]) cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
assert output.out == "" assert output.out == ""
machine1_age_key = age_keys[0]
user1_age_key = age_keys[1]
admin_age_key = age_keys[2]
with pytest.raises(ClanError): # machine does not exist yet with pytest.raises(ClanError): # machine does not exist yet
cli.run( cli.run(
[ [
@@ -148,7 +222,7 @@ def test_groups(
"--flake", "--flake",
str(test_flake.path), str(test_flake.path),
"machine1", "machine1",
age_keys[0].pubkey, machine1_age_key.pubkey,
] ]
) )
cli.run( cli.run(
@@ -184,7 +258,18 @@ def test_groups(
"--flake", "--flake",
str(test_flake.path), str(test_flake.path),
"user1", "user1",
age_keys[0].pubkey, user1_age_key.pubkey,
]
)
cli.run(
[
"secrets",
"users",
"add",
"--flake",
str(test_flake.path),
"admin",
admin_age_key.pubkey,
] ]
) )
cli.run( cli.run(
@@ -205,6 +290,37 @@ def test_groups(
assert "user1" in out assert "user1" in out
assert "machine1" in out assert "machine1" in out
secret_name = "foo"
with monkeypatch.context():
monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef")
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
cli.run(
[
"secrets",
"set",
"--flake",
str(test_flake.path),
"--group",
"group1",
secret_name,
]
)
assert_sops_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[
machine1_age_key,
user1_age_key,
admin_age_key,
],
err_msg=(
f"The secret `{secret_name}` owned by group1 was not encrypted "
f"with all members of the group."
),
)
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -216,6 +332,59 @@ def test_groups(
"user1", "user1",
] ]
) )
assert_sops_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted for "
f"`user1` even though this user has been removed from the group."
),
)
# re-add the user to the group
cli.run(
[
"secrets",
"groups",
"add-user",
"--flake",
str(test_flake.path),
"group1",
"user1",
]
)
assert_sops_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[
machine1_age_key,
user1_age_key,
admin_age_key,
],
)
# and instead of removing the user from the group, remove the
# user instead, it should also remove it from the group:
cli.run(
[
"secrets",
"users",
"remove",
"--flake",
str(test_flake.path),
"user1",
]
)
assert_sops_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted "
f"for `user1` even though this user has been deleted."
),
)
cli.run( cli.run(
[ [
"secrets", "secrets",
@@ -227,9 +396,50 @@ def test_groups(
"machine1", "machine1",
] ]
) )
assert_sops_file_recipients(
test_flake.path,
secret_name,
expected_age_recipients_keypairs=[admin_age_key],
err_msg=(
f"The secret `{secret_name}` owned by group1 is still encrypted for "
f"`machine1` even though this machine has been removed from the group."
),
)
groups = os.listdir(test_flake.path / "sops" / "groups") groups = os.listdir(test_flake.path / "sops" / "groups")
assert len(groups) == 0 assert len(groups) == 0
# Check if the symlink to the group was removed our foo test secret:
group_symlink = test_flake.path / "sops/secrets/foo/groups/group1"
err_msg = (
"Symlink to group1's key in foo secret "
"was not cleaned up after group1 was removed"
)
assert not group_symlink.exists(follow_symlinks=False), err_msg
def assert_sops_file_recipients(
flake_path: Path,
secret_name: str,
expected_age_recipients_keypairs: list["KeyPair"],
err_msg: str | None = None,
) -> None:
"""Checks that the recipients of a SOPS file matches expectations.
:param err_msg: in case of failure, if you gave an error message then it
will be displayed, otherwise pytest will display the two different sets
of recipients.
"""
sops_file = sops_secrets_folder(flake_path) / secret_name / "secret"
with sops_file.open("rb") as fp:
sops_data = json.load(fp)
age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]}
expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs}
if not err_msg:
assert age_recipients == expected_age_recipients
return
assert age_recipients == expected_age_recipients, err_msg
@contextmanager @contextmanager
def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: