Merge pull request 'cli: don't error when more than one SOPS key found locally' (#3946) from push-tmsrnssnwrvn into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3946
Reviewed-by: brianmcgee <brian@bmcgee.ie>
This commit is contained in:
Michael Hoang
2025-06-13 08:00:37 +00:00
5 changed files with 41 additions and 45 deletions

View File

@@ -13,15 +13,16 @@ from .sops import (
default_admin_private_key_path,
generate_private_key,
load_age_plugins,
maybe_get_admin_public_key,
maybe_get_admin_public_keys,
)
log = logging.getLogger(__name__)
def generate_key() -> sops.SopsKey:
key = maybe_get_admin_public_key()
if key is not None:
keys = maybe_get_admin_public_keys()
if keys is not None:
key = keys[0]
print(f"{key.key_type.name} key {key.pubkey} is already set", file=sys.stderr)
return key
@@ -44,11 +45,11 @@ def generate_command(args: argparse.Namespace) -> None:
def show_command(args: argparse.Namespace) -> None:
key = sops.maybe_get_admin_public_key()
if not key:
keys = sops.maybe_get_admin_public_keys()
if not keys:
msg = "No public key found"
raise ClanError(msg)
json.dump(key.as_dict(), sys.stdout, indent=2, sort_keys=True)
json.dump([key.as_dict() for key in keys], sys.stdout, indent=2, sort_keys=True)
def update_command(args: argparse.Namespace) -> None:

View File

@@ -7,7 +7,7 @@ import os
import re
import shutil
import subprocess
from collections.abc import Iterable, Sequence
from collections.abc import Iterable
from contextlib import suppress
from pathlib import Path
from tempfile import NamedTemporaryFile
@@ -50,8 +50,8 @@ class KeyType(enum.Enum):
)
raise ClanError(msg)
def collect_public_keys(self) -> Sequence[str]:
keyring: list[str] = []
def collect_public_keys(self) -> list[str]:
keyring = []
if self == self.AGE:
@@ -136,7 +136,7 @@ class SopsKey:
return read_keys(folder)
@classmethod
def collect_public_keys(cls) -> Sequence["SopsKey"]:
def collect_public_keys(cls) -> list["SopsKey"]:
return [
cls(pubkey=key, username="", key_type=key_type)
for key_type in KeyType
@@ -374,7 +374,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
def maybe_get_user(flake_dir: Path, keys: set[SopsKey]) -> set[SopsKey] | None:
folder = sops_users_folder(flake_dir)
if folder.exists():
@@ -382,9 +382,11 @@ def maybe_get_user(flake_dir: Path, key: SopsKey) -> set[SopsKey] | None:
if not (user / "key.json").exists():
continue
keys = read_keys(user)
if key in keys:
return {SopsKey(key.pubkey, user.name, key.key_type) for key in keys}
user_keys = read_keys(user)
if len(keys.intersection(user_keys)):
return {
SopsKey(key.pubkey, user.name, key.key_type) for key in user_keys
}
return None
@@ -397,39 +399,32 @@ def default_admin_private_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> SopsKey | None:
def maybe_get_admin_public_keys() -> list[SopsKey] | None:
keyring = SopsKey.collect_public_keys()
if len(keyring) == 0:
return None
if len(keyring) > 1:
last_3 = [f"{key.key_type.name.lower()}:{key.pubkey}" for key in keyring[:3]]
msg = (
f"Found {len(keyring)} public keys in your "
f"environment/system and cannot decide which one to "
f"use, first {len(last_3)}:\n\n"
f"- {'\n- '.join(last_3)}\n\n"
f"Please set one of SOPS_AGE_KEY, SOPS_AGE_KEY_FILE or "
f"SOPS_PGP_FP appropriately"
)
raise ClanError(msg)
return keyring[0]
return keyring
def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
key = maybe_get_admin_public_key()
keys = maybe_get_admin_public_keys()
if not key:
if not keys:
msg = "No SOPS key found. Please generate one with `clan secrets key generate`."
raise ClanError(msg)
user_keys = maybe_get_user(flake_dir, key)
user_keys = maybe_get_user(flake_dir, set(keys))
if not user_keys:
# todo improve error message
msg = f"We could not figure out which Clan secrets user you are with the SOPS key we found: {key.pubkey}"
msg = (
f"We could not figure out which Clan secrets user you are with the SOPS keys we found:\n"
f"- {'\n- '.join(f'{key.key_type.name.lower()}: {key.pubkey}' for key in keys)}\n\n"
f"Please ensure you have created a Clan secrets user and added one of your SOPS keys"
f"to that user.\n"
f"For more information, see: https://docs.clan.lol/guides/getting-started/secrets/#add-your-public-keys"
)
raise ClanError(msg)
return user_keys

View File

@@ -638,7 +638,7 @@ def test_secrets(
with capture_output as output:
cli.run(["secrets", "key", "show", "--flake", str(test_flake_with_core.path)])
key = json.loads(output.out)
key = json.loads(output.out)[0]
assert key["publickey"].startswith("age1")
# Add testuser with the key that was generated for the clan
cli.run(
@@ -991,7 +991,7 @@ def test_secrets_key_generate_gpg(
cli.run(
["secrets", "key", "show", "--flake", str(test_flake_with_core.path)]
)
key = json.loads(output.out)
key = json.loads(output.out)[0]
assert key["type"] == "pgp"
assert key["publickey"] == gpg_key.fingerprint

View File

@@ -4,7 +4,7 @@ import os
from pathlib import Path
from clan_cli.secrets.key import generate_key
from clan_cli.secrets.sops import maybe_get_admin_public_key
from clan_cli.secrets.sops import maybe_get_admin_public_keys
from clan_cli.secrets.users import add_user
from clan_lib.api import API
from clan_lib.errors import ClanError
@@ -19,14 +19,14 @@ def keygen(flake_dir: Path, user: str | None = None, force: bool = False) -> Non
if not user:
msg = "No user provided and $USER is not set. Please provide a user via --user."
raise ClanError(msg)
pub_key = maybe_get_admin_public_key()
if not pub_key:
pub_key = generate_key()
pub_keys = maybe_get_admin_public_keys()
if not pub_keys:
pub_keys = [generate_key()]
# TODO set flake_dir=flake.path / "vars"
add_user(
flake_dir=flake_dir,
name=user,
keys=[pub_key],
keys=pub_keys,
force=force,
)

View File

@@ -11,7 +11,7 @@ import pytest
from clan_cli.machines.create import CreateOptions as ClanCreateOptions
from clan_cli.machines.create import create_machine
from clan_cli.secrets.key import generate_key
from clan_cli.secrets.sops import maybe_get_admin_public_key
from clan_cli.secrets.sops import maybe_get_admin_public_keys
from clan_cli.secrets.users import add_user
from clan_cli.ssh.host_key import HostKeyCheck
from clan_cli.vars.generate import generate_vars_for_machine, get_generators_closure
@@ -159,10 +159,10 @@ def test_clan_create_api(
fix_flake_inputs(dest_clan_dir, clan_core_dir)
# ===== CREATE SOPS KEY ======
sops_key = maybe_get_admin_public_key()
if sops_key is None:
sops_keys = maybe_get_admin_public_keys()
if sops_keys is None:
# TODO: In the UI we need a view for this
sops_key = generate_key()
sops_keys = [generate_key()]
else:
msg = "SOPS key already exists, please remove it before running this test"
raise ClanError(msg)
@@ -171,7 +171,7 @@ def test_clan_create_api(
add_user(
dest_clan_dir,
name="testuser",
keys=[sops_key],
keys=sops_keys,
force=False,
)