cli: don't error when more than one SOPS key found locally

This commit is contained in:
Michael Hoang
2025-06-12 16:18:46 +07:00
parent b3e1a35e63
commit 7f667ccc16
5 changed files with 41 additions and 45 deletions

View File

@@ -13,15 +13,16 @@ from .sops import (
default_admin_private_key_path, default_admin_private_key_path,
generate_private_key, generate_private_key,
load_age_plugins, load_age_plugins,
maybe_get_admin_public_key, maybe_get_admin_public_keys,
) )
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def generate_key() -> sops.SopsKey: def generate_key() -> sops.SopsKey:
key = maybe_get_admin_public_key() keys = maybe_get_admin_public_keys()
if key is not None: if keys is not None:
key = keys[0]
print(f"{key.key_type.name} key {key.pubkey} is already set", file=sys.stderr) print(f"{key.key_type.name} key {key.pubkey} is already set", file=sys.stderr)
return key return key
@@ -44,11 +45,11 @@ def generate_command(args: argparse.Namespace) -> None:
def show_command(args: argparse.Namespace) -> None: def show_command(args: argparse.Namespace) -> None:
key = sops.maybe_get_admin_public_key() keys = sops.maybe_get_admin_public_keys()
if not key: if not keys:
msg = "No public key found" msg = "No public key found"
raise ClanError(msg) raise ClanError(msg)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True) json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True)
def update_command(args: argparse.Namespace) -> None: def update_command(args: argparse.Namespace) -> None:

View File

@@ -7,7 +7,7 @@ import os
import re import re
import shutil import shutil
import subprocess import subprocess
from collections.abc import Iterable, Sequence from collections.abc import Iterable
from contextlib import suppress from contextlib import suppress
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
@@ -50,8 +50,8 @@ class KeyType(enum.Enum):
) )
raise ClanError(msg) raise ClanError(msg)
def collect_public_keys(self) -> Sequence[str]: def collect_public_keys(self) -> list[str]:
keyring: list[str] = [] keyring = []
if self == self.AGE: if self == self.AGE:
@@ -136,7 +136,7 @@ class SopsKey:
return read_keys(folder) return read_keys(folder)
@classmethod @classmethod
def collect_public_keys(cls) -> Sequence["SopsKey"]: def collect_public_keys(cls) -> list["SopsKey"]:
return [ return [
cls(pubkey=key, username="", key_type=key_type) cls(pubkey=key, username="", key_type=key_type)
for key_type in KeyType for key_type in KeyType
@@ -374,7 +374,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists") print(f"{flake_dir / user} already exists")
def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None: def maybe_get_user(flake_dir: Path, keys: set[SopsKey]) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir) folder = sops_users_folder(flake_dir)
if folder.exists(): if folder.exists():
@@ -382,9 +382,11 @@ def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
if not (user / "key.json").exists(): if not (user / "key.json").exists():
continue continue
keys = read_keys(user) user_keys = read_keys(user)
if key in keys: if len(keys.intersection(user_keys)):
return {SopsKey(key.pubkey, user.name, key.key_type) for key in keys} return {
SopsKey(key.pubkey, user.name, key.key_type) for key in user_keys
}
return None return None
@@ -397,39 +399,32 @@ def default_admin_private_key_path() -> Path:
@API.register @API.register
def maybe_get_admin_public_key() -> SopsKey | None: def maybe_get_admin_public_keys() -> list[SopsKey] | None:
keyring = SopsKey.collect_public_keys() keyring = SopsKey.collect_public_keys()
if len(keyring) == 0: if len(keyring) == 0:
return None return None
if len(keyring) > 1: return keyring
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
msg = (
f"Found {len(keyring)} public keys in your "
f"environment/system and cannot decide which one to "
f"use, first {len(last_3)}:\n\n"
f"- {'\n- '.join(last_3)}\n\n"
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
f"SOPS_PGP_FP appropriately"
)
raise ClanError(msg)
return keyring[0]
def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]: def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key() keys = maybe_get_admin_public_keys()
if not key: if not keys:
msg = "No SOPS key found. Please generate one with `clan secrets key generate`." msg = "No SOPS key found. Please generate one with `clan secrets key generate`."
raise ClanError(msg) raise ClanError(msg)
user_keys = maybe_get_user(flake_dir, key) user_keys = maybe_get_user(flake_dir, set(keys))
if not user_keys: if not user_keys:
# todo improve error message msg = (
msg = f"We could not figure out which Clan secrets user you are with the SOPS key we found: {key.pubkey}" f"We could not figure out which Clan secrets user you are with the SOPS keys we found:\n"
f"- {'\n- '.join(f'{key.key_type.name.lower()}: {key.pubkey}' for key in keys)}\n\n"
f"Please ensure you have created a Clan secrets user and added one of your SOPS keys"
f"to that user.\n"
f"For more information, see: https://docs.clan.lol/guides/getting-started/secrets/#add-your-public-keys"
)
raise ClanError(msg) raise ClanError(msg)
return user_keys return user_keys

