secrets: replace Key, key type tuple with SopsKey class

This commit is contained in:
Jörg Thalheim
2024-10-01 17:14:51 +02:00
committed by Mic92
parent 541a73692f
commit 4a3030d6ed

View File

@@ -91,10 +91,7 @@ def get_user_name(flake_dir: Path, user: str) -> str:
print(f"{flake_dir / user} already exists")
def maybe_get_user_or_machine(
flake_dir: Path, pub_key: str, key_type: KeyType
) -> SopsKey | None:
key = SopsKey(pub_key, username="", key_type=key_type)
def maybe_get_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey | None:
folders = [sops_users_folder(flake_dir), sops_machines_folder(flake_dir)]
for folder in folders:
@@ -103,7 +100,7 @@ def maybe_get_user_or_machine(
if not (user / "key.json").exists():
continue
this_pub_key, this_key_type = read_key(user)
if pub_key == this_pub_key and key_type == this_key_type:
if key.pubkey == this_pub_key and key.key_type == this_key_type:
key.username = user.name
return key
@@ -111,12 +108,12 @@ def maybe_get_user_or_machine(
@API.register
def ensure_user_or_machine(flake_dir: Path, pub_key: str, key_type: KeyType) -> SopsKey:
key = maybe_get_user_or_machine(flake_dir, pub_key, key_type)
if not key:
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
def ensure_user_or_machine(flake_dir: Path, key: SopsKey) -> SopsKey:
maybe_key = maybe_get_user_or_machine(flake_dir, key)
if maybe_key:
return maybe_key
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {key.pubkey}' (replace youruser with your user name)"
raise ClanError(msg)
return key
def default_admin_key_path() -> Path:
@@ -127,37 +124,43 @@ def default_admin_key_path() -> Path:
@API.register
def maybe_get_admin_public_key() -> tuple[str, KeyType | None]:
def maybe_get_admin_public_key() -> None | SopsKey:
age_key = os.environ.get("SOPS_AGE_KEY")
pgp_key = os.environ.get("SOPS_PGP_FP")
if age_key and pgp_key:
msg = "Cannot decide which key to use when both `SOPS_AGE_KEY` and `SOPS_PGP_FP` are set. Please specify one or the other."
raise ClanError(msg)
if age_key:
return get_public_age_key(age_key), KeyType.AGE
return SopsKey(
pubkey=get_public_age_key(age_key), key_type=KeyType.AGE, username=""
)
if pgp_key:
return pgp_key, KeyType.PGP
return SopsKey(pubkey=pgp_key, key_type=KeyType.PGP, username="")
path = default_admin_key_path()
if path.exists():
return get_public_age_key(path.read_text()), KeyType.AGE
return SopsKey(
pubkey=get_public_age_key(path.read_text()),
key_type=KeyType.AGE,
username="",
)
return "", None
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
pub_key, key_type = maybe_get_admin_public_key()
if key_type:
return maybe_get_user_or_machine(flake_dir, pub_key, key_type)
return None
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
key = maybe_get_admin_public_key()
if not key:
return None
return maybe_get_user_or_machine(flake_dir, key)
def ensure_admin_key(flake_dir: Path) -> SopsKey:
pub_key, key_type = maybe_get_admin_public_key()
if not key_type:
key = maybe_get_admin_public_key()
if key:
return ensure_user_or_machine(flake_dir, key)
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
raise ClanError(msg)
return ensure_user_or_machine(flake_dir, pub_key, key_type)
@contextmanager