From 36a54ead12f189ca160e5c9e5b671c890dd1de77 Mon Sep 17 00:00:00 2001 From: Louis Opter Date: Sat, 1 Feb 2025 22:41:05 +0000 Subject: [PATCH] =?UTF-8?q?clan-cli:=20improve=20tests=20on=20`clan=20secr?= =?UTF-8?q?ets=20=E2=80=A6`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When users or groups are updated : - Check that keys are properly updated on sops secrets; - Check that no dangling symlinks are left behind in sops secrets. And when an user is removed from the clan, check that it is removed from the groups it belonged to. This doesn't check this works for vars explicitly, since they share the same logic, see `secret_paths.extend(list_vars_secrets(flake_dir))` in commit f2856cb773f11b3d32f246b9b07d79419575ab3f. Those improvements allow us to validate that #2659 is indeed fixed, and tell us that we need to make the same kind of fixes for machines and groups. For groups this is straightforward, and for machines, when one is deleted, I wanna discuss first whether we want to delete all its secrets as well. --- pkgs/clan-cli/tests/test_secrets_cli.py | 248 ++++++++++++++++++++++-- 1 file changed, 229 insertions(+), 19 deletions(-) diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index 85003a541..939fb541a 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING import pytest from clan_cli.errors import ClanError +from clan_cli.secrets.folders import sops_secrets_folder from fixtures_flakes import FlakeForTest from helpers import cli from stdout import CaptureOutput @@ -26,9 +27,18 @@ def _test_identities( test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: sops_folder = test_flake.path / "sops" + what_singular = what[:-1] + test_secret_name = f"{what_singular}_secret" + + # fake some admin user that's different from the identity, we are going to + # try to add/remove/update from the clan, this way we can check that keys + # are properly updated on secrets when an identity changes. + admin_age_key = age_keys[2] + cli.run( [ "secrets", @@ -42,6 +52,18 @@ def _test_identities( ) assert (sops_folder / what / "foo" / "key.json").exists() + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "admin", + admin_age_key.pubkey, + ] + ) + with pytest.raises(ClanError): # raises "foo already exists" cli.run( [ @@ -55,18 +77,44 @@ def _test_identities( ] ) - # rotate the key - cli.run( - [ - "secrets", - what, - "add", - "--flake", - str(test_flake.path), - "-f", - "foo", - age_keys[1].privkey, - ] + with monkeypatch.context(): + monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed") + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + "set", + "--flake", + str(test_flake.path), + f"--{what_singular}", + "foo", + test_secret_name, + ] + ) + assert_sops_file_recipients( + test_flake.path, + test_secret_name, + expected_age_recipients_keypairs=[age_keys[0], admin_age_key], + ) + + with monkeypatch.context(): + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + what, + "add", + "--flake", + str(test_flake.path), + "-f", + "foo", + age_keys[1].privkey, + ] + ) + assert_sops_file_recipients( + test_flake.path, + test_secret_name, + expected_age_recipients_keypairs=[age_keys[1], admin_age_key], ) with capture_output as output: @@ -96,26 +144,52 @@ def _test_identities( cli.run(["secrets", what, "list", "--flake", str(test_flake.path)]) assert "foo" not in output.out + if what == "machines": + # lopter@(2025-02-02): Let's address #2659 first and then figure out + # what we wanna do about secrets when a machine is deleted. + return + + user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo" + err_msg = ( + f"Symlink to {what_singular} foo's key in secret " + f"{test_secret_name} was not cleaned up after " + f"{what_singular} foo was removed." + ) + assert not user_symlink.exists(follow_symlinks=False), err_msg + def test_users( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: - _test_identities("users", test_flake, capture_output, age_keys) + _test_identities("users", test_flake, capture_output, age_keys, monkeypatch) def test_machines( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: - _test_identities("machines", test_flake, capture_output, age_keys) + _test_identities("machines", test_flake, capture_output, age_keys, monkeypatch) def test_groups( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: with capture_output as output: cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)]) assert output.out == "" + machine1_age_key = age_keys[0] + user1_age_key = age_keys[1] + admin_age_key = age_keys[2] + with pytest.raises(ClanError): # machine does not exist yet cli.run( [ @@ -148,7 +222,7 @@ def test_groups( "--flake", str(test_flake.path), "machine1", - age_keys[0].pubkey, + machine1_age_key.pubkey, ] ) cli.run( @@ -184,7 +258,18 @@ def test_groups( "--flake", str(test_flake.path), "user1", - age_keys[0].pubkey, + user1_age_key.pubkey, + ] + ) + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "admin", + admin_age_key.pubkey, ] ) cli.run( @@ -205,6 +290,37 @@ def test_groups( assert "user1" in out assert "machine1" in out + secret_name = "foo" + + with monkeypatch.context(): + monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef") + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + "set", + "--flake", + str(test_flake.path), + "--group", + "group1", + secret_name, + ] + ) + + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[ + machine1_age_key, + user1_age_key, + admin_age_key, + ], + err_msg=( + f"The secret `{secret_name}` owned by group1 was not encrypted " + f"with all members of the group." + ), + ) + cli.run( [ "secrets", @@ -216,6 +332,59 @@ def test_groups( "user1", ] ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted for " + f"`user1` even though this user has been removed from the group." + ), + ) + + # re-add the user to the group + cli.run( + [ + "secrets", + "groups", + "add-user", + "--flake", + str(test_flake.path), + "group1", + "user1", + ] + ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[ + machine1_age_key, + user1_age_key, + admin_age_key, + ], + ) + # and instead of removing the user from the group, remove the + # user instead, it should also remove it from the group: + cli.run( + [ + "secrets", + "users", + "remove", + "--flake", + str(test_flake.path), + "user1", + ] + ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted " + f"for `user1` even though this user has been deleted." + ), + ) + cli.run( [ "secrets", @@ -227,9 +396,50 @@ def test_groups( "machine1", ] ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted for " + f"`machine1` even though this machine has been removed from the group." + ), + ) + groups = os.listdir(test_flake.path / "sops" / "groups") assert len(groups) == 0 + # Check if the symlink to the group was removed our foo test secret: + group_symlink = test_flake.path / "sops/secrets/foo/groups/group1" + err_msg = ( + "Symlink to group1's key in foo secret " + "was not cleaned up after group1 was removed" + ) + assert not group_symlink.exists(follow_symlinks=False), err_msg + + +def assert_sops_file_recipients( + flake_path: Path, + secret_name: str, + expected_age_recipients_keypairs: list["KeyPair"], + err_msg: str | None = None, +) -> None: + """Checks that the recipients of a SOPS file matches expectations. + + :param err_msg: in case of failure, if you gave an error message then it + will be displayed, otherwise pytest will display the two different sets + of recipients. + """ + sops_file = sops_secrets_folder(flake_path) / secret_name / "secret" + with sops_file.open("rb") as fp: + sops_data = json.load(fp) + age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]} + expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs} + if not err_msg: + assert age_recipients == expected_age_recipients + return + assert age_recipients == expected_age_recipients, err_msg + @contextmanager def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: