Merge pull request 'clan-cli: secrets: fix encryption, and collect public and private keys separately' (#2500) from lo-sops-filter-env into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2500
This commit is contained in:
Mic92
2024-11-27 06:27:53 +00:00
7 changed files with 365 additions and 168 deletions

View File

@@ -81,6 +81,7 @@ def create_machine(opts: CreateOptions) -> None:
msg = "Machine name must be a valid hostname"
raise ClanError(msg, location="Create Machine")
# lopter@(2024-10-22): Could we just use warn and use the existing config?
if dst.exists():
msg = f"Machine {machine_name} already exists in {clan_dir}"
description = (

View File

@@ -114,8 +114,8 @@ def deploy_machine(machines: MachineGroup) -> None:
def deploy(machine: Machine) -> None:
host = machine.build_host
generate_facts([machine])
generate_vars([machine])
generate_facts([machine], service=None, regenerate=False)
generate_vars([machine], generator_name=None, regenerate=False)
upload_secrets(machine)
upload_secret_vars(machine)

View File

@@ -9,7 +9,7 @@ from clan_cli.git import commit_files
from . import sops
from .secrets import update_secrets
from .sops import (
default_admin_key_path,
default_admin_private_key_path,
generate_private_key,
maybe_get_admin_public_key,
)
@@ -23,7 +23,7 @@ def generate_key() -> sops.SopsKey:
print(f"{key.key_type.name} key {key.pubkey} is already set")
return key
path = default_admin_key_path()
path = default_admin_private_key_path()
_, pub_key = generate_private_key(out_file=path)
print(
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
@@ -62,8 +62,9 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
parser_generate = subparser.add_parser(
"generate",
description=(
"Generate an age key for the Clan, "
"to use PGP set `SOPS_PGP_FP` in your environment."
"Generate an age key for the Clan, if you already have an age "
"or PGP key, then use it to create your user, see: "
"`clan secrets users add --help'"
),
)
parser_generate.set_defaults(func=generate_command)

View File

@@ -28,7 +28,12 @@ from .folders import (
sops_secrets_folder,
sops_users_folder,
)
from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys
from .sops import (
decrypt_file,
encrypt_file,
read_key,
update_keys,
)
from .types import VALID_SECRET_NAME, secret_name_type
log = logging.getLogger(__name__)
@@ -89,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
def encrypt_secret(
flake_dir: Path,
secret_path: Path,
value: IO[str] | str | bytes | None,
value: IO[bytes] | str | bytes | None,
add_users: list[str] | None = None,
add_machines: list[str] | None = None,
add_groups: list[str] | None = None,
@@ -101,9 +106,13 @@ def encrypt_secret(
add_machines = []
if add_users is None:
add_users = []
key = ensure_admin_key(flake_dir)
key = sops.ensure_admin_public_key(flake_dir)
recipient_keys = set()
# encrypt_secret can be called before the secret has been created
# so don't try to call sops.update_keys on a non-existent file:
do_update_keys = False
files_to_commit = []
for user in add_users:
files_to_commit.extend(
@@ -111,7 +120,7 @@ def encrypt_secret(
users_folder(secret_path),
sops_users_folder(flake_dir),
user,
False,
do_update_keys,
)
)
@@ -121,7 +130,7 @@ def encrypt_secret(
machines_folder(secret_path),
sops_machines_folder(flake_dir),
machine,
False,
do_update_keys,
)
)
@@ -131,7 +140,7 @@ def encrypt_secret(
groups_folder(secret_path),
sops_groups_folder(flake_dir),
group,
False,
do_update_keys,
)
)
@@ -144,7 +153,7 @@ def encrypt_secret(
users_folder(secret_path),
sops_users_folder(flake_dir),
key.username,
False,
do_update_keys,
)
)
@@ -296,7 +305,10 @@ def list_command(args: argparse.Namespace) -> None:
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
ensure_admin_key(flake_dir)
# lopter(2024-10): I can't think of a good way to ensure that we have the
# private key for the secret. I mean we could collect all private keys we
# could find and then make sure we have the one for the secret, but that
# seems complicated for little ux gain?
path = secret_path / "secret"
if not path.exists():
msg = f"Secret '{secret_path!s}' does not exist"
@@ -320,7 +332,7 @@ def is_tty_interactive() -> bool:
def set_command(args: argparse.Namespace) -> None:
env_value = os.environ.get("SOPS_NIX_SECRET")
secret_value: str | IO[str] | None = sys.stdin
secret_value: str | IO[bytes] | None = sys.stdin.buffer
if args.edit:
secret_value = None
elif env_value:

View File

@@ -1,24 +1,28 @@
import dataclasses
import enum
import functools
import io
import json
import logging
import os
import shutil
import subprocess
from collections.abc import Iterable, Iterator
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from collections.abc import Iterable, Sequence
from contextlib import suppress
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import IO
from typing import IO, Protocol
import clan_cli.cmd
from clan_cli.api import API
from clan_cli.cmd import Log, run
from clan_cli.dirs import user_config_dir
from clan_cli.errors import ClanError
from clan_cli.errors import ClanError, CmdOut
from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder
log = logging.getLogger(__name__)
class KeyType(enum.Enum):
AGE = enum.auto()
@@ -30,11 +34,76 @@ class KeyType(enum.Enum):
return cls.__members__.get(value.upper())
return None
@property
def sops_recipient_attr(self) -> str:
"""Name of the attribute to get the recipient key from a Sops file."""
if self == self.AGE:
return "recipient"
if self == self.PGP:
return "fp"
msg = (
f"KeyType is not properly implemented: "
f'"sops_recipient_attr" is missing for key type "{self.name}"'
)
raise ClanError(msg)
@dataclass(frozen=True, eq=False)
def collect_public_keys(self) -> Sequence[str]:
keyring: Sequence[str] = []
if self == self.AGE:
if keys := os.environ.get("SOPS_AGE_KEY"):
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and
# reads identities line by line. See age/keysource.go in
# Sops, and age/parse.go in Age.
for private_key in keys.strip().splitlines():
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in the environment (SOPS_AGE_KEY): {public_key}"
)
keyring.append(public_key)
def maybe_read_from_path(key_path: Path) -> None:
try:
# as in parse.go in age:
lines = Path(key_path).read_text().strip().splitlines()
for private_key in filter(lambda ln: not ln.startswith("#"), lines):
public_key = get_public_age_key(private_key)
log.info(
f"Found age public key from a private key "
f"in {key_path}: {public_key}"
)
keyring.append(public_key)
except FileNotFoundError:
return
except Exception as ex:
log.warn(f"Could not read age keys from {key_path}: {ex}")
# Sops will try every location, see age/keysource.go
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
maybe_read_from_path(Path(key_path))
maybe_read_from_path(user_config_dir() / "sops/age/keys.txt")
return keyring
if self == self.PGP:
if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"):
for fp in pgp_fingerprints.strip().split(","):
msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}"
log.info(msg)
keyring.append(fp)
return keyring
msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys"
raise ClanError(msg)
@dataclasses.dataclass(frozen=True)
class SopsKey:
pubkey: str
username: str
# Two SopsKey are considered equal even
# if they don't have the same username:
username: str = dataclasses.field(compare=False)
key_type: KeyType
def as_dict(self) -> dict[str, str]:
@@ -44,6 +113,131 @@ class SopsKey:
"type": self.key_type.name.lower(),
}
@classmethod
def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102
"""Load from the file named `keys.json` in the given directory."""
pubkey, key_type = read_key(folder)
username = ""
return cls(pubkey, username, key_type)
@classmethod
def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102
return [
cls(pubkey=key, username="", key_type=key_type)
for key_type in KeyType
for key in key_type.collect_public_keys()
]
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
ERROR_GENERIC = 1
COULD_NOT_READ_INPUT_FILE = 2
COULD_NOT_WRITE_OUTPUT_FILE = 3
ERROR_DUMPING_TREE = 4
ERROR_READING_CONFIG = 5
ERROR_INVALID_KMS_ENCRYPTION_CONTEXT_FORMAT = 6
ERROR_INVALID_SET_FORMAT = 7
ERROR_CONFLICTING_PARAMETERS = 8
ERROR_ENCRYPTING_MAC = 21
ERROR_ENCRYPTING_TREE = 23
ERROR_DECRYPTING_MAC = 24
ERROR_DECRYPTING_TREE = 25
CANNOT_CHANGE_KEYS_FROM_NON_EXISTENT_FILE = 49
MAC_MISMATCH = 51
MAC_NOT_FOUND = 52
CONFIG_FILE_NOT_FOUND = 61
KEYBOARD_INTERRUPT = 85
INVALID_TREE_PATH_FORMAT = 91
NEED_AT_LEAST_ONE_DOCUMENT = 92
NO_FILE_SPECIFIED = 100
COULD_NOT_RETRIEVE_KEY = 128
NO_ENCRYPTION_KEY_FOUND = 111
DUPLICATE_DECRYPTION_KEY_TYPE = 112
FILE_HAS_NOT_BEEN_MODIFIED = 200
NO_EDITOR_FOUND = 201
FAILED_TO_COMPARE_VERSIONS = 202
FILE_ALREADY_ENCRYPTED = 203
@classmethod
def parse(cls, code: int) -> "ExitStatus | None": # noqa: ANN102
return ExitStatus(code) if code in ExitStatus else None
class Executer(Protocol):
def __call__(
self, cmd: list[str], *, env: dict[str, str] | None = None
) -> CmdOut: ...
class Operation(enum.StrEnum):
DECRYPT = "decrypt"
EDIT = "edit"
ENCRYPT = "encrypt"
UPDATE_KEYS = "updatekeys"
def run(
call: Operation,
secret_path: Path,
public_keys: Iterable[tuple[str, KeyType]],
executer: Executer,
) -> tuple[int, str]:
"""Call the sops binary for the given operation."""
# louis(2024-11-19): I regrouped the call into the sops binary into this
# one place because calling into sops needs to be done with a carefully
# setup context, and I don't feel good about the idea of having that logic
# exist in multiple places.
sops_cmd = ["sops"]
environ = os.environ.copy()
with NamedTemporaryFile(delete=False, mode="w") as manifest:
if call == Operation.DECRYPT:
sops_cmd.append("decrypt")
else:
# When sops is used to edit a file the config is only used at
# file creation, otherwise the keys from the exising file are
# used.
sops_cmd.extend(["--config", manifest.name])
keys_by_type: dict[KeyType, list[str]] = {}
keys_by_type = {key_type: [] for key_type in KeyType}
for key, key_type in public_keys:
keys_by_type[key_type].append(key)
it = keys_by_type.items()
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
rules = {"creation_rules": [{"key_groups": key_groups}]}
json.dump(rules, manifest, indent=2)
manifest.flush()
if call == Operation.ENCRYPT:
# Remove SOPS env vars used to specify public keys to force
# sops to use our config file [1]; so that the file gets
# encrypted with our keys and not something leaking out of
# the environment.
#
# [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229
for var in os.environ:
if var.startswith("SOPS_") and var not in { # allowed:
"SOPS_GPG_EXEC",
"SOPS_AGE_KEY",
"SOPS_AGE_KEY_FILE",
}:
del environ[var]
sops_cmd.extend(["encrypt", "--in-place"])
elif call == Operation.UPDATE_KEYS:
sops_cmd.extend(["updatekeys", "--yes"])
elif call != Operation.EDIT:
known_operations = ",".join(Operation.__members__.values())
msg = (
f"Unsupported sops operation {call.value} "
f"(known operations: {known_operations})"
)
raise ClanError(msg)
sops_cmd.append(str(secret_path))
cmd = nix_shell(["nixpkgs#sops"], sops_cmd)
p = executer(cmd, env=environ)
return p.returncode, p.stdout
def get_public_age_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
@@ -60,7 +254,7 @@ def get_public_age_key(privkey: str) -> str:
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
try:
proc = run(cmd)
proc = clan_cli.cmd.run(cmd)
res = proc.stdout.strip()
pubkey = None
private_key = None
@@ -122,7 +316,7 @@ def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
raise ClanError(msg)
def default_admin_key_path() -> Path:
def default_admin_private_key_path() -> Path:
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
if raw_path:
return Path(raw_path)
@@ -131,37 +325,26 @@ def default_admin_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> None | SopsKey:
age_key = os.environ.get("SOPS_AGE_KEY")
pgp_key = os.environ.get("SOPS_PGP_FP")
if age_key and pgp_key:
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
keyring = SopsKey.collect_public_keys()
if len(keyring) == 0:
return None
if len(keyring) > 1:
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
msg = (
f"Found more than {len(keyring)} public keys in your "
f"environment/system and cannot decide which one to "
f"use, first {len(last_3)}:\n\n"
f"- {'\n- '.join(last_3)}\n\n"
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
f"SOPS_PGP_FP appropriately"
)
raise ClanError(msg)
if age_key:
return SopsKey(
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
)
if pgp_key:
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
path = default_admin_key_path()
if path.exists():
return SopsKey(
pubkey=get_public_age_key(path.read_text()),
key_type=KeyType.AGE,
username="",
)
return None
return keyring[0]
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
key = maybe_get_admin_public_key()
if not key:
return None
return maybe_get_user_or_machine(flake_dir, key)
def ensure_admin_key(flake_dir: Path) -> SopsKey:
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
@@ -169,100 +352,104 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey:
raise ClanError(msg)
@contextmanager
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
all_keys: dict[str, list[str]] = {
key_type.lower(): [] for key_type in KeyType.__members__
}
for key, key_type in keys:
all_keys[key_type.name.lower()].append(key)
with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
manifest.flush()
yield Path(manifest.name)
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
keys_sorted = sorted(keys)
with sops_manifest(keys_sorted) as manifest:
secret_path = secret_path / "secret"
time_before = secret_path.stat().st_mtime
cmd = nix_shell(
["nixpkgs#sops"],
[
"sops",
"--config",
str(manifest),
"updatekeys",
"--yes",
str(secret_path),
],
error_msg = f"Could not update keys for {secret_path}"
executer = functools.partial(
clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg
)
run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}")
if time_before == secret_path.stat().st_mtime:
return []
return [secret_path]
rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer)
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
return [secret_path] if was_modified else []
def encrypt_file(
secret_path: Path,
content: IO[str] | str | bytes | None,
content: str | IO[bytes] | bytes | None,
pubkeys: list[tuple[str, KeyType]],
) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
with sops_manifest(pubkeys) as manifest:
if not content:
args = ["sops", "--config", str(manifest)]
args.extend([str(secret_path)])
cmd = nix_shell(["nixpkgs#sops"], args)
# Don't use our `run` here, because it breaks editor integration.
# We never need this in our UI.
p = subprocess.run(cmd, check=False)
# returns 200 if the file is changed
if p.returncode != 0 and p.returncode != 200:
msg = (
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
return CmdOut(
stdout="",
stderr="",
cwd=Path.cwd(),
env=env,
command_list=cmd,
returncode=subprocess.run(cmd, env=env, check=False).returncode,
msg=None,
)
raise ClanError(msg)
return
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
with NamedTemporaryFile(delete=False) as f:
rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer)
status = ExitStatus.parse(rc)
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
return
msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}"
raise ClanError(msg)
# lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it
# creates the file in __init__, when really it should be created in
# __enter__ (that is in Python __enter__ is actually __init__ from a RAII
# perspective, and __init__ should just be thought off as syntax sugar to
# capture extra context), and now the linter is unhappy so hush it. Note
# that if NamedTemporaryFile created the file in __enter__ then we'd have
# to change exception handling:
try:
source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115
except (FileNotFoundError, PermissionError):
source = NamedTemporaryFile(delete=False) # noqa: SIM115
try: # swap the secret:
with source:
if isinstance(content, str):
Path(f.name).write_text(content)
source.file.write(content.encode())
elif isinstance(content, bytes):
Path(f.name).write_bytes(content)
elif isinstance(content, io.IOBase):
with Path(f.name).open("w") as fd:
shutil.copyfileobj(content, fd)
source.file.write(content)
elif isinstance(content, io.BufferedReader):
# lopter@(2024-11-19): mypy is freaking out on the 1st
# argument, idk why, it says:
#
# > Cannot infer type argument 1 of "copyfileobj"
shutil.copyfileobj(content, source.file) # type: ignore[misc]
else:
msg = f"Invalid content type: {type(content)}"
raise ClanError(msg)
# we pass an empty manifest to pick up existing configuration of the user
args = ["sops", "--config", str(manifest)]
args.extend(["-i", "--encrypt", str(f.name)])
cmd = nix_shell(["nixpkgs#sops"], args)
run(cmd, log=Log.BOTH)
executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH)
run(Operation.ENCRYPT, Path(source.name), pubkeys, executer)
# atomic copy of the encrypted file
with NamedTemporaryFile(dir=folder, delete=False) as f2:
shutil.copyfile(f.name, f2.name)
Path(f2.name).rename(secret_path)
with NamedTemporaryFile(dir=folder, delete=False) as dest:
shutil.copyfile(source.name, dest.name)
Path(dest.name).rename(secret_path)
finally:
with suppress(OSError):
Path(f.name).unlink()
Path(source.name).unlink()
def decrypt_file(secret_path: Path) -> str:
with sops_manifest([]) as manifest:
cmd = nix_shell(
["nixpkgs#sops"],
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
# decryption uses private keys from the environment or default paths:
no_public_keys_needed: list[tuple[str, KeyType]] = []
executer = functools.partial(
clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}"
)
res = run(cmd, error_msg=f"Could not decrypt {secret_path}")
return res.stdout
_, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer)
return stdout
def get_recipients(secret_path: Path) -> set[SopsKey]:
sops_attrs = json.loads((secret_path / "secret").read_text())["sops"]
return {
SopsKey(
pubkey=recipient[key_type.sops_recipient_attr],
username="",
key_type=key_type,
)
for key_type in KeyType
for recipient in sops_attrs[key_type.name.lower()] or []
}
def get_meta(secret_path: Path) -> dict:

