Merge pull request 'vars: add 'clan vars keygen'' (#2200) from DavHau/clan-core:DavHau-vars-migration into main
This commit is contained in:
@@ -6,6 +6,7 @@ from clan_cli.hyperlink import help_hyperlink
|
|||||||
from .check import register_check_parser
|
from .check import register_check_parser
|
||||||
from .generate import register_generate_parser
|
from .generate import register_generate_parser
|
||||||
from .get import register_get_parser
|
from .get import register_get_parser
|
||||||
|
from .keygen import register_keygen_parser
|
||||||
from .list import register_list_parser
|
from .list import register_list_parser
|
||||||
from .set import register_set_parser
|
from .set import register_set_parser
|
||||||
from .upload import register_upload_parser
|
from .upload import register_upload_parser
|
||||||
@@ -20,6 +21,20 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
required=True,
|
required=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
keygen_parser = subparser.add_parser(
|
||||||
|
"keygen",
|
||||||
|
help="initialize sops keys for vars",
|
||||||
|
epilog=(
|
||||||
|
"""
|
||||||
|
This subcommand allows initializing sops keys for vars.
|
||||||
|
This creates the file ~/.config/sops/age/keys.txt
|
||||||
|
|
||||||
|
"""
|
||||||
|
),
|
||||||
|
formatter_class=argparse.RawTextHelpFormatter,
|
||||||
|
)
|
||||||
|
register_keygen_parser(keygen_parser)
|
||||||
|
|
||||||
check_parser = subparser.add_parser(
|
check_parser = subparser.add_parser(
|
||||||
"check",
|
"check",
|
||||||
help="check if vars are up to date",
|
help="check if vars are up to date",
|
||||||
|
|||||||
49
pkgs/clan-cli/clan_cli/vars/keygen.py
Normal file
49
pkgs/clan-cli/clan_cli/vars/keygen.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
import argparse
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
from clan_cli.clan_uri import FlakeId
|
||||||
|
from clan_cli.errors import ClanError
|
||||||
|
from clan_cli.secrets.key import generate_key
|
||||||
|
from clan_cli.secrets.users import add_user
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def keygen(user: str | None, flake: FlakeId, force: bool) -> None:
|
||||||
|
if user is None:
|
||||||
|
user = os.getenv("USER", None)
|
||||||
|
if not user:
|
||||||
|
msg = "No user provided and $USER is not set. Please provide a user via --user."
|
||||||
|
raise ClanError(msg)
|
||||||
|
pub_key = generate_key()
|
||||||
|
add_user(
|
||||||
|
flake_dir=flake.path / "vars",
|
||||||
|
name=user,
|
||||||
|
key=pub_key,
|
||||||
|
force=force,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _command(
|
||||||
|
args: argparse.Namespace,
|
||||||
|
) -> None:
|
||||||
|
keygen(
|
||||||
|
user=args.user,
|
||||||
|
flake=args.flake,
|
||||||
|
force=args.force,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def register_keygen_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
|
parser.add_argument(
|
||||||
|
"--user",
|
||||||
|
help="The user to generate the keys for. Default: $USER",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
"-f", "--force", help="overwrite existing user", action="store_true"
|
||||||
|
)
|
||||||
|
|
||||||
|
parser.set_defaults(func=_command)
|
||||||
@@ -37,6 +37,12 @@ class SopsKey:
|
|||||||
username: str
|
username: str
|
||||||
|
|
||||||
|
|
||||||
|
class MissingKeyError(ClanError):
|
||||||
|
def __init__(self) -> None:
|
||||||
|
msg = "Cannot find admin keys for current $USER on this computer. Please initialize admin keys once with 'clan vars keygen'"
|
||||||
|
super().__init__(msg)
|
||||||
|
|
||||||
|
|
||||||
class SecretStore(SecretStoreBase):
|
class SecretStore(SecretStoreBase):
|
||||||
def __init__(self, machine: Machine) -> None:
|
def __init__(self, machine: Machine) -> None:
|
||||||
self.machine = machine
|
self.machine = machine
|
||||||
@@ -124,22 +130,19 @@ class SecretStore(SecretStoreBase):
|
|||||||
def admin_key(self) -> SopsKey:
|
def admin_key(self) -> SopsKey:
|
||||||
pub_key = self.maybe_get_admin_public_key()
|
pub_key = self.maybe_get_admin_public_key()
|
||||||
if not pub_key:
|
if not pub_key:
|
||||||
msg = "No sops key found. Please generate one with 'clan secrets key generate'."
|
raise MissingKeyError
|
||||||
raise ClanError(msg)
|
|
||||||
# return SopsKey(pub_key, username="")
|
|
||||||
return self.ensure_user_or_machine(pub_key)
|
return self.ensure_user_or_machine(pub_key)
|
||||||
|
|
||||||
# TODO: find alternative to `clan secrets users add`
|
# TODO: find alternative to `clan secrets users add`
|
||||||
def ensure_user_or_machine(self, pub_key: str) -> SopsKey:
|
def ensure_user_or_machine(self, pub_key: str) -> SopsKey:
|
||||||
key = self.maybe_get_user_or_machine(pub_key)
|
key = self.maybe_get_user(pub_key)
|
||||||
if not key:
|
if not key:
|
||||||
msg = f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
raise MissingKeyError
|
||||||
raise ClanError(msg)
|
|
||||||
return key
|
return key
|
||||||
|
|
||||||
def maybe_get_user_or_machine(self, pub_key: str) -> SopsKey | None:
|
def maybe_get_user(self, pub_key: str) -> SopsKey | None:
|
||||||
key = SopsKey(pub_key, username="")
|
key = SopsKey(pub_key, username="")
|
||||||
folders = [self.sops_dir / "users", self.sops_dir / "machines"]
|
folders = [self.sops_dir / "users"]
|
||||||
|
|
||||||
for folder in folders:
|
for folder in folders:
|
||||||
if folder.exists():
|
if folder.exists():
|
||||||
|
|||||||
@@ -866,3 +866,23 @@ def test_fails_when_files_are_left_from_other_backend(
|
|||||||
generator,
|
generator,
|
||||||
regenerate=False,
|
regenerate=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.impure
|
||||||
|
def test_keygen(
|
||||||
|
monkeypatch: pytest.MonkeyPatch,
|
||||||
|
temporary_home: Path,
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.chdir(temporary_home)
|
||||||
|
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
|
||||||
|
# check public key exists
|
||||||
|
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
|
||||||
|
# check private key exists
|
||||||
|
assert (temporary_home / ".config" / "sops" / "age" / "keys.txt").is_file()
|
||||||
|
# it should still work, even if the keys already exist
|
||||||
|
import shutil
|
||||||
|
|
||||||
|
shutil.rmtree(temporary_home / "vars" / "sops" / "users" / "user")
|
||||||
|
cli.run(["vars", "keygen", "--flake", str(temporary_home), "--user", "user"])
|
||||||
|
# check public key exists
|
||||||
|
assert (temporary_home / "vars" / "sops" / "users" / "user").is_dir()
|
||||||
|
|||||||
Reference in New Issue
Block a user