clan flakes create: initialize keys automatically (#4435)
fixes https://git.clan.lol/clan/clan-core/issues/2665 fixes https://git.clan.lol/clan/clan-core/issues/4407 Co-authored-by: DavHau <d.hauer.it@gmail.com> Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4435 Co-authored-by: Jörg Thalheim <joerg@thalheim.io> Co-committed-by: Jörg Thalheim <joerg@thalheim.io>
This commit is contained in:
@@ -7,6 +7,7 @@ from clan_lib.clan.create import CreateOptions, create_clan
|
||||
from clan_lib.errors import ClanError
|
||||
|
||||
from clan_cli.completions import add_dynamic_completer, complete_templates_clan
|
||||
from clan_cli.vars.keygen import create_secrets_user_auto
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,6 +44,12 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
|
||||
default=False,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--user",
|
||||
help="The user to generate the keys for. Default: logged-in OS username (e.g. from $LOGNAME or system)",
|
||||
default=None,
|
||||
)
|
||||
|
||||
def create_flake_command(args: argparse.Namespace) -> None:
|
||||
# Ask for a path interactively if none provided
|
||||
if args.name is None:
|
||||
@@ -62,5 +69,10 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
|
||||
update_clan=not args.no_update,
|
||||
)
|
||||
)
|
||||
create_secrets_user_auto(
|
||||
flake_dir=Path(args.name).resolve(),
|
||||
user=args.user,
|
||||
force=True,
|
||||
)
|
||||
|
||||
parser.set_defaults(func=create_flake_command)
|
||||
|
||||
@@ -285,7 +285,7 @@ Examples:
|
||||
$ clan secrets get [SECRET]
|
||||
Will display the content of the specified secret.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
@@ -324,7 +324,7 @@ Examples:
|
||||
This is especially useful for resetting certain passwords while leaving the rest
|
||||
of the facts for a machine in place.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
@@ -362,7 +362,7 @@ Examples:
|
||||
This is especially useful for resetting certain passwords while leaving the rest
|
||||
of the vars for a machine in place.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
|
||||
@@ -31,7 +31,7 @@ Examples:
|
||||
Will check facts for the specified machine.
|
||||
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
@@ -61,7 +61,7 @@ Examples:
|
||||
Will list facts for the specified machine.
|
||||
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
@@ -101,7 +101,7 @@ Examples:
|
||||
This is especially useful for resetting certain passwords while leaving the rest
|
||||
of the facts for a machine in place.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
@@ -125,7 +125,7 @@ Examples:
|
||||
$ clan facts upload [MACHINE]
|
||||
Will upload secrets to a specific machine.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
|
||||
@@ -29,7 +29,7 @@ def generate_key() -> sops.SopsKey:
|
||||
path = default_admin_private_key_path()
|
||||
_, pub_key = generate_private_key(out_file=path)
|
||||
print(
|
||||
f"Generated age private key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||
f"Generated age private key at '{path}' for your user.\nPlease back it up on a secure location or you will lose access to your secrets."
|
||||
)
|
||||
return sops.SopsKey(pub_key, username="", key_type=sops.KeyType.AGE)
|
||||
|
||||
|
||||
@@ -421,7 +421,7 @@ def ensure_admin_public_keys(flake_dir: Path) -> set[SopsKey]:
|
||||
f"- {'\n- '.join(f'{key.key_type.name.lower()}: {key.pubkey}' for key in keys)}\n\n"
|
||||
f"Please ensure you have created a Clan secrets user and added one of your SOPS keys"
|
||||
f"to that user.\n"
|
||||
f"For more information, see: https://docs.clan.lol/guides/getting-started/secrets/#add-your-public-keys"
|
||||
f"For more information, see: https://docs.clan.lol/guides/secrets/#add-your-public-keys"
|
||||
)
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from clan_cli.tests.fixtures_flakes import FlakeForTest, substitute
|
||||
from clan_cli.tests.fixtures_flakes import substitute
|
||||
from clan_cli.tests.helpers import cli
|
||||
from clan_cli.tests.stdout import CaptureOutput
|
||||
from clan_lib.cmd import run
|
||||
@@ -21,6 +21,7 @@ def test_create_flake(
|
||||
) -> None:
|
||||
flake_dir = temporary_home / "test-flake"
|
||||
|
||||
monkeypatch.setenv("LOGNAME", "testuser")
|
||||
cli.run(["flakes", "create", str(flake_dir), "--template=default", "--no-update"])
|
||||
|
||||
# Replace the inputs.clan.url in the template flake.nix
|
||||
@@ -65,6 +66,7 @@ def test_create_flake_existing_git(
|
||||
|
||||
run(["git", "init", str(temporary_home)])
|
||||
|
||||
monkeypatch.setenv("LOGNAME", "testuser")
|
||||
cli.run(["flakes", "create", str(flake_dir), "--template=default", "--no-update"])
|
||||
|
||||
# Replace the inputs.clan.url in the template flake.nix
|
||||
@@ -101,12 +103,12 @@ def test_create_flake_existing_git(
|
||||
def test_ui_template(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_home: Path,
|
||||
test_flake_with_core: FlakeForTest,
|
||||
clan_core: Path,
|
||||
capture_output: CaptureOutput,
|
||||
) -> None:
|
||||
flake_dir = temporary_home / "test-flake"
|
||||
|
||||
monkeypatch.setenv("LOGNAME", "testuser")
|
||||
cli.run(["flakes", "create", str(flake_dir), "--template=minimal", "--no-update"])
|
||||
|
||||
# Replace the inputs.clan.url in the template flake.nix
|
||||
|
||||
@@ -197,7 +197,7 @@ Examples:
|
||||
$ clan vars upload [MACHINE]
|
||||
Will upload secrets to a specific machine.
|
||||
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/getting-started/secrets")}
|
||||
For more detailed information, visit: {help_hyperlink("secrets", "https://docs.clan.lol/guides/secrets")}
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import argparse
|
||||
import getpass
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.secrets.key import generate_key
|
||||
from clan_cli.secrets.sops import maybe_get_admin_public_keys
|
||||
from clan_cli.secrets.sops import SopsKey, maybe_get_admin_public_keys
|
||||
from clan_cli.secrets.users import add_user
|
||||
from clan_lib.api import API
|
||||
from clan_lib.errors import ClanError
|
||||
@@ -12,6 +13,17 @@ from clan_lib.errors import ClanError
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_user_or_default(user: str | None) -> str:
|
||||
"""Get the user name, defaulting to the logged-in OS username if not provided."""
|
||||
if user is None:
|
||||
try:
|
||||
user = getpass.getuser()
|
||||
except Exception as e:
|
||||
msg = "No user provided and could not determine logged-in OS username. Please provide an explicit username via argument"
|
||||
raise ClanError(msg) from e
|
||||
return user
|
||||
|
||||
|
||||
# TODO: Unify with "create clan" should be done automatically
|
||||
@API.register
|
||||
def create_secrets_user(
|
||||
@@ -20,11 +32,7 @@ def create_secrets_user(
|
||||
"""
|
||||
initialize sops keys for vars
|
||||
"""
|
||||
if user is None:
|
||||
user = os.getenv("USER", None)
|
||||
if not user:
|
||||
msg = "No user provided and environment variable: '$USER' is not set. Please provide an explizit username via argument"
|
||||
raise ClanError(msg)
|
||||
user = _get_user_or_default(user)
|
||||
pub_keys = maybe_get_admin_public_keys()
|
||||
if not pub_keys:
|
||||
pub_keys = [generate_key()]
|
||||
@@ -37,10 +45,97 @@ def create_secrets_user(
|
||||
)
|
||||
|
||||
|
||||
def _select_keys_interactive(pub_keys: list[SopsKey]) -> list[SopsKey]:
|
||||
# let the user select which of the keys to use
|
||||
log.info("\nFound existing admin keys on this machine:")
|
||||
selected_keys: list[SopsKey] = []
|
||||
for i, key in enumerate(pub_keys):
|
||||
log.info(f"{i + 1}: {key}")
|
||||
while not selected_keys:
|
||||
choice = input(
|
||||
"Select keys to use (comma-separated list of numbers, or leave empty to select all): "
|
||||
).strip()
|
||||
if not choice:
|
||||
log.info("No keys selected, using all keys.")
|
||||
return pub_keys
|
||||
|
||||
try:
|
||||
indices = [int(i) - 1 for i in choice.split(",")]
|
||||
selected_keys = [pub_keys[i] for i in indices if 0 <= i < len(pub_keys)]
|
||||
except ValueError:
|
||||
log.info("Invalid input. Please enter a comma-separated list of numbers.")
|
||||
|
||||
return selected_keys
|
||||
|
||||
|
||||
def create_secrets_user_interactive(
|
||||
flake_dir: Path, user: str | None = None, force: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
Initialize sops keys for vars interactively.
|
||||
"""
|
||||
user = _get_user_or_default(user)
|
||||
pub_keys = maybe_get_admin_public_keys()
|
||||
if pub_keys:
|
||||
# let the user select which of the keys to use
|
||||
pub_keys = _select_keys_interactive(pub_keys)
|
||||
else:
|
||||
log.info(
|
||||
"\nNo admin keys found on this machine, generating a new key for sops."
|
||||
)
|
||||
pub_keys = [generate_key()]
|
||||
# make sure the user backups the generated key
|
||||
log.info("\n⚠️ IMPORTANT: Secret Key Backup ⚠️")
|
||||
log.info(
|
||||
"The generated key above is CRITICAL for accessing your clan's secrets."
|
||||
)
|
||||
log.info("Without this key, you will lose access to all encrypted data!")
|
||||
log.info("Please backup the key file immediately to a secure location.")
|
||||
log.info("The key is typically stored in ~/.config/sops/age/keys.txt")
|
||||
confirm = None
|
||||
while not confirm or confirm.lower() != "y":
|
||||
log.info(
|
||||
"\nI have backed up the key file to a secure location. Confirm [y/N]: "
|
||||
)
|
||||
confirm = input().strip().lower()
|
||||
if confirm != "y":
|
||||
log.error(
|
||||
"You must backup the key before proceeding. This is critical for data recovery!"
|
||||
)
|
||||
|
||||
# persist the generated or chosen admin pubkey in the repo
|
||||
add_user(
|
||||
flake_dir=flake_dir,
|
||||
name=user,
|
||||
keys=pub_keys,
|
||||
force=force,
|
||||
)
|
||||
|
||||
|
||||
def create_secrets_user_auto(
|
||||
flake_dir: Path, user: str | None = None, force: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
Detect if the user is in interactive mode or not and choose the appropriate routine.
|
||||
"""
|
||||
if sys.stdin.isatty():
|
||||
create_secrets_user_interactive(
|
||||
flake_dir=flake_dir,
|
||||
user=user,
|
||||
force=force,
|
||||
)
|
||||
else:
|
||||
create_secrets_user(
|
||||
flake_dir=flake_dir,
|
||||
user=user,
|
||||
force=force,
|
||||
)
|
||||
|
||||
|
||||
def _command(
|
||||
args: argparse.Namespace,
|
||||
) -> None:
|
||||
create_secrets_user(
|
||||
create_secrets_user_auto(
|
||||
flake_dir=args.flake.path,
|
||||
user=args.user,
|
||||
force=args.force,
|
||||
@@ -50,7 +145,7 @@ def _command(
|
||||
def register_keygen_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--user",
|
||||
help="The user to generate the keys for. Default: $USER",
|
||||
help="The user to generate the keys for. Default: logged-in OS username (e.g. from $LOGNAME or system)",
|
||||
default=None,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user