clan-cli: rebase sops changes on top of vars changes
vars changes in question are from commit: 8b94bc71bc
With this changeset the age specific sops logic that was added is now
generic.
To keep things simple, this changeset modifies `SopsKey` so that
`username` is ignored when comparing different keys. I don't really see
us relying on `username` and this makes `SopsKey` hashable, and usable
in a `set`, which is nice when you check that you have a particular key.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import dataclasses
|
||||
import enum
|
||||
import functools
|
||||
import io
|
||||
@@ -8,7 +9,6 @@ import shutil
|
||||
import subprocess
|
||||
from collections.abc import Iterable, Sequence
|
||||
from contextlib import suppress
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO, Any, Protocol
|
||||
@@ -34,11 +34,26 @@ class KeyType(enum.Enum):
|
||||
return cls.__members__.get(value.upper())
|
||||
return None
|
||||
|
||||
@property
|
||||
def sops_recipient_attr(self) -> str:
|
||||
"""Name of the attribute to get the recipient key from a Sops file."""
|
||||
if self == self.AGE:
|
||||
return "recipient"
|
||||
if self == self.PGP:
|
||||
return "fp"
|
||||
msg = (
|
||||
f"KeyType is not properly implemented: "
|
||||
f'"sops_recipient_attr" is missing for key type "{self.name}"'
|
||||
)
|
||||
raise ClanError(msg)
|
||||
|
||||
@dataclass(frozen=True, eq=False)
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class SopsKey:
|
||||
pubkey: str
|
||||
username: str
|
||||
# Two SopsKey are considered equal even
|
||||
# if they don't have the same username:
|
||||
username: str = dataclasses.field(compare=False)
|
||||
key_type: KeyType
|
||||
|
||||
def as_dict(self) -> dict[str, str]:
|
||||
@@ -48,6 +63,13 @@ class SopsKey:
|
||||
"type": self.key_type.name.lower(),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102
|
||||
"""Load from the file named `keys.json` in the given directory."""
|
||||
pubkey, key_type = read_key(dir)
|
||||
username = ""
|
||||
return cls(pubkey, username, key_type)
|
||||
|
||||
|
||||
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
|
||||
ERROR_GENERIC = 1
|
||||
@@ -389,6 +411,19 @@ def decrypt_file(secret_path: Path) -> str:
|
||||
return stdout
|
||||
|
||||
|
||||
def get_recipients(secret_path: Path) -> set[SopsKey]:
|
||||
sops_attrs = json.loads((secret_path / "secret").read_text())["sops"]
|
||||
return {
|
||||
SopsKey(
|
||||
pubkey=recipient[key_type.sops_recipient_attr],
|
||||
username="",
|
||||
key_type=key_type,
|
||||
)
|
||||
for key_type in KeyType
|
||||
for recipient in sops_attrs[key_type.name.lower()]
|
||||
}
|
||||
|
||||
|
||||
def get_meta(secret_path: Path) -> dict:
|
||||
meta_path = secret_path.parent / "meta.json"
|
||||
if not meta_path.exists():
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import override
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.secrets import sops
|
||||
from clan_cli.secrets.folders import (
|
||||
sops_machines_folder,
|
||||
sops_secrets_folder,
|
||||
@@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import (
|
||||
encrypt_secret,
|
||||
has_secret,
|
||||
)
|
||||
from clan_cli.secrets.sops import KeyType, generate_private_key
|
||||
from clan_cli.vars.generate import Generator, Var
|
||||
from clan_cli.vars.generate import Generator
|
||||
from clan_cli.vars.var import Var
|
||||
|
||||
from . import SecretStoreBase
|
||||
|
||||
@@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase):
|
||||
|
||||
if has_machine(self.machine.flake_dir, self.machine.name):
|
||||
return
|
||||
priv_key, pub_key = generate_private_key()
|
||||
priv_key, pub_key = sops.generate_private_key()
|
||||
encrypt_secret(
|
||||
self.machine.flake_dir,
|
||||
sops_secrets_folder(self.machine.flake_dir)
|
||||
@@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase):
|
||||
def user_has_access(
|
||||
self, user: str, generator: Generator, secret_name: str
|
||||
) -> bool:
|
||||
secret_path = self.secret_path(generator, secret_name)
|
||||
secret = json.loads((secret_path / "secret").read_text())
|
||||
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
|
||||
users_folder_path = sops_users_folder(self.machine.flake_dir)
|
||||
user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[
|
||||
"publickey"
|
||||
]
|
||||
return user_pubkey in recipients
|
||||
key_dir = sops_users_folder(self.machine.flake_dir) / user
|
||||
return self.key_has_access(key_dir, generator, secret_name)
|
||||
|
||||
def machine_has_access(self, generator: Generator, secret_name: str) -> bool:
|
||||
key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name
|
||||
return self.key_has_access(key_dir, generator, secret_name)
|
||||
|
||||
def key_has_access(
|
||||
self, key_dir: Path, generator: Generator, secret_name: str
|
||||
) -> bool:
|
||||
secret_path = self.secret_path(generator, secret_name)
|
||||
secret = json.loads((secret_path / "secret").read_text())
|
||||
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
|
||||
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
|
||||
machine_pubkey = json.loads(
|
||||
(machines_folder_path / self.machine.name / "key.json").read_text()
|
||||
)["publickey"]
|
||||
return machine_pubkey in recipients
|
||||
return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path)
|
||||
|
||||
def secret_path(self, generator: Generator, secret_name: str) -> Path:
|
||||
return self.directory(generator, secret_name)
|
||||
@@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase):
|
||||
secret_folder = self.secret_path(generator, name)
|
||||
add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
|
||||
|
||||
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]:
|
||||
def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]:
|
||||
from clan_cli.secrets.secrets import (
|
||||
collect_keys_for_path,
|
||||
collect_keys_for_type,
|
||||
@@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase):
|
||||
self.machine.flake_dir / "sops" / "groups" / group / "users"
|
||||
)
|
||||
)
|
||||
return keys
|
||||
|
||||
@override
|
||||
return {
|
||||
sops.SopsKey(pubkey=key, username="", key_type=key_type)
|
||||
for (key, key_type) in keys
|
||||
}
|
||||
|
||||
# }
|
||||
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
|
||||
secret_path = self.secret_path(generator, name)
|
||||
recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"]
|
||||
current_recipients = {r["recipient"] for r in recipients_}
|
||||
wanted_recipients = {
|
||||
key[0] for key in self.collect_keys_for_secret(secret_path)
|
||||
}
|
||||
current_recipients = sops.get_recipients(secret_path)
|
||||
wanted_recipients = self.collect_keys_for_secret(secret_path)
|
||||
needs_update = current_recipients != wanted_recipients
|
||||
recipients_to_add = wanted_recipients - current_recipients
|
||||
var_id = f"{generator.name}/{name}"
|
||||
msg = (
|
||||
f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. "
|
||||
f"This could have been a malicious actor trying to add their keys, please investigate. "
|
||||
f"Added keys: {', '.join(recipients_to_add)}"
|
||||
f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}"
|
||||
)
|
||||
return needs_update, msg
|
||||
|
||||
|
||||
Reference in New Issue
Block a user