clan-cli: filter any sops recipients set in the environment for encryption
This forces sops to use our config file, otherwise if any of the
environment variables set to specify recipients is present then
`--config` will be ignored (see [env_check]).
That's simple enough, still I ended up refactoring how we call sops for
correctness, and to align with its behavior. The code now distinguishes
between public and private keys explicitly. `secrets.decrypt_secret`
does not try to lookup for public and private keys anymore.
With this changeset, some people might have to adjust their environment
as public age and PGP keys will be discovered like sops would do. In
particular if multiple public keys are discovered, then the user will
have to specify which one to use for the clan.
This also makes the following changes:
- try to use `/dev/shm` when swapping a secret (it's what [pass] does
fwiw);
- alias immediate values for readability;
- remove some float comparison that could never succeed, and use sops'
exit status instead;
- remove unused function `maybe_get_sops_key`.
[env_check]: 8c567aa8a7/cmd/sops/main.go (L2229)
[pass]: http://passwordstore.org/
This commit is contained in:
@@ -114,8 +114,8 @@ def deploy_machine(machines: MachineGroup) -> None:
|
|||||||
|
|
||||||
def deploy(machine: Machine) -> None:
|
def deploy(machine: Machine) -> None:
|
||||||
host = machine.build_host
|
host = machine.build_host
|
||||||
generate_facts([machine])
|
generate_facts([machine], service=None, regenerate=False)
|
||||||
generate_vars([machine])
|
generate_vars([machine], generator_name=None, regenerate=False)
|
||||||
|
|
||||||
upload_secrets(machine)
|
upload_secrets(machine)
|
||||||
upload_secret_vars(machine)
|
upload_secret_vars(machine)
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ from clan_cli.git import commit_files
|
|||||||
from . import sops
|
from . import sops
|
||||||
from .secrets import update_secrets
|
from .secrets import update_secrets
|
||||||
from .sops import (
|
from .sops import (
|
||||||
default_admin_key_path,
|
default_admin_private_key_path,
|
||||||
generate_private_key,
|
generate_private_key,
|
||||||
maybe_get_admin_public_key,
|
maybe_get_admin_public_key,
|
||||||
)
|
)
|
||||||
@@ -23,7 +23,7 @@ def generate_key() -> sops.SopsKey:
|
|||||||
print(f"{key.key_type.name} key {key.pubkey} is already set")
|
print(f"{key.key_type.name} key {key.pubkey} is already set")
|
||||||
return key
|
return key
|
||||||
|
|
||||||
path = default_admin_key_path()
|
path = default_admin_private_key_path()
|
||||||
_, pub_key = generate_private_key(out_file=path)
|
_, pub_key = generate_private_key(out_file=path)
|
||||||
print(
|
print(
|
||||||
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||||
@@ -62,8 +62,9 @@ def register_key_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
parser_generate = subparser.add_parser(
|
parser_generate = subparser.add_parser(
|
||||||
"generate",
|
"generate",
|
||||||
description=(
|
description=(
|
||||||
"Generate an age key for the Clan, "
|
"Generate an age key for the Clan, if you already have an age "
|
||||||
"to use PGP set `SOPS_PGP_FP` in your environment."
|
"or PGP key, then use it to create your user, see: "
|
||||||
|
"`clan secrets users add --help'"
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
parser_generate.set_defaults(func=generate_command)
|
parser_generate.set_defaults(func=generate_command)
|
||||||
|
|||||||
@@ -28,7 +28,12 @@ from .folders import (
|
|||||||
sops_secrets_folder,
|
sops_secrets_folder,
|
||||||
sops_users_folder,
|
sops_users_folder,
|
||||||
)
|
)
|
||||||
from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys
|
from .sops import (
|
||||||
|
decrypt_file,
|
||||||
|
encrypt_file,
|
||||||
|
read_key,
|
||||||
|
update_keys,
|
||||||
|
)
|
||||||
from .types import VALID_SECRET_NAME, secret_name_type
|
from .types import VALID_SECRET_NAME, secret_name_type
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -101,9 +106,13 @@ def encrypt_secret(
|
|||||||
add_machines = []
|
add_machines = []
|
||||||
if add_users is None:
|
if add_users is None:
|
||||||
add_users = []
|
add_users = []
|
||||||
key = ensure_admin_key(flake_dir)
|
key = sops.ensure_admin_public_key(flake_dir)
|
||||||
recipient_keys = set()
|
recipient_keys = set()
|
||||||
|
|
||||||
|
# encrypt_secret can be called before the secret has been created
|
||||||
|
# so don't try to call sops.update_keys on a non-existent file:
|
||||||
|
do_update_keys = False
|
||||||
|
|
||||||
files_to_commit = []
|
files_to_commit = []
|
||||||
for user in add_users:
|
for user in add_users:
|
||||||
files_to_commit.extend(
|
files_to_commit.extend(
|
||||||
@@ -111,7 +120,7 @@ def encrypt_secret(
|
|||||||
users_folder(secret_path),
|
users_folder(secret_path),
|
||||||
sops_users_folder(flake_dir),
|
sops_users_folder(flake_dir),
|
||||||
user,
|
user,
|
||||||
False,
|
do_update_keys,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -121,7 +130,7 @@ def encrypt_secret(
|
|||||||
machines_folder(secret_path),
|
machines_folder(secret_path),
|
||||||
sops_machines_folder(flake_dir),
|
sops_machines_folder(flake_dir),
|
||||||
machine,
|
machine,
|
||||||
False,
|
do_update_keys,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -131,7 +140,7 @@ def encrypt_secret(
|
|||||||
groups_folder(secret_path),
|
groups_folder(secret_path),
|
||||||
sops_groups_folder(flake_dir),
|
sops_groups_folder(flake_dir),
|
||||||
group,
|
group,
|
||||||
False,
|
do_update_keys,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -144,7 +153,7 @@ def encrypt_secret(
|
|||||||
users_folder(secret_path),
|
users_folder(secret_path),
|
||||||
sops_users_folder(flake_dir),
|
sops_users_folder(flake_dir),
|
||||||
key.username,
|
key.username,
|
||||||
False,
|
do_update_keys,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -296,7 +305,10 @@ def list_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
|
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
|
||||||
ensure_admin_key(flake_dir)
|
# I can't think of a good way to ensure that we have the private key for
|
||||||
|
# the secret. I mean we could collect all private keys we could find and
|
||||||
|
# then make sure we have the one for the secret, but that seems
|
||||||
|
# complicated for little ux gain?
|
||||||
path = secret_path / "secret"
|
path = secret_path / "secret"
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
msg = f"Secret '{secret_path!s}' does not exist"
|
msg = f"Secret '{secret_path!s}' does not exist"
|
||||||
|
|||||||
@@ -1,24 +1,28 @@
|
|||||||
import enum
|
import enum
|
||||||
|
import functools
|
||||||
import io
|
import io
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
from collections.abc import Iterable, Iterator
|
from collections.abc import Iterable, Sequence
|
||||||
from contextlib import contextmanager, suppress
|
from contextlib import suppress
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from typing import IO
|
from typing import IO, Any, Protocol
|
||||||
|
|
||||||
from clan_cli.api import API
|
from clan_cli.api import API
|
||||||
from clan_cli.cmd import Log, run
|
from clan_cli.cmd import Log, run
|
||||||
from clan_cli.dirs import user_config_dir
|
from clan_cli.dirs import user_config_dir
|
||||||
from clan_cli.errors import ClanError
|
from clan_cli.errors import ClanError, CmdOut
|
||||||
from clan_cli.nix import nix_shell
|
from clan_cli.nix import nix_shell
|
||||||
|
|
||||||
from .folders import sops_machines_folder, sops_users_folder
|
from .folders import sops_machines_folder, sops_users_folder
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class KeyType(enum.Enum):
|
class KeyType(enum.Enum):
|
||||||
AGE = enum.auto()
|
AGE = enum.auto()
|
||||||
@@ -45,6 +49,111 @@ class SopsKey:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||||
|
ERROR_GENERIC = 1
|
||||||
|
COULD_NOT_READ_INPUT_FILE = 2
|
||||||
|
COULD_NOT_WRITE_OUTPUT_FILE = 3
|
||||||
|
ERROR_DUMPING_TREE = 4
|
||||||
|
ERROR_READING_CONFIG = 5
|
||||||
|
ERROR_INVALID_KMS_ENCRYPTION_CONTEXT_FORMAT = 6
|
||||||
|
ERROR_INVALID_SET_FORMAT = 7
|
||||||
|
ERROR_CONFLICTING_PARAMETERS = 8
|
||||||
|
ERROR_ENCRYPTING_MAC = 21
|
||||||
|
ERROR_ENCRYPTING_TREE = 23
|
||||||
|
ERROR_DECRYPTING_MAC = 24
|
||||||
|
ERROR_DECRYPTING_TREE = 25
|
||||||
|
CANNOT_CHANGE_KEYS_FROM_NON_EXISTENT_FILE = 49
|
||||||
|
MAC_MISMATCH = 51
|
||||||
|
MAC_NOT_FOUND = 52
|
||||||
|
CONFIG_FILE_NOT_FOUND = 61
|
||||||
|
KEYBOARD_INTERRUPT = 85
|
||||||
|
INVALID_TREE_PATH_FORMAT = 91
|
||||||
|
NEED_AT_LEAST_ONE_DOCUMENT = 92
|
||||||
|
NO_FILE_SPECIFIED = 100
|
||||||
|
COULD_NOT_RETRIEVE_KEY = 128
|
||||||
|
NO_ENCRYPTION_KEY_FOUND = 111
|
||||||
|
DUPLICATE_DECRYPTION_KEY_TYPE = 112
|
||||||
|
FILE_HAS_NOT_BEEN_MODIFIED = 200
|
||||||
|
NO_EDITOR_FOUND = 201
|
||||||
|
FAILED_TO_COMPARE_VERSIONS = 202
|
||||||
|
FILE_ALREADY_ENCRYPTED = 203
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def parse(cls, code: int) -> "ExitStatus | None": # noqa: ANN102
|
||||||
|
return ExitStatus(code) if code in ExitStatus else None
|
||||||
|
|
||||||
|
|
||||||
|
class Executor(Protocol):
|
||||||
|
def __call__(
|
||||||
|
self, cmd: list[str], *, env: dict[str, str] | None = None
|
||||||
|
) -> CmdOut: ...
|
||||||
|
|
||||||
|
|
||||||
|
class Operation(enum.StrEnum):
|
||||||
|
decrypt = "decrypt"
|
||||||
|
edit = "edit"
|
||||||
|
encrypt = "encrypt"
|
||||||
|
update_keys = "updatekeys"
|
||||||
|
|
||||||
|
def __call__(
|
||||||
|
self,
|
||||||
|
secret_path: Path,
|
||||||
|
public_keys: Iterable[tuple[str, KeyType]],
|
||||||
|
executor: Executor,
|
||||||
|
) -> tuple[int, str]:
|
||||||
|
sops_cmd = ["sops"]
|
||||||
|
environ = os.environ.copy()
|
||||||
|
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||||
|
if self == Operation.decrypt:
|
||||||
|
sops_cmd.append("decrypt")
|
||||||
|
else:
|
||||||
|
# When sops is used to edit a file the config is only used at
|
||||||
|
# file creation, otherwise the keys from the exising file are
|
||||||
|
# used.
|
||||||
|
sops_cmd.extend(["--config", manifest.name])
|
||||||
|
|
||||||
|
keys_by_type: dict[KeyType, list[str]] = {}
|
||||||
|
keys_by_type = {key_type: [] for key_type in KeyType}
|
||||||
|
for key, key_type in public_keys:
|
||||||
|
keys_by_type[key_type].append(key)
|
||||||
|
it = keys_by_type.items()
|
||||||
|
key_groups = [{key_type.name.lower(): keys for key_type, keys in it}]
|
||||||
|
rules = {"creation_rules": [{"key_groups": key_groups}]}
|
||||||
|
json.dump(rules, manifest, indent=2)
|
||||||
|
manifest.flush()
|
||||||
|
|
||||||
|
if self == Operation.encrypt:
|
||||||
|
# Remove SOPS env vars used to specify public keys to force
|
||||||
|
# sops to use our config file [1]; so that the file gets
|
||||||
|
# encrypted with our keys and not something leaking out of
|
||||||
|
# the environment.
|
||||||
|
#
|
||||||
|
# [1]: https://github.com/getsops/sops/blob/8c567aa8a7cf4802e251e87efc84a1c50b69d4f0/cmd/sops/main.go#L2229
|
||||||
|
for var in os.environ:
|
||||||
|
if var.startswith("SOPS_") and var not in { # allowed:
|
||||||
|
"SOPS_GPG_EXEC",
|
||||||
|
"SOPS_AGE_KEY",
|
||||||
|
"SOPS_AGE_KEY_FILE",
|
||||||
|
"SOPS_NIX_SECRET",
|
||||||
|
}:
|
||||||
|
del environ[var]
|
||||||
|
sops_cmd.extend(["encrypt", "--in-place"])
|
||||||
|
elif self == Operation.update_keys:
|
||||||
|
sops_cmd.extend(["updatekeys", "--yes"])
|
||||||
|
elif self != Operation.edit:
|
||||||
|
known_operations = ",".join(Operation.__members__.values())
|
||||||
|
msg = (
|
||||||
|
f"Unsupported sops operation {self.value} "
|
||||||
|
f"(known operations: {known_operations})"
|
||||||
|
)
|
||||||
|
raise ClanError(msg)
|
||||||
|
sops_cmd.append(str(secret_path))
|
||||||
|
|
||||||
|
cmd = nix_shell(["nixpkgs#sops"], sops_cmd)
|
||||||
|
p = executor(cmd, env=environ)
|
||||||
|
return p.returncode, p.stdout
|
||||||
|
|
||||||
|
|
||||||
def get_public_age_key(privkey: str) -> str:
|
def get_public_age_key(privkey: str) -> str:
|
||||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
||||||
try:
|
try:
|
||||||
@@ -122,7 +231,7 @@ def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
|
|||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
|
||||||
def default_admin_key_path() -> Path:
|
def default_admin_private_key_path() -> Path:
|
||||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||||
if raw_path:
|
if raw_path:
|
||||||
return Path(raw_path)
|
return Path(raw_path)
|
||||||
@@ -131,37 +240,67 @@ def default_admin_key_path() -> Path:
|
|||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
def maybe_get_admin_public_key() -> None | SopsKey:
|
def maybe_get_admin_public_key() -> None | SopsKey:
|
||||||
age_key = os.environ.get("SOPS_AGE_KEY")
|
keyring = collect_public_keys()
|
||||||
pgp_key = os.environ.get("SOPS_PGP_FP")
|
if len(keyring) == 0:
|
||||||
if age_key and pgp_key:
|
|
||||||
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
|
|
||||||
raise ClanError(msg)
|
|
||||||
if age_key:
|
|
||||||
return SopsKey(
|
|
||||||
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
|
|
||||||
)
|
|
||||||
if pgp_key:
|
|
||||||
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
|
|
||||||
|
|
||||||
path = default_admin_key_path()
|
|
||||||
if path.exists():
|
|
||||||
return SopsKey(
|
|
||||||
pubkey=get_public_age_key(path.read_text()),
|
|
||||||
key_type=KeyType.AGE,
|
|
||||||
username="",
|
|
||||||
)
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
|
|
||||||
key = maybe_get_admin_public_key()
|
|
||||||
if not key:
|
|
||||||
return None
|
return None
|
||||||
return maybe_get_user_or_machine(flake_dir, key)
|
|
||||||
|
if len(keyring) > 1:
|
||||||
|
msg = (
|
||||||
|
f"Found more than {len(keyring)} public keys in your "
|
||||||
|
f"environment/system and cannot decide which one to "
|
||||||
|
f"use, first three:\n\n"
|
||||||
|
f"- {'\n- '.join(str(key.as_dict()) for key in keyring[:3])}\n\n"
|
||||||
|
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
|
||||||
|
f"SOPS_PGP_FP appropriately"
|
||||||
|
)
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
return keyring[0]
|
||||||
|
|
||||||
|
|
||||||
def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
def collect_public_keys() -> Sequence[SopsKey]:
|
||||||
|
username = ""
|
||||||
|
keyring: list[SopsKey] = []
|
||||||
|
|
||||||
|
for private_key in collect_private_age_keys():
|
||||||
|
public_key = get_public_age_key(private_key)
|
||||||
|
keyring.append(SopsKey(public_key, username, KeyType.AGE))
|
||||||
|
|
||||||
|
if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"):
|
||||||
|
for fp in pgp_fingerprints.strip().split(","):
|
||||||
|
keyring.append(SopsKey(fp, username, KeyType.PGP))
|
||||||
|
|
||||||
|
return keyring
|
||||||
|
|
||||||
|
|
||||||
|
def collect_private_age_keys() -> Sequence[str]:
|
||||||
|
private_age_keys: list[str] = []
|
||||||
|
|
||||||
|
if keys := os.environ.get("SOPS_AGE_KEY"):
|
||||||
|
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and reads
|
||||||
|
# identities line by line. See age/keysource.go in Sops, and
|
||||||
|
# age/parse.go in Age.
|
||||||
|
private_age_keys.extend(keys.strip().splitlines())
|
||||||
|
|
||||||
|
def maybe_read_from_path(key_path: Path) -> None:
|
||||||
|
try:
|
||||||
|
contents = Path(key_path).read_text().strip()
|
||||||
|
lines = contents.splitlines() # as in parse.go in age:
|
||||||
|
private_age_keys.extend(l for l in lines if l and not l.startswith("#"))
|
||||||
|
except FileNotFoundError:
|
||||||
|
return
|
||||||
|
except Exception as ex:
|
||||||
|
log.warn(f"Could not read age keys from {key_path}: {ex}")
|
||||||
|
|
||||||
|
# Sops will try every location, see age/keysource.go
|
||||||
|
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
|
||||||
|
maybe_read_from_path(Path(key_path))
|
||||||
|
maybe_read_from_path(user_config_dir() / "sops/age/keys.txt")
|
||||||
|
|
||||||
|
return private_age_keys
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||||
key = maybe_get_admin_public_key()
|
key = maybe_get_admin_public_key()
|
||||||
if key:
|
if key:
|
||||||
return ensure_user_or_machine(flake_dir, key)
|
return ensure_user_or_machine(flake_dir, key)
|
||||||
@@ -169,39 +308,13 @@ def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
|||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
|
|
||||||
all_keys: dict[str, list[str]] = {
|
|
||||||
key_type.lower(): [] for key_type in KeyType.__members__
|
|
||||||
}
|
|
||||||
for key, key_type in keys:
|
|
||||||
all_keys[key_type.name.lower()].append(key)
|
|
||||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
|
||||||
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
|
|
||||||
manifest.flush()
|
|
||||||
yield Path(manifest.name)
|
|
||||||
|
|
||||||
|
|
||||||
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
||||||
keys_sorted = sorted(keys)
|
secret_path = secret_path / "secret"
|
||||||
with sops_manifest(keys_sorted) as manifest:
|
error_msg = f"Could not update keys for {secret_path}"
|
||||||
secret_path = secret_path / "secret"
|
executor = functools.partial(run, log=Log.BOTH, error_msg=error_msg)
|
||||||
time_before = secret_path.stat().st_mtime
|
rc, _ = Operation.update_keys(secret_path, keys, executor)
|
||||||
cmd = nix_shell(
|
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
|
||||||
["nixpkgs#sops"],
|
return [secret_path] if was_modified else []
|
||||||
[
|
|
||||||
"sops",
|
|
||||||
"--config",
|
|
||||||
str(manifest),
|
|
||||||
"updatekeys",
|
|
||||||
"--yes",
|
|
||||||
str(secret_path),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
run(cmd, log=Log.BOTH, error_msg=f"Could not update keys for {secret_path}")
|
|
||||||
if time_before == secret_path.stat().st_mtime:
|
|
||||||
return []
|
|
||||||
return [secret_path]
|
|
||||||
|
|
||||||
|
|
||||||
def encrypt_file(
|
def encrypt_file(
|
||||||
@@ -212,57 +325,63 @@ def encrypt_file(
|
|||||||
folder = secret_path.parent
|
folder = secret_path.parent
|
||||||
folder.mkdir(parents=True, exist_ok=True)
|
folder.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
with sops_manifest(pubkeys) as manifest:
|
if not content:
|
||||||
if not content:
|
# Don't use our `run` here, because it breaks editor integration.
|
||||||
args = ["sops", "--config", str(manifest)]
|
# We never need this in our UI.
|
||||||
args.extend([str(secret_path)])
|
def executor(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
|
||||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
return CmdOut(
|
||||||
# Don't use our `run` here, because it breaks editor integration.
|
stdout="",
|
||||||
# We never need this in our UI.
|
stderr="",
|
||||||
p = subprocess.run(cmd, check=False)
|
cwd=Path.cwd(),
|
||||||
# returns 200 if the file is changed
|
env=env,
|
||||||
if p.returncode != 0 and p.returncode != 200:
|
command_list=cmd,
|
||||||
msg = (
|
returncode=subprocess.run(cmd, env=env, check=False).returncode,
|
||||||
f"Failed to encrypt {secret_path}: sops exited with {p.returncode}"
|
msg=None,
|
||||||
)
|
)
|
||||||
raise ClanError(msg)
|
|
||||||
return
|
|
||||||
|
|
||||||
|
rc, _ = Operation.edit(secret_path, pubkeys, executor)
|
||||||
|
status = ExitStatus.parse(rc)
|
||||||
|
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
|
||||||
|
return
|
||||||
|
msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}"
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
def swap_secret(f: Any) -> None: # f's type is not exposed by tempfile
|
||||||
|
try:
|
||||||
|
if isinstance(content, str):
|
||||||
|
Path(f.name).write_text(content)
|
||||||
|
elif isinstance(content, bytes):
|
||||||
|
Path(f.name).write_bytes(content)
|
||||||
|
elif isinstance(content, io.IOBase):
|
||||||
|
with Path(f.name).open("w") as fd:
|
||||||
|
shutil.copyfileobj(content, fd)
|
||||||
|
else:
|
||||||
|
msg = f"Invalid content type: {type(content)}"
|
||||||
|
raise ClanError(msg)
|
||||||
|
Operation.encrypt(f.name, pubkeys, functools.partial(run, log=Log.BOTH))
|
||||||
|
# atomic copy of the encrypted file
|
||||||
|
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
||||||
|
shutil.copyfile(f.name, f2.name)
|
||||||
|
Path(f2.name).rename(secret_path)
|
||||||
|
finally:
|
||||||
|
with suppress(OSError):
|
||||||
|
Path(f.name).unlink()
|
||||||
|
|
||||||
|
try:
|
||||||
|
with NamedTemporaryFile(dir="/dev/shm", delete=False) as f:
|
||||||
|
swap_secret(f)
|
||||||
|
except (FileNotFoundError, PermissionError):
|
||||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||||
with NamedTemporaryFile(delete=False) as f:
|
with NamedTemporaryFile(delete=False) as f:
|
||||||
try:
|
swap_secret(f)
|
||||||
if isinstance(content, str):
|
|
||||||
Path(f.name).write_text(content)
|
|
||||||
elif isinstance(content, bytes):
|
|
||||||
Path(f.name).write_bytes(content)
|
|
||||||
elif isinstance(content, io.IOBase):
|
|
||||||
with Path(f.name).open("w") as fd:
|
|
||||||
shutil.copyfileobj(content, fd)
|
|
||||||
else:
|
|
||||||
msg = f"Invalid content type: {type(content)}"
|
|
||||||
raise ClanError(msg)
|
|
||||||
# we pass an empty manifest to pick up existing configuration of the user
|
|
||||||
args = ["sops", "--config", str(manifest)]
|
|
||||||
args.extend(["-i", "--encrypt", str(f.name)])
|
|
||||||
cmd = nix_shell(["nixpkgs#sops"], args)
|
|
||||||
run(cmd, log=Log.BOTH)
|
|
||||||
# atomic copy of the encrypted file
|
|
||||||
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
|
||||||
shutil.copyfile(f.name, f2.name)
|
|
||||||
Path(f2.name).rename(secret_path)
|
|
||||||
finally:
|
|
||||||
with suppress(OSError):
|
|
||||||
Path(f.name).unlink()
|
|
||||||
|
|
||||||
|
|
||||||
def decrypt_file(secret_path: Path) -> str:
|
def decrypt_file(secret_path: Path) -> str:
|
||||||
with sops_manifest([]) as manifest:
|
# decryption uses private keys from the environment or default paths:
|
||||||
cmd = nix_shell(
|
no_public_keys_needed: list[tuple[str, KeyType]] = []
|
||||||
["nixpkgs#sops"],
|
executor = functools.partial(run, error_msg=f"Could not decrypt {secret_path}")
|
||||||
["sops", "--config", str(manifest), "--decrypt", str(secret_path)],
|
_, stdout = Operation.decrypt(secret_path, no_public_keys_needed, executor)
|
||||||
)
|
return stdout
|
||||||
res = run(cmd, error_msg=f"Could not decrypt {secret_path}")
|
|
||||||
return res.stdout
|
|
||||||
|
|
||||||
|
|
||||||
def get_meta(secret_path: Path) -> dict:
|
def get_meta(secret_path: Path) -> dict:
|
||||||
|
|||||||
Reference in New Issue
Block a user