clan-cli: rebase sops changes on top of vars changes

vars changes in question are from commit: 54b8f5904e

With this changeset the age specific sops logic that was added is now
generic.

To keep things simple, this changeset modifies `SopsKey` so that
`username` is ignored when comparing different keys. I don't really see
us relying on `username` and this makes `SopsKey` hashable, and usable
in a `set`, which is nice when you check that you have a particular key.
This commit is contained in:
Louis Opter
2024-11-18 17:11:25 -08:00
committed by Mic92
parent 8d53568d95
commit 1ba27196d8
2 changed files with 61 additions and 31 deletions

View File

@@ -1,3 +1,4 @@
import dataclasses
import enum import enum
import functools import functools
import io import io
@@ -8,7 +9,6 @@ import shutil
import subprocess import subprocess
from collections.abc import Iterable, Sequence from collections.abc import Iterable, Sequence
from contextlib import suppress from contextlib import suppress
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import IO, Any, Protocol from typing import IO, Any, Protocol
@@ -34,11 +34,26 @@ class KeyType(enum.Enum):
return cls.__members__.get(value.upper()) return cls.__members__.get(value.upper())
return None return None
@property
def sops_recipient_attr(self) -> str:
"""Name of the attribute to get the recipient key from a Sops file."""
if self == self.AGE:
return "recipient"
if self == self.PGP:
return "fp"
msg = (
f"KeyType is not properly implemented: "
f'"sops_recipient_attr" is missing for key type "{self.name}"'
)
raise ClanError(msg)
@dataclass(frozen=True, eq=False)
@dataclasses.dataclass(frozen=True)
class SopsKey: class SopsKey:
pubkey: str pubkey: str
username: str # Two SopsKey are considered equal even
# if they don't have the same username:
username: str = dataclasses.field(compare=False)
key_type: KeyType key_type: KeyType
def as_dict(self) -> dict[str, str]: def as_dict(self) -> dict[str, str]:
@@ -48,6 +63,13 @@ class SopsKey:
"type": self.key_type.name.lower(), "type": self.key_type.name.lower(),
} }
@classmethod
def load_dir(cls, dir: Path) -> "SopsKey": # noqa: ANN102
"""Load from the file named `keys.json` in the given directory."""
pubkey, key_type = read_key(dir)
username = ""
return cls(pubkey, username, key_type)
class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go class ExitStatus(enum.IntEnum): # see: cmd/sops/codes/codes.go
ERROR_GENERIC = 1 ERROR_GENERIC = 1
@@ -389,6 +411,19 @@ def decrypt_file(secret_path: Path) -> str:
return stdout return stdout
def get_recipients(secret_path: Path) -> set[SopsKey]:
sops_attrs = json.loads((secret_path / "secret").read_text())["sops"]
return {
SopsKey(
pubkey=recipient[key_type.sops_recipient_attr],
username="",
key_type=key_type,
)
for key_type in KeyType
for recipient in sops_attrs[key_type.name.lower()]
}
def get_meta(secret_path: Path) -> dict: def get_meta(secret_path: Path) -> dict:
meta_path = secret_path.parent / "meta.json" meta_path = secret_path.parent / "meta.json"
if not meta_path.exists(): if not meta_path.exists():

View File

