sops: refactor some function names for clarity
This commit is contained in:
@@ -6,7 +6,7 @@ from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_files
|
||||
|
||||
from .secrets import update_secrets
|
||||
from .sops import default_sops_key_path, generate_private_key, get_public_key
|
||||
from .sops import default_admin_key_path, generate_private_key, get_public_key
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -34,19 +34,19 @@ def extract_public_key(filepath: Path) -> str:
|
||||
|
||||
|
||||
def generate_key() -> str:
|
||||
path = default_sops_key_path()
|
||||
path = default_admin_key_path()
|
||||
if path.exists():
|
||||
log.info(f"Key already exists at {path}")
|
||||
return extract_public_key(path)
|
||||
priv_key, pub_key = generate_private_key(out_file=path)
|
||||
log.info(
|
||||
f"Generated age private key at '{default_sops_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||
f"Generated age private key at '{default_admin_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||
)
|
||||
return pub_key
|
||||
|
||||
|
||||
def show_key() -> str:
|
||||
return get_public_key(default_sops_key_path().read_text())
|
||||
return get_public_key(default_admin_key_path().read_text())
|
||||
|
||||
|
||||
def generate_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -22,13 +22,13 @@ from .sops import read_key, write_key
|
||||
from .types import public_or_private_age_key_type, secret_name_type
|
||||
|
||||
|
||||
def add_machine(flake_dir: Path, machine: str, key: str, force: bool) -> None:
|
||||
path = sops_machines_folder(flake_dir) / machine
|
||||
write_key(path, key, force)
|
||||
paths = [path]
|
||||
def add_machine(flake_dir: Path, machine: str, pubkey: str, force: bool) -> None:
|
||||
machine_path = sops_machines_folder(flake_dir) / machine
|
||||
write_key(machine_path, pubkey, force)
|
||||
paths = [machine_path]
|
||||
|
||||
def filter_machine_secrets(secret: Path) -> bool:
|
||||
return secret.joinpath("machines", machine).exists()
|
||||
return (secret / "machines" / machine).exists()
|
||||
|
||||
paths.extend(update_secrets(flake_dir, filter_secrets=filter_machine_secrets))
|
||||
commit_files(
|
||||
|
||||
@@ -27,7 +27,7 @@ from .folders import (
|
||||
sops_secrets_folder,
|
||||
sops_users_folder,
|
||||
)
|
||||
from .sops import decrypt_file, encrypt_file, ensure_sops_key, read_key, update_keys
|
||||
from .sops import decrypt_file, encrypt_file, ensure_admin_key, read_key, update_keys
|
||||
from .types import VALID_SECRET_NAME, secret_name_type
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ def encrypt_secret(
|
||||
add_machines = []
|
||||
if add_users is None:
|
||||
add_users = []
|
||||
key = ensure_sops_key(flake_dir)
|
||||
key = ensure_admin_key(flake_dir)
|
||||
recipient_keys = set()
|
||||
|
||||
files_to_commit = []
|
||||
@@ -293,7 +293,7 @@ def list_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def decrypt_secret(flake_dir: Path, secret_path: Path) -> str:
|
||||
ensure_sops_key(flake_dir)
|
||||
ensure_admin_key(flake_dir)
|
||||
path = secret_path / "secret"
|
||||
if not path.exists():
|
||||
msg = f"Secret '{secret_path!s}' does not exist"
|
||||
|
||||
@@ -103,7 +103,7 @@ def ensure_user_or_machine(flake_dir: Path, pub_key: str) -> SopsKey:
|
||||
return key
|
||||
|
||||
|
||||
def default_sops_key_path() -> Path:
|
||||
def default_admin_key_path() -> Path:
|
||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||
if raw_path:
|
||||
return Path(raw_path)
|
||||
@@ -111,11 +111,11 @@ def default_sops_key_path() -> Path:
|
||||
|
||||
|
||||
@API.register
|
||||
def maybe_get_public_key() -> str | None:
|
||||
def maybe_get_admin_public_key() -> str | None:
|
||||
key = os.environ.get("SOPS_AGE_KEY")
|
||||
if key:
|
||||
return get_public_key(key)
|
||||
path = default_sops_key_path()
|
||||
path = default_admin_key_path()
|
||||
if path.exists():
|
||||
return get_public_key(path.read_text())
|
||||
|
||||
@@ -123,14 +123,14 @@ def maybe_get_public_key() -> str | None:
|
||||
|
||||
|
||||
def maybe_get_sops_key(flake_dir: Path) -> SopsKey | None:
|
||||
pub_key = maybe_get_public_key()
|
||||
pub_key = maybe_get_admin_public_key()
|
||||
if pub_key:
|
||||
return maybe_get_user_or_machine(flake_dir, pub_key)
|
||||
return None
|
||||
|
||||
|
||||
def ensure_sops_key(flake_dir: Path) -> SopsKey:
|
||||
pub_key = maybe_get_public_key()
|
||||
def ensure_admin_key(flake_dir: Path) -> SopsKey:
|
||||
pub_key = maybe_get_admin_public_key()
|
||||
if not pub_key:
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
|
||||
Reference in New Issue
Block a user