diff --git a/pkgs/clan-cli/tests/test_secrets_cli.py b/pkgs/clan-cli/tests/test_secrets_cli.py index 85003a541..939fb541a 100644 --- a/pkgs/clan-cli/tests/test_secrets_cli.py +++ b/pkgs/clan-cli/tests/test_secrets_cli.py @@ -11,6 +11,7 @@ from typing import TYPE_CHECKING import pytest from clan_cli.errors import ClanError +from clan_cli.secrets.folders import sops_secrets_folder from fixtures_flakes import FlakeForTest from helpers import cli from stdout import CaptureOutput @@ -26,9 +27,18 @@ def _test_identities( test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: sops_folder = test_flake.path / "sops" + what_singular = what[:-1] + test_secret_name = f"{what_singular}_secret" + + # fake some admin user that's different from the identity, we are going to + # try to add/remove/update from the clan, this way we can check that keys + # are properly updated on secrets when an identity changes. + admin_age_key = age_keys[2] + cli.run( [ "secrets", @@ -42,6 +52,18 @@ def _test_identities( ) assert (sops_folder / what / "foo" / "key.json").exists() + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "admin", + admin_age_key.pubkey, + ] + ) + with pytest.raises(ClanError): # raises "foo already exists" cli.run( [ @@ -55,18 +77,44 @@ def _test_identities( ] ) - # rotate the key - cli.run( - [ - "secrets", - what, - "add", - "--flake", - str(test_flake.path), - "-f", - "foo", - age_keys[1].privkey, - ] + with monkeypatch.context(): + monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed") + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + "set", + "--flake", + str(test_flake.path), + f"--{what_singular}", + "foo", + test_secret_name, + ] + ) + assert_sops_file_recipients( + test_flake.path, + test_secret_name, + expected_age_recipients_keypairs=[age_keys[0], admin_age_key], + ) + + with monkeypatch.context(): + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + what, + "add", + "--flake", + str(test_flake.path), + "-f", + "foo", + age_keys[1].privkey, + ] + ) + assert_sops_file_recipients( + test_flake.path, + test_secret_name, + expected_age_recipients_keypairs=[age_keys[1], admin_age_key], ) with capture_output as output: @@ -96,26 +144,52 @@ def _test_identities( cli.run(["secrets", what, "list", "--flake", str(test_flake.path)]) assert "foo" not in output.out + if what == "machines": + # lopter@(2025-02-02): Let's address #2659 first and then figure out + # what we wanna do about secrets when a machine is deleted. + return + + user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo" + err_msg = ( + f"Symlink to {what_singular} foo's key in secret " + f"{test_secret_name} was not cleaned up after " + f"{what_singular} foo was removed." + ) + assert not user_symlink.exists(follow_symlinks=False), err_msg + def test_users( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: - _test_identities("users", test_flake, capture_output, age_keys) + _test_identities("users", test_flake, capture_output, age_keys, monkeypatch) def test_machines( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: - _test_identities("machines", test_flake, capture_output, age_keys) + _test_identities("machines", test_flake, capture_output, age_keys, monkeypatch) def test_groups( - test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"] + test_flake: FlakeForTest, + capture_output: CaptureOutput, + age_keys: list["KeyPair"], + monkeypatch: pytest.MonkeyPatch, ) -> None: with capture_output as output: cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)]) assert output.out == "" + machine1_age_key = age_keys[0] + user1_age_key = age_keys[1] + admin_age_key = age_keys[2] + with pytest.raises(ClanError): # machine does not exist yet cli.run( [ @@ -148,7 +222,7 @@ def test_groups( "--flake", str(test_flake.path), "machine1", - age_keys[0].pubkey, + machine1_age_key.pubkey, ] ) cli.run( @@ -184,7 +258,18 @@ def test_groups( "--flake", str(test_flake.path), "user1", - age_keys[0].pubkey, + user1_age_key.pubkey, + ] + ) + cli.run( + [ + "secrets", + "users", + "add", + "--flake", + str(test_flake.path), + "admin", + admin_age_key.pubkey, ] ) cli.run( @@ -205,6 +290,37 @@ def test_groups( assert "user1" in out assert "machine1" in out + secret_name = "foo" + + with monkeypatch.context(): + monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef") + monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey) + cli.run( + [ + "secrets", + "set", + "--flake", + str(test_flake.path), + "--group", + "group1", + secret_name, + ] + ) + + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[ + machine1_age_key, + user1_age_key, + admin_age_key, + ], + err_msg=( + f"The secret `{secret_name}` owned by group1 was not encrypted " + f"with all members of the group." + ), + ) + cli.run( [ "secrets", @@ -216,6 +332,59 @@ def test_groups( "user1", ] ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted for " + f"`user1` even though this user has been removed from the group." + ), + ) + + # re-add the user to the group + cli.run( + [ + "secrets", + "groups", + "add-user", + "--flake", + str(test_flake.path), + "group1", + "user1", + ] + ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[ + machine1_age_key, + user1_age_key, + admin_age_key, + ], + ) + # and instead of removing the user from the group, remove the + # user instead, it should also remove it from the group: + cli.run( + [ + "secrets", + "users", + "remove", + "--flake", + str(test_flake.path), + "user1", + ] + ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[machine1_age_key, admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted " + f"for `user1` even though this user has been deleted." + ), + ) + cli.run( [ "secrets", @@ -227,9 +396,50 @@ def test_groups( "machine1", ] ) + assert_sops_file_recipients( + test_flake.path, + secret_name, + expected_age_recipients_keypairs=[admin_age_key], + err_msg=( + f"The secret `{secret_name}` owned by group1 is still encrypted for " + f"`machine1` even though this machine has been removed from the group." + ), + ) + groups = os.listdir(test_flake.path / "sops" / "groups") assert len(groups) == 0 + # Check if the symlink to the group was removed our foo test secret: + group_symlink = test_flake.path / "sops/secrets/foo/groups/group1" + err_msg = ( + "Symlink to group1's key in foo secret " + "was not cleaned up after group1 was removed" + ) + assert not group_symlink.exists(follow_symlinks=False), err_msg + + +def assert_sops_file_recipients( + flake_path: Path, + secret_name: str, + expected_age_recipients_keypairs: list["KeyPair"], + err_msg: str | None = None, +) -> None: + """Checks that the recipients of a SOPS file matches expectations. + + :param err_msg: in case of failure, if you gave an error message then it + will be displayed, otherwise pytest will display the two different sets + of recipients. + """ + sops_file = sops_secrets_folder(flake_path) / secret_name / "secret" + with sops_file.open("rb") as fp: + sops_data = json.load(fp) + age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]} + expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs} + if not err_msg: + assert age_recipients == expected_age_recipients + return + assert age_recipients == expected_age_recipients, err_msg + @contextmanager def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]: