Merge pull request 'clan_cli.secrets.groups: update keys if members are added/removed' (#389) from lassulus-secrets_cli into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/389
This commit is contained in:
@@ -5,7 +5,13 @@ from pathlib import Path
|
||||
from ..errors import ClanError
|
||||
from ..machines.types import machine_name_type, validate_hostname
|
||||
from . import secrets
|
||||
from .folders import sops_groups_folder, sops_machines_folder, sops_users_folder
|
||||
from .folders import (
|
||||
sops_groups_folder,
|
||||
sops_machines_folder,
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from .sops import update_keys
|
||||
from .types import (
|
||||
VALID_USER_NAME,
|
||||
group_name_type,
|
||||
@@ -78,6 +84,16 @@ def list_directory(directory: Path) -> str:
|
||||
return msg
|
||||
|
||||
|
||||
def update_group_keys(group: str) -> None:
|
||||
for secret_ in secrets.list_secrets():
|
||||
secret = sops_secrets_folder() / secret_
|
||||
if (secret / "groups" / group).is_symlink():
|
||||
update_keys(
|
||||
secret,
|
||||
list(sorted(secrets.collect_keys_for_path(secret))),
|
||||
)
|
||||
|
||||
|
||||
def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
source = source_folder / name
|
||||
if not source.exists():
|
||||
@@ -93,6 +109,7 @@ def add_member(group_folder: Path, source_folder: Path, name: str) -> None:
|
||||
)
|
||||
os.remove(user_target)
|
||||
user_target.symlink_to(os.path.relpath(source, user_target.parent))
|
||||
update_group_keys(group_folder.parent.name)
|
||||
|
||||
|
||||
def remove_member(group_folder: Path, name: str) -> None:
|
||||
@@ -103,6 +120,9 @@ def remove_member(group_folder: Path, name: str) -> None:
|
||||
raise ClanError(msg)
|
||||
os.remove(target)
|
||||
|
||||
if len(os.listdir(group_folder)) > 0:
|
||||
update_group_keys(group_folder.parent.name)
|
||||
|
||||
if len(os.listdir(group_folder)) == 0:
|
||||
os.rmdir(group_folder)
|
||||
|
||||
|
||||
@@ -126,7 +126,11 @@ def update_keys(secret_path: Path, keys: list[str]) -> None:
|
||||
str(secret_path / "secret"),
|
||||
],
|
||||
)
|
||||
subprocess.run(cmd, check=True)
|
||||
res = subprocess.run(cmd)
|
||||
if res.returncode != 0:
|
||||
raise ClanError(
|
||||
f"Failed to update keys for {secret_path}: sops exited with {res.returncode}"
|
||||
)
|
||||
|
||||
|
||||
def encrypt_file(
|
||||
@@ -177,7 +181,11 @@ def decrypt_file(secret_path: Path) -> str:
|
||||
cmd = nix_shell(
|
||||
["sops"], ["sops", "--config", str(manifest), "--decrypt", str(secret_path)]
|
||||
)
|
||||
res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, text=True)
|
||||
res = subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
|
||||
if res.returncode != 0:
|
||||
raise ClanError(
|
||||
f"Failed to decrypt {secret_path}: sops exited with {res.returncode}"
|
||||
)
|
||||
return res.stdout
|
||||
|
||||
|
||||
|
||||
@@ -107,8 +107,11 @@ def test_groups(
|
||||
@contextmanager
|
||||
def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
|
||||
old_key = os.environ["SOPS_AGE_KEY_FILE"]
|
||||
monkeypatch.delenv("SOPS_AGE_KEY_FILE")
|
||||
monkeypatch.setenv("SOPS_AGE_KEY", key)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
monkeypatch.delenv("SOPS_AGE_KEY")
|
||||
monkeypatch.setenv("SOPS_AGE_KEY_FILE", old_key)
|
||||
|
||||
@@ -190,6 +193,23 @@ def test_secrets(
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
|
||||
# extend group will update secrets
|
||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user2"])
|
||||
|
||||
with use_key(age_keys[2].privkey, monkeypatch): # user2
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
|
||||
cli.run(["secrets", "groups", "remove-user", "admin-group", "user2"])
|
||||
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
|
||||
# user2 is not in the group anymore
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
print(capsys.readouterr().out)
|
||||
|
||||
cli.run(["secrets", "groups", "remove-secret", "admin-group", "key"])
|
||||
|
||||
cli.run(["secrets", "remove", "key"])
|
||||
|
||||
Reference in New Issue
Block a user