View File

@@ -21,14 +21,15 @@ def secret_name_type(arg_value: str) -> str:
def public_or_private_age_key_type(arg_value: str) -> str:
if Path(arg_value).is_file():
arg_value = Path(arg_value).read_text().strip()
if arg_value.startswith("age1"):
return arg_value.strip()
if arg_value.startswith("AGE-SECRET-KEY-"):
return get_public_age_key(arg_value)
if not arg_value.startswith("age1"):
msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
for line in arg_value.splitlines():
if line.startswith("#"):
continue
if line.startswith("age1"):
return line.strip()
if line.startswith("AGE-SECRET-KEY-"):
return get_public_age_key(line)
msg = f"Please provide an age key starting with age1 or AGE-SECRET-KEY-, got: '{arg_value}'"
raise ClanError(msg)
return arg_value
def group_or_user_name_type(what: str) -> Callable[[str], str]:

View File

@@ -1,10 +1,10 @@
import json
from dataclasses import dataclass
from pathlib import Path
from typing import override
from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine
from clan_cli.secrets import sops
from clan_cli.secrets.folders import (
sops_machines_folder,
sops_secrets_folder,
@@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import (
encrypt_secret,
has_secret,
)
from clan_cli.secrets.sops import KeyType, generate_private_key
from clan_cli.vars.generate import Generator, Var
from clan_cli.vars.generate import Generator
from clan_cli.vars.var import Var
from . import SecretStoreBase
@@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase):
if has_machine(self.machine.flake_dir, self.machine.name):
return
priv_key, pub_key = generate_private_key()
priv_key, pub_key = sops.generate_private_key()
encrypt_secret(
self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir)
@@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase):
def user_has_access(
self, user: str, generator: Generator, secret_name: str
) -> bool:
secret_path = self.secret_path(generator, secret_name)
secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
users_folder_path = sops_users_folder(self.machine.flake_dir)
user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[
"publickey"
]
return user_pubkey in recipients
key_dir = sops_users_folder(self.machine.flake_dir) / user
return self.key_has_access(key_dir, generator, secret_name)
def machine_has_access(self, generator: Generator, secret_name: str) -> bool:
key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name
return self.key_has_access(key_dir, generator, secret_name)
def key_has_access(
self, key_dir: Path, generator: Generator, secret_name: str
) -> bool:
secret_path = self.secret_path(generator, secret_name)
secret = json.loads((secret_path / "secret").read_text())
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
machine_pubkey = json.loads(
(machines_folder_path / self.machine.name / "key.json").read_text()
)["publickey"]
return machine_pubkey in recipients
return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path)
def secret_path(self, generator: Generator, secret_name: str) -> Path:
return self.directory(generator, secret_name)
@@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase):
secret_folder = self.secret_path(generator, name)
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]:
def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]:
from clan_cli.secrets.secrets import (
collect_keys_for_path,
collect_keys_for_type,
@@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase):
self.machine.flake_dir / "sops" / "groups" / group / "users"
)
)
return keys
@override
return {
sops.SopsKey(pubkey=key, username="", key_type=key_type)
for (key, key_type) in keys
}
# }
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
secret_path = self.secret_path(generator, name)
recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"]
current_recipients = {r["recipient"] for r in recipients_}
wanted_recipients = {
key[0] for key in self.collect_keys_for_secret(secret_path)
}
current_recipients = sops.get_recipients(secret_path)
wanted_recipients = self.collect_keys_for_secret(secret_path)
needs_update = current_recipients != wanted_recipients
recipients_to_add = wanted_recipients - current_recipients
var_id = f"{generator.name}/{name}"
msg = (
f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. "
f"This could have been a malicious actor trying to add their keys, please investigate. "
f"Added keys: {', '.join(recipients_to_add)}"
f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}"
)
return needs_update, msg