clan-cli: secrets: address CR feedback for sops encryption and key handling
- Move public keys collection to a class method on `SopsKey`, and implement collection for each key type in `KeyType`, this helps make the code more generic ; - Replace `Operation.__call__` by `run` (`sops.run` if you import the entire module), that allows us to dedent the code so that's cool ; - Fix exception handling when trying to get a in-memory temporary file ; - Make Executor cuter 😵🪦.
This commit is contained in:
@@ -94,7 +94,7 @@ def collect_keys_for_path(path: Path) -> set[tuple[str, sops.KeyType]]:
|
||||
def encrypt_secret(
|
||||
flake_dir: Path,
|
||||
secret_path: Path,
|
||||
value: IO[str] | str | bytes | None,
|
||||
value: IO[bytes] | str | bytes | None,
|
||||
add_users: list[str] | None = None,
|
||||
add_machines: list[str] | None = None,
|
||||
add_groups: list[str] | None = None,
|
||||
@@ -305,10 +305,10 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
|
||||
# I can't think of a good way to ensure that we have the private key for
|
||||
# the secret. I mean we could collect all private keys we could find and
|
||||
# then make sure we have the one for the secret, but that seems
|
||||
# complicated for little ux gain?
|
||||
# lopter(2024-10): I can't think of a good way to ensure that we have the
|
||||
# private key for the secret. I mean we could collect all private keys we
|
||||
# could find and then make sure we have the one for the secret, but that
|
||||
# seems complicated for little ux gain?
|
||||
path = secret_path / "secret"
|
||||
if not path.exists():
|
||||
msg = f"Secret '{secret_path!s}' does not exist"
|
||||
@@ -332,7 +332,7 @@ def is_tty_interactive() -> bool:
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
env_value = os.environ.get("SOPS_NIX_SECRET")
|
||||
secret_value: str | IO[str] | None = sys.stdin
|
||||
secret_value: str | IO[bytes] | None = sys.stdin.buffer
|
||||
if args.edit:
|
||||
secret_value = None
|
||||
elif env_value:
|
||||
|
||||
@@ -11,10 +11,10 @@ from collections.abc import Iterable, Sequence
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO, Any, Protocol
|
||||
from typing import IO, Protocol
|
||||
|
||||
import clan_cli.cmd
|
||||
from clan_cli.api import API
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.dirs import user_config_dir
|
||||
from clan_cli.errors import ClanError, CmdOut
|
||||
from clan_cli.nix import nix_shell
|
||||
@@ -47,6 +47,56 @@ class KeyType(enum.Enum):
|
||||
)
|
||||
raise ClanError(msg)
|
||||
|
||||
def collect_public_keys(self) -> Sequence[str]:
|
||||
keyring: Sequence[str] = []
|
||||
|
||||
if self == self.AGE:
|
||||
if keys := os.environ.get("SOPS_AGE_KEY"):
|
||||
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and
|
||||
# reads identities line by line. See age/keysource.go in
|
||||
# Sops, and age/parse.go in Age.
|
||||
for private_key in keys.strip().splitlines():
|
||||
public_key = get_public_age_key(private_key)
|
||||
log.info(
|
||||
f"Found age public key from a private key "
|
||||
f"in the environment (SOPS_AGE_KEY): {public_key}"
|
||||
)
|
||||
keyring.append(public_key)
|
||||
|
||||
def maybe_read_from_path(key_path: Path) -> None:
|
||||
try:
|
||||
# as in parse.go in age:
|
||||
lines = Path(key_path).read_text().strip().splitlines()
|
||||
for private_key in filter(lambda ln: not ln.startswith("#"), lines):
|
||||
public_key = get_public_age_key(private_key)
|
||||
log.info(
|
||||
f"Found age public key from a private key "
|
||||
f"in {key_path}: {public_key}"
|
||||
)
|
||||
keyring.append(public_key)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as ex:
|
||||
log.warn(f"Could not read age keys from {key_path}: {ex}")
|
||||
|
||||
# Sops will try every location, see age/keysource.go
|
||||
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
|
||||
maybe_read_from_path(Path(key_path))
|
||||
maybe_read_from_path(user_config_dir() / "sops/age/keys.txt")
|
||||
|
||||
return keyring
|
||||
|
||||
if self == self.PGP:
|
||||
if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"):
|
||||
for fp in pgp_fingerprints.strip().split(","):
|
||||
msg = f"Found PGP public key in the environment (SOPS_PGP_FP): {fp}"
|
||||
log.info(msg)
|
||||
keyring.append(fp)
|
||||
return keyring
|
||||
|
||||
msg = f"KeyType {self.name.lower()} is missing an implementation for collect_public_keys"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class SopsKey:
|
||||
@@ -64,12 +114,20 @@ class SopsKey:
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102
|
||||
def load_dir(cls, folder: Path) -> "SopsKey": # noqa: ANN102
|
||||
"""Load from the file named `keys.json` in the given directory."""
|
||||
pubkey, key_type = read_key(dir)
|
||||
pubkey, key_type = read_key(folder)
|
||||
username = ""
|
||||
return cls(pubkey, username, key_type)
|
||||
|
||||
@classmethod
|
||||
def collect_public_keys(cls) -> Sequence["SopsKey"]: # noqa: ANN102
|
||||
return [
|
||||
cls(pubkey=key, username="", key_type=key_type)
|
||||
for key_type in KeyType
|
||||
for key in key_type.collect_public_keys()
|
||||
]
|
||||
|
||||
|
||||
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||
ERROR_GENERIC = 1
|
||||
@@ -105,28 +163,34 @@ class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||
return ExitStatus(code) if code in ExitStatus else None
|
||||
|
||||
|
||||
class Executor(Protocol):
|
||||
class Executer(Protocol):
|
||||
def __call__(
|
||||
self, cmd: list[str], *, env: dict[str, str] | None = None
|
||||
) -> CmdOut: ...
|
||||
|
||||
|
||||
class Operation(enum.StrEnum):
|
||||
decrypt = "decrypt"
|
||||
edit = "edit"
|
||||
encrypt = "encrypt"
|
||||
update_keys = "updatekeys"
|
||||
DECRYPT = "decrypt"
|
||||
EDIT = "edit"
|
||||
ENCRYPT = "encrypt"
|
||||
UPDATE_KEYS = "updatekeys"
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
|
||||
def run(
|
||||
call: Operation,
|
||||
secret_path: Path,
|
||||
public_keys: Iterable[tuple[str, KeyType]],
|
||||
executor: Executor,
|
||||
) -> tuple[int, str]:
|
||||
executer: Executer,
|
||||
) -> tuple[int, str]:
|
||||
"""Call the sops binary for the given operation."""
|
||||
# louis(2024-11-19): I regrouped the call into the sops binary into this
|
||||
# one place because calling into sops needs to be done with a carefully
|
||||
# setup context, and I don't feel good about the idea of having that logic
|
||||
# exist in multiple places.
|
||||
sops_cmd = ["sops"]
|
||||
environ = os.environ.copy()
|
||||
with NamedTemporaryFile(delete=False, mode="w") as manifest:
|
||||
if self == Operation.decrypt:
|
||||
if call == Operation.DECRYPT:
|
||||
sops_cmd.append("decrypt")
|
||||
else:
|
||||
# When sops is used to edit a file the config is only used at
|
||||
@@ -144,7 +208,7 @@ class Operation(enum.StrEnum):
|
||||
json.dump(rules, manifest, indent=2)
|
||||
manifest.flush()
|
||||
|
||||
if self == Operation.encrypt:
|
||||
if call == Operation.ENCRYPT:
|
||||
# Remove SOPS env vars used to specify public keys to force
|
||||
# sops to use our config file [1]; so that the file gets
|
||||
# encrypted with our keys and not something leaking out of
|
||||
@@ -160,19 +224,19 @@ class Operation(enum.StrEnum):
|
||||
}:
|
||||
del environ[var]
|
||||
sops_cmd.extend(["encrypt", "--in-place"])
|
||||
elif self == Operation.update_keys:
|
||||
elif call == Operation.UPDATE_KEYS:
|
||||
sops_cmd.extend(["updatekeys", "--yes"])
|
||||
elif self != Operation.edit:
|
||||
elif call != Operation.EDIT:
|
||||
known_operations = ",".join(Operation.__members__.values())
|
||||
msg = (
|
||||
f"Unsupported sops operation {self.value} "
|
||||
f"Unsupported sops operation {call.value} "
|
||||
f"(known operations: {known_operations})"
|
||||
)
|
||||
raise ClanError(msg)
|
||||
sops_cmd.append(str(secret_path))
|
||||
|
||||
cmd = nix_shell(["nixpkgs#sops"], sops_cmd)
|
||||
p = executor(cmd, env=environ)
|
||||
p = executer(cmd, env=environ)
|
||||
return p.returncode, p.stdout
|
||||
|
||||
|
||||
@@ -191,7 +255,7 @@ def get_public_age_key(privkey: str) -> str:
|
||||
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen"])
|
||||
try:
|
||||
proc = run(cmd)
|
||||
proc = clan_cli.cmd.run(cmd)
|
||||
res = proc.stdout.strip()
|
||||
pubkey = None
|
||||
private_key = None
|
||||
@@ -262,21 +326,17 @@ def default_admin_private_key_path() -> Path:
|
||||
|
||||
@API.register
|
||||
def maybe_get_admin_public_key() -> None | SopsKey:
|
||||
keyring = collect_public_keys()
|
||||
keyring = SopsKey.collect_public_keys()
|
||||
if len(keyring) == 0:
|
||||
return None
|
||||
|
||||
if len(keyring) > 1:
|
||||
# louis@(2024-10-22):
|
||||
#
|
||||
# This is confusing when it shows up and you have no information
|
||||
# about where each key is going from, could we log the discovery
|
||||
# of each key?
|
||||
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
|
||||
msg = (
|
||||
f"Found more than {len(keyring)} public keys in your "
|
||||
f"environment/system and cannot decide which one to "
|
||||
f"use, first three:\n\n"
|
||||
f"- {'\n- '.join(str(key.as_dict()) for key in keyring[:3])}\n\n"
|
||||
f"use, first {len(last_3)}:\n\n"
|
||||
f"- {'\n- '.join(last_3)}\n\n"
|
||||
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
|
||||
f"SOPS_PGP_FP appropriately"
|
||||
)
|
||||
@@ -285,48 +345,6 @@ def maybe_get_admin_public_key() -> None | SopsKey:
|
||||
return keyring[0]
|
||||
|
||||
|
||||
def collect_public_keys() -> Sequence[SopsKey]:
|
||||
username = ""
|
||||
keyring: list[SopsKey] = []
|
||||
|
||||
for private_key in collect_private_age_keys():
|
||||
public_key = get_public_age_key(private_key)
|
||||
keyring.append(SopsKey(public_key, username, KeyType.AGE))
|
||||
|
||||
if pgp_fingerprints := os.environ.get("SOPS_PGP_FP"):
|
||||
for fp in pgp_fingerprints.strip().split(","):
|
||||
keyring.append(SopsKey(fp, username, KeyType.PGP))
|
||||
|
||||
return keyring
|
||||
|
||||
|
||||
def collect_private_age_keys() -> Sequence[str]:
|
||||
private_age_keys: list[str] = []
|
||||
|
||||
if keys := os.environ.get("SOPS_AGE_KEY"):
|
||||
# SOPS_AGE_KEY is fed into age.ParseIdentities by Sops, and reads
|
||||
# identities line by line. See age/keysource.go in Sops, and
|
||||
# age/parse.go in Age.
|
||||
private_age_keys.extend(keys.strip().splitlines())
|
||||
|
||||
def maybe_read_from_path(key_path: Path) -> None:
|
||||
try:
|
||||
contents = Path(key_path).read_text().strip()
|
||||
lines = contents.splitlines() # as in parse.go in age:
|
||||
private_age_keys.extend(ln for ln in lines if ln and not ln.startswith("#"))
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as ex:
|
||||
log.warn(f"Could not read age keys from {key_path}: {ex}")
|
||||
|
||||
# Sops will try every location, see age/keysource.go
|
||||
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
|
||||
maybe_read_from_path(Path(key_path))
|
||||
maybe_read_from_path(user_config_dir() / "sops/age/keys.txt")
|
||||
|
||||
return private_age_keys
|
||||
|
||||
|
||||
def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
key = maybe_get_admin_public_key()
|
||||
if key:
|
||||
@@ -338,15 +356,17 @@ def ensure_admin_public_key(flake_dir: Path) -> SopsKey:
|
||||
def update_keys(secret_path: Path, keys: Iterable[tuple[str, KeyType]]) -> list[Path]:
|
||||
secret_path = secret_path / "secret"
|
||||
error_msg = f"Could not update keys for {secret_path}"
|
||||
executor = functools.partial(run, log=Log.BOTH, error_msg=error_msg)
|
||||
rc, _ = Operation.update_keys(secret_path, keys, executor)
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH, error_msg=error_msg
|
||||
)
|
||||
rc, _ = run(Operation.UPDATE_KEYS, secret_path, keys, executer)
|
||||
was_modified = ExitStatus.parse(rc) != ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED
|
||||
return [secret_path] if was_modified else []
|
||||
|
||||
|
||||
def encrypt_file(
|
||||
secret_path: Path,
|
||||
content: IO[str] | str | bytes | None,
|
||||
content: str | IO[bytes] | bytes | None,
|
||||
pubkeys: list[tuple[str, KeyType]],
|
||||
) -> None:
|
||||
folder = secret_path.parent
|
||||
@@ -355,7 +375,7 @@ def encrypt_file(
|
||||
if not content:
|
||||
# Don't use our `run` here, because it breaks editor integration.
|
||||
# We never need this in our UI.
|
||||
def executor(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
|
||||
def executer(cmd: list[str], *, env: dict[str, str] | None = None) -> CmdOut:
|
||||
return CmdOut(
|
||||
stdout="",
|
||||
stderr="",
|
||||
@@ -366,48 +386,57 @@ def encrypt_file(
|
||||
msg=None,
|
||||
)
|
||||
|
||||
rc, _ = Operation.edit(secret_path, pubkeys, executor)
|
||||
rc, _ = run(Operation.EDIT, secret_path, pubkeys, executer)
|
||||
status = ExitStatus.parse(rc)
|
||||
if rc == 0 or status == ExitStatus.FILE_HAS_NOT_BEEN_MODIFIED:
|
||||
return
|
||||
msg = f"Failed to encrypt {secret_path}: sops exited with {status or rc}"
|
||||
raise ClanError(msg)
|
||||
|
||||
def swap_secret(f: Any) -> None: # f's type is not exposed by tempfile
|
||||
# lopter(2024-11-19): imo NamedTemporaryFile does RAII wrong since it
|
||||
# creates the file in __init__, when really it should be created in
|
||||
# __enter__ (that is in Python __enter__ is actually __init__ from a RAII
|
||||
# perspective, and __init__ should just be thought off as syntax sugar to
|
||||
# capture extra context), and now the linter is unhappy so hush it. Note
|
||||
# that if NamedTemporaryFile created the file in __enter__ then we'd have
|
||||
# to change exception handling:
|
||||
try:
|
||||
source = NamedTemporaryFile(dir="/dev/shm", delete=False) # noqa: SIM115
|
||||
except (FileNotFoundError, PermissionError):
|
||||
source = NamedTemporaryFile(delete=False) # noqa: SIM115
|
||||
try: # swap the secret:
|
||||
with source:
|
||||
if isinstance(content, str):
|
||||
Path(f.name).write_text(content)
|
||||
source.file.write(content.encode())
|
||||
elif isinstance(content, bytes):
|
||||
Path(f.name).write_bytes(content)
|
||||
elif isinstance(content, io.IOBase):
|
||||
with Path(f.name).open("w") as fd:
|
||||
shutil.copyfileobj(content, fd)
|
||||
source.file.write(content)
|
||||
elif isinstance(content, io.BufferedReader):
|
||||
# lopter@(2024-11-19): mypy is freaking out on the 1st
|
||||
# argument, idk why, it says:
|
||||
#
|
||||
# > Cannot infer type argument 1 of "copyfileobj"
|
||||
shutil.copyfileobj(content, source.file) # type: ignore[misc]
|
||||
else:
|
||||
msg = f"Invalid content type: {type(content)}"
|
||||
raise ClanError(msg)
|
||||
Operation.encrypt(f.name, pubkeys, functools.partial(run, log=Log.BOTH))
|
||||
executer = functools.partial(clan_cli.cmd.run, log=clan_cli.cmd.Log.BOTH)
|
||||
run(Operation.ENCRYPT, Path(source.name), pubkeys, executer)
|
||||
# atomic copy of the encrypted file
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as f2:
|
||||
shutil.copyfile(f.name, f2.name)
|
||||
Path(f2.name).rename(secret_path)
|
||||
with NamedTemporaryFile(dir=folder, delete=False) as dest:
|
||||
shutil.copyfile(source.name, dest.name)
|
||||
Path(dest.name).rename(secret_path)
|
||||
finally:
|
||||
with suppress(OSError):
|
||||
Path(f.name).unlink()
|
||||
|
||||
try:
|
||||
with NamedTemporaryFile(dir="/dev/shm", delete=False) as f:
|
||||
swap_secret(f)
|
||||
except (FileNotFoundError, PermissionError):
|
||||
# hopefully /tmp is written to an in-memory file to avoid leaking secrets
|
||||
with NamedTemporaryFile(delete=False) as f:
|
||||
swap_secret(f)
|
||||
Path(source.name).unlink()
|
||||
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
# decryption uses private keys from the environment or default paths:
|
||||
no_public_keys_needed: list[tuple[str, KeyType]] = []
|
||||
executor = functools.partial(run, error_msg=f"Could not decrypt {secret_path}")
|
||||
_, stdout = Operation.decrypt(secret_path, no_public_keys_needed, executor)
|
||||
executer = functools.partial(
|
||||
clan_cli.cmd.run, error_msg=f"Could not decrypt {secret_path}"
|
||||
)
|
||||
_, stdout = run(Operation.DECRYPT, secret_path, no_public_keys_needed, executer)
|
||||
return stdout
|
||||
|
||||
|
||||
@@ -420,7 +449,7 @@ def get_recipients(secret_path: Path) -> set[SopsKey]:
|
||||
key_type=key_type,
|
||||
)
|
||||
for key_type in KeyType
|
||||
for recipient in sops_attrs[key_type.name.lower()]
|
||||
for recipient in sops_attrs[key_type.name.lower()] or []
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user