Merge pull request 'clan-cli: fix keys and symlinks updates when users, machines, or groups are updated' (#2781) from lopter/clan-core:lo-fix-secrets-user-remove into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2781
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
import os
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.completions import (
|
||||
@@ -103,13 +104,19 @@ def update_group_keys(flake_dir: Path, group: str) -> list[Path]:
|
||||
|
||||
|
||||
def add_member(
|
||||
flake_dir: Path, group_folder: Path, source_folder: Path, name: str
|
||||
flake_dir: Path,
|
||||
group_name: str,
|
||||
get_group_folder: Callable[[Path, str], Path],
|
||||
get_source_folder: Callable[[Path], Path],
|
||||
name: str,
|
||||
) -> list[Path]:
|
||||
source_folder = get_source_folder(flake_dir)
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
msg = f"{name} does not exist in {source_folder}: "
|
||||
msg += list_directory(source_folder)
|
||||
raise ClanError(msg)
|
||||
group_folder = get_group_folder(flake_dir, group_name)
|
||||
group_folder.mkdir(parents=True, exist_ok=True)
|
||||
user_target = group_folder / name
|
||||
if user_target.exists():
|
||||
@@ -118,19 +125,26 @@ def add_member(
|
||||
raise ClanError(msg)
|
||||
user_target.unlink()
|
||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||
return update_group_keys(flake_dir, group_folder.parent.name)
|
||||
changed_files = [user_target]
|
||||
group_name = group_folder.parent.name
|
||||
changed_files.extend(update_group_keys(flake_dir, group_name))
|
||||
return changed_files
|
||||
|
||||
|
||||
def remove_member(flake_dir: Path, group_folder: Path, name: str) -> None:
|
||||
def remove_member(
|
||||
flake_dir: Path,
|
||||
group_name: str,
|
||||
get_group_folder: Callable[[Path, str], Path],
|
||||
name: str,
|
||||
) -> list[Path]:
|
||||
group_folder = get_group_folder(flake_dir, group_name)
|
||||
target = group_folder / name
|
||||
if not target.exists():
|
||||
msg = f"{name} does not exist in group in {group_folder}: "
|
||||
msg += list_directory(group_folder)
|
||||
raise ClanError(msg)
|
||||
target.unlink()
|
||||
|
||||
if len(os.listdir(group_folder)) > 0:
|
||||
update_group_keys(flake_dir, group_folder.parent.name)
|
||||
updated_files = [target]
|
||||
|
||||
if len(os.listdir(group_folder)) == 0:
|
||||
group_folder.rmdir()
|
||||
@@ -138,10 +152,18 @@ def remove_member(flake_dir: Path, group_folder: Path, name: str) -> None:
|
||||
if len(os.listdir(group_folder.parent)) == 0:
|
||||
group_folder.parent.rmdir()
|
||||
|
||||
updated_files.extend(update_group_keys(flake_dir, group_name))
|
||||
|
||||
return updated_files
|
||||
|
||||
|
||||
def add_user(flake_dir: Path, group: str, name: str) -> None:
|
||||
updated_files = add_member(
|
||||
flake_dir, users_folder(flake_dir, group), sops_users_folder(flake_dir), name
|
||||
flake_dir,
|
||||
group,
|
||||
users_folder,
|
||||
sops_users_folder,
|
||||
name,
|
||||
)
|
||||
commit_files(
|
||||
updated_files,
|
||||
@@ -155,7 +177,17 @@ def add_user_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def remove_user(flake_dir: Path, group: str, name: str) -> None:
|
||||
remove_member(flake_dir, users_folder(flake_dir, group), name)
|
||||
updated_files = remove_member(
|
||||
flake_dir,
|
||||
group,
|
||||
users_folder,
|
||||
name,
|
||||
)
|
||||
commit_files(
|
||||
updated_files,
|
||||
flake_dir,
|
||||
f"Remove user {name} from group {group}",
|
||||
)
|
||||
|
||||
|
||||
def remove_user_command(args: argparse.Namespace) -> None:
|
||||
@@ -165,8 +197,9 @@ def remove_user_command(args: argparse.Namespace) -> None:
|
||||
def add_machine(flake_dir: Path, group: str, name: str) -> None:
|
||||
updated_files = add_member(
|
||||
flake_dir,
|
||||
machines_folder(flake_dir, group),
|
||||
sops_machines_folder(flake_dir),
|
||||
group,
|
||||
machines_folder,
|
||||
sops_machines_folder,
|
||||
name,
|
||||
)
|
||||
commit_files(
|
||||
@@ -181,7 +214,17 @@ def add_machine_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def remove_machine(flake_dir: Path, group: str, name: str) -> None:
|
||||
remove_member(flake_dir, machines_folder(flake_dir, group), name)
|
||||
updated_files = remove_member(
|
||||
flake_dir,
|
||||
group,
|
||||
machines_folder,
|
||||
name,
|
||||
)
|
||||
commit_files(
|
||||
updated_files,
|
||||
flake_dir,
|
||||
f"Remove machine {name} from group {group}",
|
||||
)
|
||||
|
||||
|
||||
def remove_machine_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -33,9 +33,9 @@ def generate_key() -> sops.SopsKey:
|
||||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
key = generate_key()
|
||||
print("Also add your age public key to the repository with:")
|
||||
key_type = key.key_type.name.lower()
|
||||
print(f"clan secrets users add <username> --{key_type}-key <key>")
|
||||
print(f"Add your {key_type} public key to the repository with:")
|
||||
print(f"clan secrets users add <username> --{key_type}-key {key.pubkey}")
|
||||
|
||||
|
||||
def show_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -82,6 +82,11 @@ def update_secrets(
|
||||
for path in secret_paths:
|
||||
if not filter_secrets(path):
|
||||
continue
|
||||
# clean-up non-existent users, groups, and machines
|
||||
# from the secret before we update it:
|
||||
changed_files.extend(cleanup_dangling_symlinks(path / "users"))
|
||||
changed_files.extend(cleanup_dangling_symlinks(path / "groups"))
|
||||
changed_files.extend(cleanup_dangling_symlinks(path / "machines"))
|
||||
changed_files.extend(
|
||||
update_keys(
|
||||
path,
|
||||
@@ -91,6 +96,17 @@ def update_secrets(
|
||||
return changed_files
|
||||
|
||||
|
||||
def cleanup_dangling_symlinks(folder: Path) -> list[Path]:
|
||||
if not folder.exists():
|
||||
return []
|
||||
removed: list[Path] = []
|
||||
for link in folder.iterdir():
|
||||
if link.is_symlink() and not link.exists():
|
||||
link.unlink()
|
||||
removed.append(folder / link)
|
||||
return removed
|
||||
|
||||
|
||||
def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
if not folder.exists():
|
||||
return set()
|
||||
@@ -99,7 +115,7 @@ def collect_keys_for_type(folder: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
if not p.is_symlink():
|
||||
continue
|
||||
try:
|
||||
target = p.resolve()
|
||||
target = p.resolve(strict=True)
|
||||
except FileNotFoundError:
|
||||
log.warning(f"Ignoring broken symlink {p}")
|
||||
continue
|
||||
|
||||
@@ -337,7 +337,7 @@ def maybe_get_admin_public_key() -> None | SopsKey:
|
||||
if len(keyring) > 1:
|
||||
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
|
||||
msg = (
|
||||
f"Found more than {len(keyring)} public keys in your "
|
||||
f"Found {len(keyring)} public keys in your "
|
||||
f"environment/system and cannot decide which one to "
|
||||
f"use, first {len(last_3)}:\n\n"
|
||||
f"- {'\n- '.join(last_3)}\n\n"
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
@@ -7,14 +9,14 @@ from clan_cli.completions import add_dynamic_completer, complete_secrets, comple
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
|
||||
from . import secrets, sops
|
||||
from . import groups, secrets, sops
|
||||
from .folders import (
|
||||
list_objects,
|
||||
remove_object,
|
||||
sops_groups_folder,
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from .groups import get_groups
|
||||
from .secrets import update_secrets
|
||||
from .sops import read_key, write_key
|
||||
from .types import (
|
||||
@@ -24,6 +26,8 @@ from .types import (
|
||||
user_name_type,
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def add_user(
|
||||
flake_dir: Path,
|
||||
@@ -34,12 +38,12 @@ def add_user(
|
||||
) -> None:
|
||||
path = sops_users_folder(flake_dir) / name
|
||||
|
||||
groups = get_groups(flake_dir, "users", name)
|
||||
groupnames = [p.name for p in groups.get_groups(flake_dir, "users", name)]
|
||||
|
||||
def filter_user_secrets(secret: Path) -> bool:
|
||||
if secret.joinpath("users", name).exists():
|
||||
return True
|
||||
return any(secret.joinpath("groups", group.name).exists() for group in groups)
|
||||
return any(secret.joinpath("groups", name).exists() for name in groupnames)
|
||||
|
||||
write_key(path, key, key_type, overwrite=force)
|
||||
paths = [path]
|
||||
@@ -53,12 +57,26 @@ def add_user(
|
||||
|
||||
|
||||
def remove_user(flake_dir: Path, name: str) -> None:
|
||||
removed_paths = remove_object(sops_users_folder(flake_dir), name)
|
||||
commit_files(
|
||||
removed_paths,
|
||||
flake_dir,
|
||||
f"Remove user {name}",
|
||||
)
|
||||
updated_paths: list[Path] = []
|
||||
# Remove the user from any group where it belonged:
|
||||
groups_dir = sops_groups_folder(flake_dir)
|
||||
if groups_dir.exists():
|
||||
for group in os.listdir(groups_dir):
|
||||
group_folder = groups_dir / group
|
||||
if not group_folder.is_dir():
|
||||
continue
|
||||
memberships = group_folder / "users"
|
||||
if not (memberships / name).exists():
|
||||
continue
|
||||
log.info(f"Removing user {name} from group {group}")
|
||||
updated_paths.extend(
|
||||
groups.remove_member(flake_dir, group, groups.users_folder, name)
|
||||
)
|
||||
# Remove the user's key:
|
||||
updated_paths.extend(remove_object(sops_users_folder(flake_dir), name))
|
||||
# Remove the user from any secret where it was used:
|
||||
updated_paths.extend(update_secrets(flake_dir))
|
||||
commit_files(updated_paths, flake_dir, f"Remove user {name}")
|
||||
|
||||
|
||||
def get_user(flake_dir: Path, name: str) -> sops.SopsKey:
|
||||
@@ -115,15 +133,19 @@ def add_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
keys_args = (args.age_key, args.agekey, args.pgp_key)
|
||||
keys_count = sum(1 if key else 0 for key in keys_args)
|
||||
if keys_count != 1:
|
||||
err_msg = (
|
||||
f"Please provide one key (got {keys_count}) through `--pgp-key`, "
|
||||
f"`--age-key`, or as a positional (age key) argument."
|
||||
)
|
||||
raise ClanError(err_msg)
|
||||
if args.age_key or args.agekey:
|
||||
key_type = sops.KeyType.AGE
|
||||
elif args.pgp_key:
|
||||
key_type = sops.KeyType.PGP
|
||||
else:
|
||||
msg = "BUG!: key type not set"
|
||||
raise ValueError(msg)
|
||||
key_type = sops.KeyType.PGP
|
||||
key = args.agekey or args.age_key or args.pgp_key
|
||||
assert key is not None, "key is None"
|
||||
add_user(args.flake.path, args.user, key, key_type, args.force)
|
||||
|
||||
|
||||
|
||||
@@ -7,15 +7,18 @@ from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.secrets import sops
|
||||
from clan_cli.secrets.folders import (
|
||||
sops_groups_folder,
|
||||
sops_machines_folder,
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from clan_cli.secrets.machines import add_machine, add_secret, has_machine
|
||||
from clan_cli.secrets.secrets import (
|
||||
allow_member,
|
||||
collect_keys_for_path,
|
||||
decrypt_secret,
|
||||
encrypt_secret,
|
||||
groups_folder,
|
||||
has_secret,
|
||||
)
|
||||
from clan_cli.ssh.upload import upload
|
||||
@@ -284,6 +287,16 @@ class SecretStore(StoreBase):
|
||||
continue
|
||||
|
||||
secret_path = self.secret_path(generator, file.name)
|
||||
|
||||
for group in self.machine.deployment["sops"]["defaultGroups"]:
|
||||
allow_member(
|
||||
groups_folder(secret_path),
|
||||
sops_groups_folder(self.machine.flake_dir),
|
||||
group,
|
||||
# we just want to create missing symlinks, we call update_keys below:
|
||||
do_update_keys=False,
|
||||
)
|
||||
|
||||
update_keys(
|
||||
secret_path,
|
||||
collect_keys_for_path(secret_path),
|
||||
|
||||
@@ -11,6 +11,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.secrets.folders import sops_secrets_folder
|
||||
from fixtures_flakes import FlakeForTest
|
||||
from helpers import cli
|
||||
from stdout import CaptureOutput
|
||||
@@ -26,9 +27,18 @@ def _test_identities(
|
||||
test_flake: FlakeForTest,
|
||||
capture_output: CaptureOutput,
|
||||
age_keys: list["KeyPair"],
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
sops_folder = test_flake.path / "sops"
|
||||
|
||||
what_singular = what[:-1]
|
||||
test_secret_name = f"{what_singular}_secret"
|
||||
|
||||
# fake some admin user that's different from the identity, we are going to
|
||||
# try to add/remove/update from the clan, this way we can check that keys
|
||||
# are properly updated on secrets when an identity changes.
|
||||
admin_age_key = age_keys[2]
|
||||
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
@@ -42,6 +52,18 @@ def _test_identities(
|
||||
)
|
||||
assert (sops_folder / what / "foo" / "key.json").exists()
|
||||
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"users",
|
||||
"add",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"admin",
|
||||
admin_age_key.pubkey,
|
||||
]
|
||||
)
|
||||
|
||||
with pytest.raises(ClanError): # raises "foo already exists"
|
||||
cli.run(
|
||||
[
|
||||
@@ -55,18 +77,44 @@ def _test_identities(
|
||||
]
|
||||
)
|
||||
|
||||
# rotate the key
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
what,
|
||||
"add",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"-f",
|
||||
"foo",
|
||||
age_keys[1].privkey,
|
||||
]
|
||||
with monkeypatch.context():
|
||||
monkeypatch.setenv("SOPS_NIX_SECRET", "deadfeed")
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"set",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
f"--{what_singular}",
|
||||
"foo",
|
||||
test_secret_name,
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
test_secret_name,
|
||||
expected_age_recipients_keypairs=[age_keys[0], admin_age_key],
|
||||
)
|
||||
|
||||
with monkeypatch.context():
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
what,
|
||||
"add",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"-f",
|
||||
"foo",
|
||||
age_keys[1].privkey,
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
test_secret_name,
|
||||
expected_age_recipients_keypairs=[age_keys[1], admin_age_key],
|
||||
)
|
||||
|
||||
with capture_output as output:
|
||||
@@ -96,26 +144,52 @@ def _test_identities(
|
||||
cli.run(["secrets", what, "list", "--flake", str(test_flake.path)])
|
||||
assert "foo" not in output.out
|
||||
|
||||
if what == "machines":
|
||||
# lopter@(2025-02-02): Let's address #2659 first and then figure out
|
||||
# what we wanna do about secrets when a machine is deleted.
|
||||
return
|
||||
|
||||
user_symlink = sops_folder / "secrets" / test_secret_name / what / "foo"
|
||||
err_msg = (
|
||||
f"Symlink to {what_singular} foo's key in secret "
|
||||
f"{test_secret_name} was not cleaned up after "
|
||||
f"{what_singular} foo was removed."
|
||||
)
|
||||
assert not user_symlink.exists(follow_symlinks=False), err_msg
|
||||
|
||||
|
||||
def test_users(
|
||||
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
|
||||
test_flake: FlakeForTest,
|
||||
capture_output: CaptureOutput,
|
||||
age_keys: list["KeyPair"],
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
_test_identities("users", test_flake, capture_output, age_keys)
|
||||
_test_identities("users", test_flake, capture_output, age_keys, monkeypatch)
|
||||
|
||||
|
||||
def test_machines(
|
||||
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
|
||||
test_flake: FlakeForTest,
|
||||
capture_output: CaptureOutput,
|
||||
age_keys: list["KeyPair"],
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
_test_identities("machines", test_flake, capture_output, age_keys)
|
||||
_test_identities("machines", test_flake, capture_output, age_keys, monkeypatch)
|
||||
|
||||
|
||||
def test_groups(
|
||||
test_flake: FlakeForTest, capture_output: CaptureOutput, age_keys: list["KeyPair"]
|
||||
test_flake: FlakeForTest,
|
||||
capture_output: CaptureOutput,
|
||||
age_keys: list["KeyPair"],
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
with capture_output as output:
|
||||
cli.run(["secrets", "groups", "list", "--flake", str(test_flake.path)])
|
||||
assert output.out == ""
|
||||
|
||||
machine1_age_key = age_keys[0]
|
||||
user1_age_key = age_keys[1]
|
||||
admin_age_key = age_keys[2]
|
||||
|
||||
with pytest.raises(ClanError): # machine does not exist yet
|
||||
cli.run(
|
||||
[
|
||||
@@ -148,7 +222,7 @@ def test_groups(
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"machine1",
|
||||
age_keys[0].pubkey,
|
||||
machine1_age_key.pubkey,
|
||||
]
|
||||
)
|
||||
cli.run(
|
||||
@@ -184,7 +258,18 @@ def test_groups(
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"user1",
|
||||
age_keys[0].pubkey,
|
||||
user1_age_key.pubkey,
|
||||
]
|
||||
)
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"users",
|
||||
"add",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"admin",
|
||||
admin_age_key.pubkey,
|
||||
]
|
||||
)
|
||||
cli.run(
|
||||
@@ -205,6 +290,37 @@ def test_groups(
|
||||
assert "user1" in out
|
||||
assert "machine1" in out
|
||||
|
||||
secret_name = "foo"
|
||||
|
||||
with monkeypatch.context():
|
||||
monkeypatch.setenv("SOPS_NIX_SECRET", "deafbeef")
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", admin_age_key.privkey)
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"set",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"--group",
|
||||
"group1",
|
||||
secret_name,
|
||||
]
|
||||
)
|
||||
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[
|
||||
machine1_age_key,
|
||||
user1_age_key,
|
||||
admin_age_key,
|
||||
],
|
||||
err_msg=(
|
||||
f"The secret `{secret_name}` owned by group1 was not encrypted "
|
||||
f"with all members of the group."
|
||||
),
|
||||
)
|
||||
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
@@ -216,6 +332,59 @@ def test_groups(
|
||||
"user1",
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
|
||||
err_msg=(
|
||||
f"The secret `{secret_name}` owned by group1 is still encrypted for "
|
||||
f"`user1` even though this user has been removed from the group."
|
||||
),
|
||||
)
|
||||
|
||||
# re-add the user to the group
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"groups",
|
||||
"add-user",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"group1",
|
||||
"user1",
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[
|
||||
machine1_age_key,
|
||||
user1_age_key,
|
||||
admin_age_key,
|
||||
],
|
||||
)
|
||||
# and instead of removing the user from the group, remove the
|
||||
# user instead, it should also remove it from the group:
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
"users",
|
||||
"remove",
|
||||
"--flake",
|
||||
str(test_flake.path),
|
||||
"user1",
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[machine1_age_key, admin_age_key],
|
||||
err_msg=(
|
||||
f"The secret `{secret_name}` owned by group1 is still encrypted "
|
||||
f"for `user1` even though this user has been deleted."
|
||||
),
|
||||
)
|
||||
|
||||
cli.run(
|
||||
[
|
||||
"secrets",
|
||||
@@ -227,9 +396,50 @@ def test_groups(
|
||||
"machine1",
|
||||
]
|
||||
)
|
||||
assert_sops_file_recipients(
|
||||
test_flake.path,
|
||||
secret_name,
|
||||
expected_age_recipients_keypairs=[admin_age_key],
|
||||
err_msg=(
|
||||
f"The secret `{secret_name}` owned by group1 is still encrypted for "
|
||||
f"`machine1` even though this machine has been removed from the group."
|
||||
),
|
||||
)
|
||||
|
||||
groups = os.listdir(test_flake.path / "sops" / "groups")
|
||||
assert len(groups) == 0
|
||||
|
||||
# Check if the symlink to the group was removed from our foo test secret:
|
||||
group_symlink = test_flake.path / "sops/secrets/foo/groups/group1"
|
||||
err_msg = (
|
||||
"Symlink to group1's key in foo secret "
|
||||
"was not cleaned up after group1 was removed"
|
||||
)
|
||||
assert not group_symlink.exists(follow_symlinks=False), err_msg
|
||||
|
||||
|
||||
def assert_sops_file_recipients(
|
||||
flake_path: Path,
|
||||
secret_name: str,
|
||||
expected_age_recipients_keypairs: list["KeyPair"],
|
||||
err_msg: str | None = None,
|
||||
) -> None:
|
||||
"""Checks that the recipients of a SOPS file matches expectations.
|
||||
|
||||
:param err_msg: in case of failure, if you gave an error message then it
|
||||
will be displayed, otherwise pytest will display the two different sets
|
||||
of recipients.
|
||||
"""
|
||||
sops_file = sops_secrets_folder(flake_path) / secret_name / "secret"
|
||||
with sops_file.open("rb") as fp:
|
||||
sops_data = json.load(fp)
|
||||
age_recipients = {each["recipient"] for each in sops_data["sops"]["age"]}
|
||||
expected_age_recipients = {pair.pubkey for pair in expected_age_recipients_keypairs}
|
||||
if not err_msg:
|
||||
assert age_recipients == expected_age_recipients
|
||||
return
|
||||
assert age_recipients == expected_age_recipients, err_msg
|
||||
|
||||
|
||||
@contextmanager
|
||||
def use_age_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
|
||||
|
||||
Reference in New Issue
Block a user