Merge pull request 'clan-cli: secrets: fix encryption, and collect public and private keys separately' (#2500) from lo-sops-filter-env into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2500
This commit is contained in:
@@ -81,6 +81,7 @@ def create_machine(opts: CreateOptions) -> None:
|
||||
msg = "Machine name must be a valid hostname"
|
||||
raise ClanError(msg, location="Create Machine")
|
||||
|
||||
# lopter@(2024-10-22): Could we just use warn and use the existing config?
|
||||
if dst.exists():
|
||||
msg = f"Machine {machine_name} already exists in {clan_dir}"
|
||||
description = (
|
||||
|
||||
@@ -114,8 +114,8 @@ def deploy_machine(machines: MachineGroup) -> None:
|
||||
|
||||
def deploy(machine: Machine) -> None:
|
||||
host = machine.build_host
|
||||
generate_facts([machine])
|
||||
generate_vars([machine])
|
||||
generate_facts([machine], service=None, regenerate=False)
|
||||
generate_vars([machine], generator_name=None, regenerate=False)
|
||||
|
||||
upload_secrets(machine)
|
||||
upload_secret_vars(machine)
|
||||
|
||||
@@ -9,7 +9,7 @@ from clan_cli.git import commit_files
|
||||
from . import sops
|
||||
from .secrets import update_secrets
|
||||
from .sops import (
|
||||
default_admin_key_path,
|
||||
default_admin_private_key_path,
|
||||
generate_private_key,
|
||||
maybe_get_admin_public_key,
|
||||
)
|
||||
@@ -23,7 +23,7 @@ def generate_key() -> sops.SopsKey:
|
||||
print(f"{key.key_type.name} key {key.pubkey} is already set")
|
||||
return key
|
||||
|
||||
path = default_admin_key_path()
|
||||
path = default_admin_private_key_path()
|
||||
_, pub_key = generate_private_key(out_file=path)
|
||||
print(
|
||||
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||
@@ -62,8 +62,9 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser_generate = subparser.add_parser(
|
||||
"generate",
|
||||
description=(
|
||||
"Generate an age key for the Clan, "
|
||||
"to use PGP set `SOPS_PGP_FP` in your environment."
|
||||
"Generate an age key for the Clan, if you already have an age "
|
||||
"or PGP key, then use it to create your user, see: "
|
||||
"`clan secrets users add --help'"
|
||||
),
|
||||
)
|
||||
parser_generate.set_defaults(func=generate_command)
|
||||
|
||||
@@ -28,7 +28,12 @@ from .folders import (
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys
|
||||
from .sops import (
|
||||
decrypt_file,
|
||||
encrypt_file,
|
||||
read_key,
|
||||
update_keys,
|
||||
)
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -89,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
def encrypt_secret(
|
||||
flake_dir: Path,
|
||||
secret_path: Path,
|
||||
value: IO[str] | str | bytes | None,
|
||||
value: IO[bytes] | str | bytes | None,
|
||||
add_users: list[str] | None = None,
|
||||
add_machines: list[str] | None = None,
|
||||
add_groups: list[str] | None = None,
|
||||
@@ -101,9 +106,13 @@ def encrypt_secret(
|
||||
add_machines = []
|
||||
if add_users is None:
|
||||
add_users = []
|
||||
key = ensure_admin_key(flake_dir)
|
||||
key = sops.ensure_admin_public_key(flake_dir)
|
||||
recipient_keys = set()
|
||||
|
||||
# encrypt_secret can be called before the secret has been created
|
||||
# so don't try to call sops.update_keys on a non-existent file:
|
||||
do_update_keys = False
|
||||
|
||||
files_to_commit = []
|
||||
for user in add_users:
|
||||
files_to_commit.extend(
|
||||
@@ -111,7 +120,7 @@ def encrypt_secret(
|
||||
users_folder(secret_path),
|
||||
sops_users_folder(flake_dir),
|
||||
user,
|
||||
False,
|
||||
do_update_keys,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -121,7 +130,7 @@ def encrypt_secret(
|
||||
machines_folder(secret_path),
|
||||
sops_machines_folder(flake_dir),
|
||||
machine,
|
||||
False,
|
||||
do_update_keys,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -131,7 +140,7 @@ def encrypt_secret(
|
||||
groups_folder(secret_path),
|
||||
sops_groups_folder(flake_dir),
|
||||
group,
|
||||
False,
|
||||
do_update_keys,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -144,7 +153,7 @@ def encrypt_secret(
|
||||
users_folder(secret_path),
|
||||
sops_users_folder(flake_dir),
|
||||
key.username,
|
||||
False,
|
||||
do_update_keys,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -296,7 +305,10 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
|
||||
ensure_admin_key(flake_dir)
|
||||
# lopter(2024-10): I can't think of a good way to ensure that we have the
|
||||
# private key for the secret. I mean we could collect all private keys we
|
||||
# could find and then make sure we have the one for the secret, but that
|
||||
# seems complicated for little ux gain?
|
||||
path = secret_path / "secret"
|
||||
if not path.exists():
|
||||
msg = f"Secret '{secret_path!s}' does not exist"
|
||||
@@ -320,7 +332,7 @@ def is_tty_interactive() -> bool:
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
env_value = os.environ.get("SOPS_NIX_SECRET")
|
||||
secret_value: str | IO[str] | None = sys.stdin
|
||||
secret_value: str | IO[bytes] | None = sys.stdin.buffer
|
||||
if args.edit:
|
||||
secret_value = None
|
||||
elif env_value:
|
||||
|
||||
@@ -1,24 +1,28 @@
|
||||
import dataclasses
|
||||
import enum
|
||||
import functools
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from collections.abc import Iterable, Iterator
|
||||
from contextlib import contextmanager, suppress
|
||||
from dataclasses import dataclass
|
||||
from collections.abc import Iterable, Sequence
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO
|
||||
from typing import IO, Protocol
|
||||
|
||||
import clan_cli.cmd
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.dirs import user_config_dir
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.errors import ClanError, CmdOut
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
from .folders import sops_machines_folder, sops_users_folder
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KeyType(enum.Enum):
|
||||
AGE = enum.auto()
|
||||
@@ -30,11 +34,76 @@ class KeyType(enum.Enum):
|
||||
return cls.__members__.get(value.upper())
|
||||
return None
|
||||
|
||||
@property
|
||||
def sops_recipient_attr(self) -> str:
|
||||
"""Name of the attribute to get the recipient key from a Sops file."""
|
||||
if self == self.AGE:
|
||||
return "recipient"
|
||||
if self == self.PGP:
|
||||
return "fp"
|
||||
msg = (
|
||||
f"KeyType is not properly implemented: "
|
||||
f'"sops_recipient_attr" is missing for key type "{self.name}"'
|
||||
)
|
||||
raise ClanError(msg)
|
||||
|
||||
@dataclass(frozen=True, eq=False)
|
||||
def collect_public_keys(self) -> Sequence[str]:
|
||||
keyring: Sequence[str] = []
|
||||
|
||||
if self == self.AGE:
|
||||
if keys := os.environ.get("SOPS_AGE_KEY"):
|
||||
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and
|
||||
# reads identities line by line. See age/keysource.go in
|
||||
# Sops, and age/parse.go in Age.
|
||||
for private_key in keys.strip().splitlines():
|
||||
public_key = get_public_age_key(private_key)
|
||||
log.info(
|
||||
f"Found age public key from a private key "
|
||||
f"in the environment (SOPS_AGE_KEY): {public_key}"
|
||||
)
|
||||
keyring.append(public_key)
|
||||
|
||||
def maybe_read_from_path(key_path: Path) -> None:
|
||||
try:
|
||||
# as in parse.go in age:
|
||||
lines = Path(key_path).read_text().strip().splitlines()
|
||||
for private_key in filter(lambda ln: not ln.startswith("#"), lines):
|
||||
public_key = get_public_age_key(private_key)
|
||||
log.info(
|
||||
f"Found age public key from a private key "
|
||||
f"in {key_path}: {public_key}"
|
||||
)
|
||||
keyring.append(public_key)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as ex:
|
||||
log.warn(f"Could not read age keys from {key_path}: {ex}")
|
||||
|
||||
# Sops will try every location, see age/keysource.go
|
||||
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
|
||||
maybe_read_from_path(Path(key_path))
|
||||
maybe_read_from_path(user_config_dir() / "sops/age/keys.txt")
|
||||
|
||||
return keyring
|
||||
|
||||
if self == self.PGP:
|
||||
if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"):
|
||||
for fp in pgp_fingerprints.strip().split(","):
|
||||
msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}"
|
||||
log.info(msg)
|
||||
keyring.append(fp)
|
||||
return keyring
|
||||
|
||||
msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class SopsKey:
|
||||
pubkey: str
|
||||
username: str
|
||||
# Two SopsKey are considered equal even
|
||||
# if they don't have the same username:
|
||||
username: str = dataclasses.field(compare=False)
|
||||
key_type: KeyType
|
||||
|
||||
def as_dict(self) -> dict[str, str]:
|
||||
@@ -44,6 +113,131 @@ class SopsKey:
|
||||
"type": self.key_type.name.lower(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102
|
||||
"""Load from the file named `keys.json` in the given directory."""
|
||||
pubkey, key_type = read_key(folder)
|
||||
username = ""
|
||||
return cls(pubkey, username, key_type)
|
||||
|
||||
@classmethod
|
||||
def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102
|
||||
return [
|
||||
cls(pubkey=key, username="", key_type=key_type)
|
||||
for key_type in KeyType
|
||||
for key in key_type.collect_public_keys()
|
||||
]
|
||||
|
||||
|
||||
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||
ERROR_GENERIC = 1
|
||||
COULD_NOT_READ_INPUT_FILE = 2
|
||||
COULD_NOT_WRITE_OUTPUT_FILE = 3
|
||||
ERROR_DUMPING_TREE = 4
|
||||
ERROR_READING_CONFIG = 5
|
||||
ERROR_INVALID_KMS_ENCRYPTION_CONTEXT_FORMAT = 6
|
||||
ERROR_INVALID_SET_FORMAT = 7
|
||||
ERROR_CONFLICTING_PARAMETERS = 8
|
||||
ERROR_ENCRYPTING_MAC = 21
|
||||
ERROR_ENCRYPTING_TREE = 23
|
||||
ERROR_DECRYPTING_MAC = 24
|
||||
ERROR_DECRYPTING_TREE = 25
|
||||
CANNOT_CHANGE_KEYS_FROM_NON_EXISTENT_FILE = 49
|
||||
MAC_MISMATCH = 51
|
||||
MAC_NOT_FOUND = 52
|
||||
CONFIG_FILE_NOT_FOUND = 61
|
||||
KEYBOARD_INTERRUPT = 85
|
||||
INVALID_TREE_PATH_FORMAT = 91
|
||||
NEED_AT_LEAST_ONE_DOCUMENT = 92
|
||||
NO_FILE_SPECIFIED = 100
|
||||
COULD_NOT_RETRIEVE_KEY = 128
|
||||
NO_ENCRYPTION_KEY_FOUND = 111
|
||||
DUPLICATE_DECRYPTION_KEY_TYPE = 112
|
||||
FILE_HAS_NOT_BEEN_MODIFIED = 200
|
||||
NO_EDITOR_FOUND = 201
|
||||
FAILED_TO_COMPARE_VERSIONS = 202
|
||||
FILE_ALREADY_ENCRYPTED = 203
|
||||
|
||||
@classmethod
|
||||
def parse(cls, code: int) -> "ExitStatus | None": # noqa: ANN102
|
||||
return ExitStatus(code) if code in ExitStatus else None
|
||||
|
||||
|
||||
class Executer(Protocol):
|
||||
def __call__(
|
||||
self, cmd: list[str], *, env: dict[str, str] | None = None
|
||||
) -> CmdOut: ...
|
||||
|
||||
|
||||
class Operation(enum.StrEnum):
|
||||
DECRYPT = "decrypt"
|
||||
EDIT = "edit"
|
||||
ENCRYPT = "encrypt"
|
||||
UPDATE_KEYS = "updatekeys"
|
||||
|
||||
|
||||
def run(
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[tuple[str, KeyType]],
|
||||
executer: Executer,
|
||||
) -> tuple[int, str]:
|
||||
"""Call the sops binary for the given operation."""
|
||||
# louis(2024-11-19): I regrouped the call into the sops binary into this
|
||||
# one place because calling into sops needs to be done with a carefully
|
||||
# setup context, and I don't feel good about the idea of having that logic
|
||||
# exist in multiple places.
|
||||
sops_cmd = ["sops"]
|
||||
environ = os.environ.copy()
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
if call == Operation.DECRYPT:
|
||||
sops_cmd.append("decrypt")
|
||||
else:
|
||||
# When sops is used to edit a file the config is only used at
|
||||
# file creation, otherwise the keys from the exising file are
|
||||
# used.
|
||||
sops_cmd.extend(["--config", manifest.name])
|
||||
|
||||
keys_by_type: dict[KeyType, list[str]] = {}
|
||||
keys_by_type = {key_type: [] for key_type in KeyType}
|
||||
for key, key_type in public_keys:
|
||||
keys_by_type[key_type].append(key)
|
||||
it = keys_by_type.items()
|
||||
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
|
||||
rules = {"creation_rules": [{"key_groups": key_groups}]}
|
||||
json.dump(rules, manifest, indent=2)
|
||||
manifest.flush()
|
||||
|
||||
if call == Operation.ENCRYPT:
|
||||
# Remove SOPS env vars used to specify public keys to force
|
||||
# sops to use our config file [1]; so that the file gets
|
||||
# encrypted with our keys and not something leaking out of
|
||||
# the environment.
|
||||
#
|
||||
# [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229
|
||||
for var in os.environ:
|
||||
if var.startswith("SOPS_") and var not in { # allowed:
|
||||
"SOPS_GPG_EXEC",
|
||||
"SOPS_AGE_KEY",
|
||||
"SOPS_AGE_KEY_FILE",
|
||||
}:
|
||||
del environ[var]
|
||||
sops_cmd.extend(["encrypt", "--in-place"])
|
||||
elif call == Operation.UPDATE_KEYS:
|
||||
sops_cmd.extend(["updatekeys", "--yes"])
|
||||
elif call != Operation.EDIT:
|
||||
known_operations = ",".join(Operation.__members__.values())
|
||||
msg = (
|
||||
f"Unsupported sops operation {call.value} "
|
||||
f"(known operations: {known_operations})"
|
||||
)
|
||||
raise ClanError(msg)
|
||||
sops_cmd.append(str(secret_path))
|
||||
|
||||
cmd = nix_shell(["nixpkgs#sops"], sops_cmd)
|
||||
p = executer(cmd, env=environ)
|
||||
return p.returncode, p.stdout
|
||||
|
||||
|
||||
def get_public_age_key(privkey: str) -> str:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
||||
@@ -60,7 +254,7 @@ def get_public_age_key(privkey: str) -> str:
|
||||
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
|
||||
try:
|
||||
proc = run(cmd)
|
||||
proc = clan_cli.cmd.run(cmd)
|
||||
res = proc.stdout.strip()
|
||||
pubkey = None
|
||||
private_key = None
|
||||
@@ -122,7 +316,7 @@ def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def default_admin_key_path() -> Path:
|
||||
def default_admin_private_key_path() -> Path:
|
||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||
if raw_path:
|
||||
return Path(raw_path)
|
||||
@@ -131,37 +325,26 @@ def default_admin_key_path() -> Path:
|
||||
|
||||
@API.register
|
||||
def maybe_get_admin_public_key() -> None | SopsKey:
|
||||
age_key = os.environ.get("SOPS_AGE_KEY")
|
||||
pgp_key = os.environ.get("SOPS_PGP_FP")
|
||||
if age_key and pgp_key:
|
||||
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
|
||||
keyring = SopsKey.collect_public_keys()
|
||||
if len(keyring) == 0:
|
||||
return None
|
||||
|
||||
if len(keyring) > 1:
|
||||
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
|
||||
msg = (
|
||||
f"Found more than {len(keyring)} public keys in your "
|
||||
f"environment/system and cannot decide which one to "
|
||||
f"use, first {len(last_3)}:\n\n"
|
||||
f"- {'\n- '.join(last_3)}\n\n"
|
||||
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
|
||||
f"SOPS_PGP_FP appropriately"
|
||||
)
|
||||
raise ClanError(msg)
|
||||
if age_key:
|
||||
return SopsKey(
|
||||
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
|
||||
)
|
||||
if pgp_key:
|
||||
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
|
||||
|
||||
path = default_admin_key_path()
|
||||
if path.exists():
|
||||
return SopsKey(
|
||||
pubkey=get_public_age_key(path.read_text()),
|
||||
key_type=KeyType.AGE,
|
||||
username="",
|
||||
)
|
||||
|
||||
return None
|
||||
return keyring[0]
|
||||
|
||||
|
||||
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
|
||||
key = maybe_get_admin_public_key()
|
||||
if not key:
|
||||
return None
|
||||
return maybe_get_user_or_machine(flake_dir, key)
|
||||
|
||||
|
||||
def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
||||
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
key = maybe_get_admin_public_key()
|
||||
if key:
|
||||
return ensure_user_or_machine(flake_dir, key)
|
||||
@@ -169,100 +352,104 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
|
||||
all_keys: dict[str, list[str]] = {
|
||||
key_type.lower(): [] for key_type in KeyType.__members__
|
||||
}
|
||||
for key, key_type in keys:
|
||||
all_keys[key_type.name.lower()].append(key)
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
|
||||
manifest.flush()
|
||||
yield Path(manifest.name)
|
||||
|
||||
|
||||
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
||||
keys_sorted = sorted(keys)
|
||||
with sops_manifest(keys_sorted) as manifest:
|
||||
secret_path = secret_path / "secret"
|
||||
time_before = secret_path.stat().st_mtime
|
||||
cmd = nix_shell(
|
||||
["nixpkgs#sops"],
|
||||
[
|
||||
"sops",
|
||||
"--config",
|
||||
str(manifest),
|
||||
"updatekeys",
|
||||
"--yes",
|
||||
str(secret_path),
|
||||
],
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg
|
||||
)
|
||||
run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}")
|
||||
if time_before == secret_path.stat().st_mtime:
|
||||
return []
|
||||
return [secret_path]
|
||||
rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer)
|
||||
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
|
||||
return [secret_path] if was_modified else []
|
||||
|
||||
|
||||
def encrypt_file(
|
||||
secret_path: Path,
|
||||
content: IO[str] | str | bytes | None,
|
||||
content: str | IO[bytes] | bytes | None,
|
||||
pubkeys: list[tuple[str, KeyType]],
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with sops_manifest(pubkeys) as manifest:
|
||||
if not content:
|
||||
args = ["sops", "--config", str(manifest)]
|
||||
args.extend([str(secret_path)])
|
||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
||||
# Don't use our `run` here, because it breaks editor integration.
|
||||
# We never need this in our UI.
|
||||
p = subprocess.run(cmd, check=False)
|
||||
# returns 200 if the file is changed
|
||||
if p.returncode != 0 and p.returncode != 200:
|
||||
msg = (
|
||||
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
|
||||
def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
|
||||
return CmdOut(
|
||||
stdout="",
|
||||
stderr="",
|
||||
cwd=Path.cwd(),
|
||||
env=env,
|
||||
command_list=cmd,
|
||||
returncode=subprocess.run(cmd, env=env, check=False).returncode,
|
||||
msg=None,
|
||||
)
|
||||
raise ClanError(msg)
|
||||
return
|
||||
|
||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||
with NamedTemporaryFile(delete=False) as f:
|
||||
rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer)
|
||||
status = ExitStatus.parse(rc)
|
||||
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
|
||||
return
|
||||
msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}"
|
||||
raise ClanError(msg)
|
||||
|
||||
# lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it
|
||||
# creates the file in __init__, when really it should be created in
|
||||
# __enter__ (that is in Python __enter__ is actually __init__ from a RAII
|
||||
# perspective, and __init__ should just be thought off as syntax sugar to
|
||||
# capture extra context), and now the linter is unhappy so hush it. Note
|
||||
# that if NamedTemporaryFile created the file in __enter__ then we'd have
|
||||
# to change exception handling:
|
||||
try:
|
||||
source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115
|
||||
except (FileNotFoundError, PermissionError):
|
||||
source = NamedTemporaryFile(delete=False) # noqa: SIM115
|
||||
try: # swap the secret:
|
||||
with source:
|
||||
if isinstance(content, str):
|
||||
Path(f.name).write_text(content)
|
||||
source.file.write(content.encode())
|
||||
elif isinstance(content, bytes):
|
||||
Path(f.name).write_bytes(content)
|
||||
elif isinstance(content, io.IOBase):
|
||||
with Path(f.name).open("w") as fd:
|
||||
shutil.copyfileobj(content, fd)
|
||||
source.file.write(content)
|
||||
elif isinstance(content, io.BufferedReader):
|
||||
# lopter@(2024-11-19): mypy is freaking out on the 1st
|
||||
# argument, idk why, it says:
|
||||
#
|
||||
# > Cannot infer type argument 1 of "copyfileobj"
|
||||
shutil.copyfileobj(content, source.file) # type: ignore[misc]
|
||||
else:
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
# we pass an empty manifest to pick up existing configuration of the user
|
||||
args = ["sops", "--config", str(manifest)]
|
||||
args.extend(["-i", "--encrypt", str(f.name)])
|
||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
||||
run(cmd, log=Log.BOTH)
|
||||
executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH)
|
||||
run(Operation.ENCRYPT, Path(source.name), pubkeys, executer)
|
||||
# atomic copy of the encrypted file
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
||||
shutil.copyfile(f.name, f2.name)
|
||||
Path(f2.name).rename(secret_path)
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as dest:
|
||||
shutil.copyfile(source.name, dest.name)
|
||||
Path(dest.name).rename(secret_path)
|
||||
finally:
|
||||
with suppress(OSError):
|
||||
Path(f.name).unlink()
|
||||
Path(source.name).unlink()
|
||||
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
with sops_manifest([]) as manifest:
|
||||
cmd = nix_shell(
|
||||
["nixpkgs#sops"],
|
||||
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[tuple[str, KeyType]] = []
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}"
|
||||
)
|
||||
res = run(cmd, error_msg=f"Could not decrypt {secret_path}")
|
||||
return res.stdout
|
||||
_, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer)
|
||||
return stdout
|
||||
|
||||
|
||||
def get_recipients(secret_path: Path) -> set[SopsKey]:
|
||||
sops_attrs = json.loads((secret_path / "secret").read_text())["sops"]
|
||||
return {
|
||||
SopsKey(
|
||||
pubkey=recipient[key_type.sops_recipient_attr],
|
||||
username="",
|
||||
key_type=key_type,
|
||||
)
|
||||
for key_type in KeyType
|
||||
for recipient in sops_attrs[key_type.name.lower()] or []
|
||||
}
|
||||
|
||||
|
||||
def get_meta(secret_path: Path) -> dict:
|
||||
|
||||
@@ -21,14 +21,15 @@ def secret_name_type(arg_value: str) -> str:
|
||||
def public_or_private_age_key_type(arg_value: str) -> str:
|
||||
if Path(arg_value).is_file():
|
||||
arg_value = Path(arg_value).read_text().strip()
|
||||
if arg_value.startswith("age1"):
|
||||
return arg_value.strip()
|
||||
if arg_value.startswith("AGE-SECRET-KEY-"):
|
||||
return get_public_age_key(arg_value)
|
||||
if not arg_value.startswith("age1"):
|
||||
msg = f"Please provide an age key starting with age1, got: '{arg_value}'"
|
||||
for line in arg_value.splitlines():
|
||||
if line.startswith("#"):
|
||||
continue
|
||||
if line.startswith("age1"):
|
||||
return line.strip()
|
||||
if line.startswith("AGE-SECRET-KEY-"):
|
||||
return get_public_age_key(line)
|
||||
msg = f"Please provide an age key starting with age1 or AGE-SECRET-KEY-, got: '{arg_value}'"
|
||||
raise ClanError(msg)
|
||||
return arg_value
|
||||
|
||||
|
||||
def group_or_user_name_type(what: str) -> Callable[[str], str]:
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import override
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.secrets import sops
|
||||
from clan_cli.secrets.folders import (
|
||||
sops_machines_folder,
|
||||
sops_secrets_folder,
|
||||
@@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import (
|
||||
encrypt_secret,
|
||||
has_secret,
|
||||
)
|
||||
from clan_cli.secrets.sops import KeyType, generate_private_key
|
||||
from clan_cli.vars.generate import Generator, Var
|
||||
from clan_cli.vars.generate import Generator
|
||||
from clan_cli.vars.var import Var
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
@@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase):
|
||||
|
||||
if has_machine(self.machine.flake_dir, self.machine.name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
priv_key, pub_key = sops.generate_private_key()
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir)
|
||||
@@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase):
|
||||
def user_has_access(
|
||||
self, user: str, generator: Generator, secret_name: str
|
||||
) -> bool:
|
||||
secret_path = self.secret_path(generator, secret_name)
|
||||
secret = json.loads((secret_path / "secret").read_text())
|
||||
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
|
||||
users_folder_path = sops_users_folder(self.machine.flake_dir)
|
||||
user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[
|
||||
"publickey"
|
||||
]
|
||||
return user_pubkey in recipients
|
||||
key_dir = sops_users_folder(self.machine.flake_dir) / user
|
||||
return self.key_has_access(key_dir, generator, secret_name)
|
||||
|
||||
def machine_has_access(self, generator: Generator, secret_name: str) -> bool:
|
||||
key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name
|
||||
return self.key_has_access(key_dir, generator, secret_name)
|
||||
|
||||
def key_has_access(
|
||||
self, key_dir: Path, generator: Generator, secret_name: str
|
||||
) -> bool:
|
||||
secret_path = self.secret_path(generator, secret_name)
|
||||
secret = json.loads((secret_path / "secret").read_text())
|
||||
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
|
||||
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
|
||||
machine_pubkey = json.loads(
|
||||
(machines_folder_path / self.machine.name / "key.json").read_text()
|
||||
)["publickey"]
|
||||
return machine_pubkey in recipients
|
||||
return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path)
|
||||
|
||||
def secret_path(self, generator: Generator, secret_name: str) -> Path:
|
||||
return self.directory(generator, secret_name)
|
||||
@@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase):
|
||||
secret_folder = self.secret_path(generator, name)
|
||||
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
|
||||
|
||||
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]:
|
||||
def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]:
|
||||
from clan_cli.secrets.secrets import (
|
||||
collect_keys_for_path,
|
||||
collect_keys_for_type,
|
||||
@@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase):
|
||||
self.machine.flake_dir / "sops" / "groups" / group / "users"
|
||||
)
|
||||
)
|
||||
return keys
|
||||
|
||||
@override
|
||||
return {
|
||||
sops.SopsKey(pubkey=key, username="", key_type=key_type)
|
||||
for (key, key_type) in keys
|
||||
}
|
||||
|
||||
# }
|
||||
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
|
||||
secret_path = self.secret_path(generator, name)
|
||||
recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"]
|
||||
current_recipients = {r["recipient"] for r in recipients_}
|
||||
wanted_recipients = {
|
||||
key[0] for key in self.collect_keys_for_secret(secret_path)
|
||||
}
|
||||
current_recipients = sops.get_recipients(secret_path)
|
||||
wanted_recipients = self.collect_keys_for_secret(secret_path)
|
||||
needs_update = current_recipients != wanted_recipients
|
||||
recipients_to_add = wanted_recipients - current_recipients
|
||||
var_id = f"{generator.name}/{name}"
|
||||
msg = (
|
||||
f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. "
|
||||
f"This could have been a malicious actor trying to add their keys, please investigate. "
|
||||
f"Added keys: {', '.join(recipients_to_add)}"
|
||||
f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}"
|
||||
)
|
||||
return needs_update, msg
|
||||
|
||||
|
||||
Reference in New Issue
Block a user