@@ -1,10 +1,10 @@
import json
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import override from typing import override
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.secrets import sops
from clan_cli.secrets.folders import ( from clan_cli.secrets.folders import (
sops_machines_folder, sops_machines_folder,
sops_secrets_folder, sops_secrets_folder,
@@ -17,8 +17,8 @@ from clan_cli.secrets.secrets import (
encrypt_secret, encrypt_secret,
has_secret, has_secret,
) )
from clan_cli.secrets.sops import KeyType, generate_private_key from clan_cli.vars.generate import Generator
from clan_cli.vars.generate import Generator, Var from clan_cli.vars.var import Var
from . import SecretStoreBase from . import SecretStoreBase
@@ -52,7 +52,7 @@ class SecretStore(SecretStoreBase):
if has_machine(self.machine.flake_dir, self.machine.name): if has_machine(self.machine.flake_dir, self.machine.name):
return return
priv_key, pub_key = generate_private_key() priv_key, pub_key = sops.generate_private_key()
encrypt_secret( encrypt_secret(
self.machine.flake_dir, self.machine.flake_dir,
sops_secrets_folder(self.machine.flake_dir) sops_secrets_folder(self.machine.flake_dir)
@@ -69,24 +69,18 @@ class SecretStore(SecretStoreBase):
def user_has_access( def user_has_access(
self, user: str, generator: Generator, secret_name: str self, user: str, generator: Generator, secret_name: str
) -> bool: ) -> bool:
secret_path = self.secret_path(generator, secret_name) key_dir = sops_users_folder(self.machine.flake_dir) / user
secret = json.loads((secret_path / "secret").read_text()) return self.key_has_access(key_dir, generator, secret_name)
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
users_folder_path = sops_users_folder(self.machine.flake_dir)
user_pubkey = json.loads((users_folder_path / user / "key.json").read_text())[
"publickey"
]
return user_pubkey in recipients
def machine_has_access(self, generator: Generator, secret_name: str) -> bool: def machine_has_access(self, generator: Generator, secret_name: str) -> bool:
key_dir = sops_machines_folder(self.machine.flake_dir) / self.machine.name
return self.key_has_access(key_dir, generator, secret_name)
def key_has_access(
self, key_dir: Path, generator: Generator, secret_name: str
) -> bool:
secret_path = self.secret_path(generator, secret_name) secret_path = self.secret_path(generator, secret_name)
secret = json.loads((secret_path / "secret").read_text()) return sops.SopsKey.load_dir(key_dir) in sops.get_recipients(secret_path)
recipients = [r["recipient"] for r in (secret["sops"].get("age") or [])]
machines_folder_path = sops_machines_folder(self.machine.flake_dir)
machine_pubkey = json.loads(
(machines_folder_path / self.machine.name / "key.json").read_text()
)["publickey"]
return machine_pubkey in recipients
def secret_path(self, generator: Generator, secret_name: str) -> Path: def secret_path(self, generator: Generator, secret_name: str) -> Path:
return self.directory(generator, secret_name) return self.directory(generator, secret_name)
@@ -141,7 +135,7 @@ class SecretStore(SecretStoreBase):
secret_folder = self.secret_path(generator, name) secret_folder = self.secret_path(generator, name)
add_secret(self.machine.flake_dir, self.machine.name, secret_folder) add_secret(self.machine.flake_dir, self.machine.name, secret_folder)
def collect_keys_for_secret(self, path: Path) -> set[tuple[str, KeyType]]: def collect_keys_for_secret(self, path: Path) -> set[sops.SopsKey]:
from clan_cli.secrets.secrets import ( from clan_cli.secrets.secrets import (
collect_keys_for_path, collect_keys_for_path,
collect_keys_for_type, collect_keys_for_type,
@@ -159,23 +153,24 @@ class SecretStore(SecretStoreBase):
self.machine.flake_dir / "sops" / "groups" / group / "users" self.machine.flake_dir / "sops" / "groups" / group / "users"
) )
) )
return keys
@override return {
sops.SopsKey(pubkey=key, username="", key_type=key_type)
for (key, key_type) in keys
}
# }
def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]: def needs_fix(self, generator: Generator, name: str) -> tuple[bool, str | None]:
secret_path = self.secret_path(generator, name) secret_path = self.secret_path(generator, name)
recipients_ = json.loads((secret_path / "secret").read_text())["sops"]["age"] current_recipients = sops.get_recipients(secret_path)
current_recipients = {r["recipient"] for r in recipients_} wanted_recipients = self.collect_keys_for_secret(secret_path)
wanted_recipients = {
key[0] for key in self.collect_keys_for_secret(secret_path)
}
needs_update = current_recipients != wanted_recipients needs_update = current_recipients != wanted_recipients
recipients_to_add = wanted_recipients - current_recipients recipients_to_add = wanted_recipients - current_recipients
var_id = f"{generator.name}/{name}" var_id = f"{generator.name}/{name}"
msg = ( msg = (
f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. " f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted. "
f"This could have been a malicious actor trying to add their keys, please investigate. " f"This could have been a malicious actor trying to add their keys, please investigate. "
f"Added keys: {', '.join(recipients_to_add)}" f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}"
) )
return needs_update, msg return needs_update, msg