Revert "Merge pull request 'clan-cli: secrets: Add support for PGP keys with sops-nix' (#2186) from lopter/clan-core:lo-sops-nix-pgp-support into main"

This reverts commit b956b94039, reversing
changes made to b1af3d5d6d.

Reverting for now as Dave's recent change conflicts with this change.
This commit is contained in:
Jörg Thalheim
2024-10-04 17:54:17 +02:00
parent 66a94c91ae
commit d134d94a1e
11 changed files with 131 additions and 361 deletions

View File

@@ -1,4 +1,3 @@
import enum
import io
import json
import os
@@ -20,32 +19,13 @@ from clan_cli.nix import nix_shell
from .folders import sops_machines_folder, sops_users_folder
class KeyType(enum.Enum):
AGE = enum.auto()
PGP = enum.auto()
@classmethod
def validate(cls, value: str | None) -> "KeyType | None": # noqa: ANN102
if value:
return cls.__members__.get(value.upper())
return None
@dataclass(frozen=True, eq=False)
@dataclass
class SopsKey:
pubkey: str
username: str
key_type: KeyType
def as_dict(self) -> dict[str, str]:
return {
"publickey": self.pubkey,
"username": self.username,
"type": self.key_type.name.lower(),
}
def get_public_age_key(privkey: str) -> str:
def get_public_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try:
res = subprocess.run(
@@ -98,7 +78,8 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
def maybe_get_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey | None:
key = SopsKey(pub_key, username="")
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -106,20 +87,20 @@ def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
for user in folder.iterdir():
if not (user / "key.json").exists():
continue
this_pub_key, this_key_type = read_key(user)
if key.pubkey == this_pub_key and key.key_type == this_key_type:
return SopsKey(key.pubkey, user.name, key.key_type)
if read_key(user) == pub_key:
key.username = user.name
return key
return None
@API.register
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
maybe_key = maybe_get_user_or_machine(flake_dir, key)
if maybe_key:
return maybe_key
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
key = maybe_get_user_or_machine(flake_dir, pub_key)
if not key:
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
raise ClanError(msg)
return key
def default_admin_key_path() -> Path:
@@ -130,59 +111,43 @@ def default_admin_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> None | SopsKey:
age_key = os.environ.get("SOPS_AGE_KEY")
pgp_key = os.environ.get("SOPS_PGP_FP")
if age_key and pgp_key:
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
raise ClanError(msg)
if age_key:
return SopsKey(
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
)
if pgp_key:
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
def maybe_get_admin_public_key() -> str | None:
key = os.environ.get("SOPS_AGE_KEY")
if key:
return get_public_key(key)
path = default_admin_key_path()
if path.exists():
return SopsKey(
pubkey=get_public_age_key(path.read_text()),
key_type=KeyType.AGE,
username="",
)
return get_public_key(path.read_text())
return None
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
key = maybe_get_admin_public_key()
if not key:
return None
return maybe_get_user_or_machine(flake_dir, key)
pub_key = maybe_get_admin_public_key()
if pub_key:
return maybe_get_user_or_machine(flake_dir, pub_key)
return None
def ensure_admin_key(flake_dir: Path) -> SopsKey:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
pub_key = maybe_get_admin_public_key()
if not pub_key:
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
return ensure_user_or_machine(flake_dir, pub_key)
@contextmanager
def sops_manifest(keys: list[tuple[str, KeyType]]) -> Iterator[Path]:
all_keys: dict[str, list[str]] = {
key_type.lower(): [] for key_type in KeyType.__members__
}
for key, key_type in keys:
all_keys[key_type.name.lower()].append(key)
def sops_manifest(keys: list[str]) -> Iterator[Path]:
with NamedTemporaryFile(delete=False, mode="w") as manifest:
json.dump({"creation_rules": [{"key_groups": [all_keys]}]}, manifest, indent=2)
json.dump(
{"creation_rules": [{"key_groups": [{"age": keys}]}]}, manifest, indent=2
)
manifest.flush()
yield Path(manifest.name)
def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path]:
def update_keys(secret_path: Path, keys: list[str]) -> list[Path]:
with sops_manifest(keys) as manifest:
secret_path = secret_path / "secret"
time_before = secret_path.stat().st_mtime
@@ -206,7 +171,7 @@ def update_keys(secret_path: Path, keys: list[tuple[str, KeyType]]) -> list[Path
def encrypt_file(
secret_path: Path,
content: IO[str] | str | bytes | None,
pubkeys: list[tuple[str, KeyType]],
pubkeys: list[str],
) -> None:
folder = secret_path.parent
folder.mkdir(parents=True, exist_ok=True)
@@ -272,7 +237,7 @@ def get_meta(secret_path: Path) -> dict:
return json.load(f)
def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) -> None:
def write_key(path: Path, publickey: str, overwrite: bool) -> None:
path.mkdir(parents=True, exist_ok=True)
try:
flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
@@ -283,23 +248,21 @@ def write_key(path: Path, publickey: str, key_type: KeyType, overwrite: bool) ->
msg = f"{path.name} already exists in {path}. Use --force to overwrite."
raise ClanError(msg) from e
with os.fdopen(fd, "w") as f:
contents = {"publickey": publickey, "type": key_type.name.lower()}
json.dump(contents, f, indent=2)
json.dump({"publickey": publickey, "type": "age"}, f, indent=2)
def read_key(path: Path) -> tuple[str, KeyType]:
def read_key(path: Path) -> str:
with Path(path / "key.json").open() as f:
try:
key = json.load(f)
except json.JSONDecodeError as e:
msg = f"Failed to decode {path.name}: {e}"
raise ClanError(msg) from e
key_type = KeyType.validate(key.get("type"))
if key_type is None:
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})."
if key["type"] != "age":
msg = f"{path.name} is not an age key but {key['type']}. This is not supported"
raise ClanError(msg)
publickey = key.get("publickey")
if not publickey:
msg = f"{path.name} does not contain a public key"
raise ClanError(msg)
return publickey, key_type
return publickey