View File

@@ -638,7 +638,7 @@ def test_secrets(
with capture_output as output: with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake_with_core.path)]) cli.run(["secrets", "key", "show", "--flake", str(test_flake_with_core.path)])
key = json.loads(output.out) key = json.loads(output.out)[0]
assert key["publickey"].startswith("age1") assert key["publickey"].startswith("age1")
# Add testuser with the key that was generated for the clan # Add testuser with the key that was generated for the clan
cli.run( cli.run(
@@ -991,7 +991,7 @@ def test_secrets_key_generate_gpg(
cli.run( cli.run(
["secrets", "key", "show", "--flake", str(test_flake_with_core.path)] ["secrets", "key", "show", "--flake", str(test_flake_with_core.path)]
) )
key = json.loads(output.out) key = json.loads(output.out)[0]
assert key["type"] == "pgp" assert key["type"] == "pgp"
assert key["publickey"] == gpg_key.fingerprint assert key["publickey"] == gpg_key.fingerprint

View File

@@ -4,7 +4,7 @@ import os
from pathlib import Path from pathlib import Path
from clan_cli.secrets.key import generate_key from clan_cli.secrets.key import generate_key
from clan_cli.secrets.sops import maybe_get_admin_public_key from clan_cli.secrets.sops import maybe_get_admin_public_keys
from clan_cli.secrets.users import add_user from clan_cli.secrets.users import add_user
from clan_lib.api import API from clan_lib.api import API
from clan_lib.errors import ClanError from clan_lib.errors import ClanError
@@ -19,14 +19,14 @@ def keygen(flake_dir: Path, user: str | None = None, force: bool = False) -> Non
if not user: if not user:
msg = "No user provided and $USER is not set. Please provide a user via --user." msg = "No user provided and $USER is not set. Please provide a user via --user."
raise ClanError(msg) raise ClanError(msg)
pub_key = maybe_get_admin_public_key() pub_keys = maybe_get_admin_public_keys()
if not pub_key: if not pub_keys:
pub_key = generate_key() pub_keys = [generate_key()]
# TODO set flake_dir=flake.path / "vars" # TODO set flake_dir=flake.path / "vars"
add_user( add_user(
flake_dir=flake_dir, flake_dir=flake_dir,
name=user, name=user,
keys=[pub_key], keys=pub_keys,
force=force, force=force,
) )

View File

@@ -11,7 +11,7 @@ import pytest
from clan_cli.machines.create import CreateOptions as ClanCreateOptions from clan_cli.machines.create import CreateOptions as ClanCreateOptions
from clan_cli.machines.create import create_machine from clan_cli.machines.create import create_machine
from clan_cli.secrets.key import generate_key from clan_cli.secrets.key import generate_key
from clan_cli.secrets.sops import maybe_get_admin_public_key from clan_cli.secrets.sops import maybe_get_admin_public_keys
from clan_cli.secrets.users import add_user from clan_cli.secrets.users import add_user
from clan_cli.ssh.host_key import HostKeyCheck from clan_cli.ssh.host_key import HostKeyCheck
from clan_cli.vars.generate import generate_vars_for_machine, get_generators_closure from clan_cli.vars.generate import generate_vars_for_machine, get_generators_closure
@@ -159,10 +159,10 @@ def test_clan_create_api(
fix_flake_inputs(dest_clan_dir, clan_core_dir) fix_flake_inputs(dest_clan_dir, clan_core_dir)
# ===== CREATE SOPS KEY ====== # ===== CREATE SOPS KEY ======
sops_key = maybe_get_admin_public_key() sops_keys = maybe_get_admin_public_keys()
if sops_key is None: if sops_keys is None:
# TODO: In the UI we need a view for this # TODO: In the UI we need a view for this
sops_key = generate_key() sops_keys = [generate_key()]
else: else:
msg = "SOPS key already exists, please remove it before running this test" msg = "SOPS key already exists, please remove it before running this test"
raise ClanError(msg) raise ClanError(msg)
@@ -171,7 +171,7 @@ def test_clan_create_api(
add_user( add_user(
dest_clan_dir, dest_clan_dir,
name="testuser", name="testuser",
keys=[sops_key], keys=sops_keys,
force=False, force=False,
) )