175 lines
5.2 KiB
Python
175 lines
5.2 KiB
Python
import argparse
|
|
import getpass
|
|
import logging
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from clan_cli.secrets.key import generate_key
|
|
from clan_cli.secrets.sops import SopsKey, maybe_get_admin_public_keys
|
|
from clan_cli.secrets.users import add_user
|
|
from clan_lib.api import API
|
|
from clan_lib.errors import ClanError
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_user_or_default(user: str | None) -> str:
|
|
"""Get the user name, defaulting to the logged-in OS username if not provided."""
|
|
if user is None:
|
|
try:
|
|
user = getpass.getuser()
|
|
except Exception as e:
|
|
msg = "No user provided and could not determine logged-in OS username. Please provide an explicit username via argument"
|
|
raise ClanError(msg) from e
|
|
return user
|
|
|
|
|
|
# TODO: Unify with "create clan"; this should be done automatically.
|
|
@API.register
def create_secrets_user(
    flake_dir: Path,
    user: str | None = None,
    force: bool = False,
) -> None:
    """Initialize sops keys for vars without prompting.

    Admin public keys returned by ``maybe_get_admin_public_keys`` are reused;
    when there are none, a single new key is generated instead.

    Args:
        flake_dir: Passed through to ``add_user`` as the target directory.
        user: Name to register; defaults to the logged-in OS username.
        force: Passed through to ``add_user``.
    """
    # Resolve the name first so a failed lookup aborts before any key is generated.
    username = _get_user_or_default(user)
    admin_keys = maybe_get_admin_public_keys() or [generate_key()]
    # TODO set flake_dir=flake.path / "vars"
    add_user(flake_dir=flake_dir, name=username, keys=admin_keys, force=force)
|
|
|
|
|
|
def _select_keys_interactive(pub_keys: list[SopsKey]) -> list[SopsKey]:
    """Let the user pick which of the given admin keys to use.

    Every key is listed with a 1-based number, then the user is prompted for
    a comma-separated list of numbers. The prompt repeats until the answer is
    valid.

    Args:
        pub_keys: Candidate keys to choose from.

    Returns:
        ``pub_keys`` itself when the answer is empty; otherwise the chosen
        keys in the order they were typed, each key at most once.
    """
    log.info("\nFound existing admin keys on this machine:")
    for number, key in enumerate(pub_keys, start=1):
        log.info(
            f"{number}: type: {key.key_type}\n pubkey: {key.pubkey}\n source: {key.source}",
        )

    while True:
        choice = input(
            "Select keys to use (comma-separated list of numbers, or leave empty to select all): ",
        ).strip()
        if not choice:
            log.info("No keys selected, using all keys.")
            return pub_keys

        try:
            numbers = [int(part) for part in choice.split(",")]
        except ValueError:
            log.info("Invalid input. Please enter a comma-separated list of numbers.")
            continue

        # Reject the whole answer instead of silently dropping bad numbers:
        # a typo must not quietly leave a key out of the selection.
        out_of_range = [n for n in numbers if not 1 <= n <= len(pub_keys)]
        if out_of_range:
            log.info(
                "Invalid selection: %s. Please choose numbers between 1 and %d.",
                ", ".join(str(n) for n in out_of_range),
                len(pub_keys),
            )
            continue

        # dict.fromkeys drops repeated numbers while keeping the typed order,
        # so the same key is never returned twice.
        return [pub_keys[n - 1] for n in dict.fromkeys(numbers)]
|
|
|
|
|
|
def _confirm_key_backup() -> None:
    """Keep prompting until the user answers ``y`` to the backup confirmation."""
    prompt = "\nI have backed up the key file to a secure location. Confirm [y/N]: "
    # The prompt goes through input() rather than the logger so that it is
    # shown regardless of the logging configuration and the cursor stays on
    # the prompt line.
    while input(prompt).strip().lower() != "y":
        log.error(
            "You must backup the key before proceeding. This is critical for data recovery!",
        )


def create_secrets_user_interactive(
    flake_dir: Path,
    user: str | None = None,
    force: bool = False,
) -> None:
    """Initialize sops keys for vars interactively.

    If admin keys are found, the user chooses which ones to use. Otherwise a
    new key is generated and the user must confirm that it has been backed up
    before the key is registered.

    Args:
        flake_dir: Passed through to ``add_user`` as the target directory.
        user: Name to register; defaults to the logged-in OS username.
        force: Passed through to ``add_user``.
    """
    user = _get_user_or_default(user)
    pub_keys = maybe_get_admin_public_keys()
    if pub_keys:
        # let the user select which of the keys to use
        pub_keys = _select_keys_interactive(pub_keys)
    else:
        log.info(
            "\nNo admin keys found on this machine, generating a new key for sops.",
        )
        pub_keys = [generate_key()]
        # make sure the user backs up the generated key
        log.info("\n⚠️ IMPORTANT: Secret Key Backup ⚠️")
        log.info(
            "The generated key above is CRITICAL for accessing your clan's secrets.",
        )
        log.info("Without this key, you will lose access to all encrypted data!")
        log.info("Please backup the key file immediately to a secure location.")
        log.info("The key is typically stored in ~/.config/sops/age/keys.txt")
        _confirm_key_backup()

    # persist the generated or chosen admin pubkey in the repo
    add_user(
        flake_dir=flake_dir,
        name=user,
        keys=pub_keys,
        force=force,
    )
|
|
|
|
|
|
def create_secrets_user_auto(
    flake_dir: Path,
    user: str | None = None,
    force: bool = False,
) -> None:
    """Run the interactive routine when stdin is a TTY, otherwise the plain one.

    All three arguments are forwarded unchanged to whichever routine runs.
    """
    if not sys.stdin.isatty():
        # No terminal attached: nobody could answer a prompt.
        create_secrets_user(flake_dir=flake_dir, user=user, force=force)
        return

    create_secrets_user_interactive(flake_dir=flake_dir, user=user, force=force)
|
|
|
|
|
|
def _command(args: argparse.Namespace) -> None:
    """Run key generation for the parsed CLI arguments.

    ``--no-interactive`` forces the non-interactive routine; otherwise the
    mode is auto-detected by ``create_secrets_user_auto``.
    """
    handler = create_secrets_user if args.no_interactive else create_secrets_user_auto
    handler(
        flake_dir=args.flake.path,
        user=args.user,
        force=args.force,
    )
|
|
|
|
|
|
def register_keygen_parser(parser: argparse.ArgumentParser) -> None:
    """Add the key-generation options to ``parser`` and bind its handler.

    Registers ``--user``, ``-f/--force`` and ``--no-interactive``, then sets
    ``_command`` as the parser's ``func`` default.
    """
    user_help = "The user to generate the keys for. Default: logged-in OS username (e.g. from $LOGNAME or system)"
    parser.add_argument("--user", default=None, help=user_help)

    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="overwrite existing user",
    )

    parser.add_argument(
        "--no-interactive",
        action="store_true",
        help="Run in non-interactive mode, using keys from the machine if available",
    )

    parser.set_defaults(func=_command)
|