Merge pull request 'vars: add 'clan vars keygen'' (#2200) from DavHau/clan-core:DavHau-vars-migration into main
This commit is contained in:
@@ -6,6 +6,7 @@ from clan_cli.hyperlink import help_hyperlink
|
||||
from .check import register_check_parser
|
||||
from .generate import register_generate_parser
|
||||
from .get import register_get_parser
|
||||
from .keygen import register_keygen_parser
|
||||
from .list import register_list_parser
|
||||
from .set import register_set_parser
|
||||
from .upload import register_upload_parser
|
||||
@@ -20,6 +21,20 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
required=True,
|
||||
)
|
||||
|
||||
keygen_parser = subparser.add_parser(
|
||||
"keygen",
|
||||
help="initialize sops keys for vars",
|
||||
epilog=(
|
||||
"""
|
||||
This subcommand allows initializing sops keys for vars.
|
||||
This creates the file ~/.config/sops/age/keys.txt
|
||||
|
||||
"""
|
||||
),
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
)
|
||||
register_keygen_parser(keygen_parser)
|
||||
|
||||
check_parser = subparser.add_parser(
|
||||
"check",
|
||||
help="check if vars are up to date",
|
||||
|
||||
49
pkgs/clan-cli/clan_cli/vars/keygen.py
Normal file
49
pkgs/clan-cli/clan_cli/vars/keygen.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.secrets.key import generate_key
|
||||
from clan_cli.secrets.users import add_user
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def keygen(user: str | None, flake: FlakeId, force: bool) -> None:
|
||||
if user is None:
|
||||
user = os.getenv("USER", None)
|
||||
if not user:
|
||||
msg = "No user provided and $USER is not set. Please provide a user via --user."
|
||||
raise ClanError(msg)
|
||||
pub_key = generate_key()
|
||||
add_user(
|
||||
flake_dir=flake.path / "vars",
|
||||
name=user,
|
||||
key=pub_key,
|
||||
force=force,
|
||||
)
|
||||
|
||||
|
||||
def _command(
|
||||
args: argparse.Namespace,
|
||||
) -> None:
|
||||
keygen(
|
||||
user=args.user,
|
||||
flake=args.flake,
|
||||
force=args.force,
|
||||
)
|
||||
|
||||
|
||||
def register_keygen_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--user",
|
||||
help="The user to generate the keys for. Default: $USER",
|
||||
default=None,
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"-f", "--force", help="overwrite existing user", action="store_true"
|
||||
)
|
||||
|
||||
parser.set_defaults(func=_command)
|
||||
@@ -37,6 +37,12 @@ class SopsKey:
|
||||
username: str
|
||||
|
||||
|
||||
class MissingKeyError(ClanError):
|
||||
def __init__(self) -> None:
|
||||
msg = "Cannot find admin keys for current $USER on this computer. Please initialize admin keys once with 'clan vars keygen'"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class SecretStore(SecretStoreBase):
|
||||
def __init__(self, machine: Machine) -> None:
|
||||
self.machine = machine
|
||||
@@ -124,22 +130,19 @@ class SecretStore(SecretStoreBase):
|
||||
def admin_key(self) -> SopsKey:
|
||||
pub_key = self.maybe_get_admin_public_key()
|
||||
if not pub_key:
|
||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
||||
raise ClanError(msg)
|
||||
# return SopsKey(pub_key, username="")
|
||||
raise MissingKeyError
|
||||
return self.ensure_user_or_machine(pub_key)
|
||||
|
||||
# TODO: find alternative to `clan secrets users add`
|
||||
def ensure_user_or_machine(self, pub_key: str) -> SopsKey:
|
||||
key = self.maybe_get_user_or_machine(pub_key)
|
||||
key = self.maybe_get_user(pub_key)
|
||||
if not key:
|
||||
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||
raise ClanError(msg)
|
||||
raise MissingKeyError
|
||||
return key
|
||||
|
||||
def maybe_get_user_or_machine(self, pub_key: str) -> SopsKey | None:
|
||||
def maybe_get_user(self, pub_key: str) -> SopsKey | None:
|
||||
key = SopsKey(pub_key, username="")
|
||||
folders = [self.sops_dir / "users", self.sops_dir / "machines"]
|
||||
folders = [self.sops_dir / "users"]
|
||||
|
||||
for folder in folders:
|
||||
if folder.exists():
|
||||
|
||||
@@ -866,3 +866,23 @@ def test_fails_when_files_are_left_from_other_backend(
|
||||
generator,
|
||||
regenerate=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_keygen(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_home: Path,
|
||||
) -> None:
|
||||
monkeypatch.chdir(temporary_home)
|
||||
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
|
||||
# check public key exists
|
||||
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
|
||||
# check private key exists
|
||||
assert (temporary_home / ".config" / "sops" / "age" / "keys.txt").is_file()
|
||||
# it should still work, even if the keys already exist
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(temporary_home / "vars" / "sops" / "users" / "user")
|
||||
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
|
||||
# check public key exists
|
||||
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
|
||||
|
||||
Reference in New Issue
Block a user