sops: add explicit commands to generate secrets
This commit is contained in:
@@ -4,6 +4,7 @@ import argparse
|
|||||||
from .generate import register_generate_parser
|
from .generate import register_generate_parser
|
||||||
from .groups import register_groups_parser
|
from .groups import register_groups_parser
|
||||||
from .import_sops import register_import_sops_parser
|
from .import_sops import register_import_sops_parser
|
||||||
|
from .key import register_key_parser
|
||||||
from .machines import register_machines_parser
|
from .machines import register_machines_parser
|
||||||
from .secrets import register_secrets_parser
|
from .secrets import register_secrets_parser
|
||||||
from .users import register_users_parser
|
from .users import register_users_parser
|
||||||
@@ -35,4 +36,7 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
)
|
)
|
||||||
register_generate_parser(parser_generate)
|
register_generate_parser(parser_generate)
|
||||||
|
|
||||||
|
parser_key = subparser.add_parser("key", help="create and show age keys")
|
||||||
|
register_key_parser(parser_key)
|
||||||
|
|
||||||
register_secrets_parser(subparser)
|
register_secrets_parser(subparser)
|
||||||
|
|||||||
48
pkgs/clan-cli/clan_cli/secrets/key.py
Normal file
48
pkgs/clan-cli/clan_cli/secrets/key.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
import argparse
|
||||||
|
|
||||||
|
from .. import tty
|
||||||
|
from ..errors import ClanError
|
||||||
|
from .sops import default_sops_key_path, generate_private_key, get_public_key
|
||||||
|
|
||||||
|
|
||||||
|
def generate_key() -> str:
|
||||||
|
path = default_sops_key_path()
|
||||||
|
if path.exists():
|
||||||
|
raise ClanError(f"Key already exists at {path}")
|
||||||
|
generate_private_key(path)
|
||||||
|
pub_key = get_public_key(path.read_text())
|
||||||
|
return pub_key
|
||||||
|
|
||||||
|
|
||||||
|
def show_key() -> str:
|
||||||
|
return get_public_key(default_sops_key_path().read_text())
|
||||||
|
|
||||||
|
|
||||||
|
def generate_command(args: argparse.Namespace) -> None:
|
||||||
|
pub_key = generate_key()
|
||||||
|
tty.info(
|
||||||
|
f"Generated age private key at '{default_sops_key_path()}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
||||||
|
)
|
||||||
|
tty.info(
|
||||||
|
f"Also add your age public key to the repository with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def show_command(args: argparse.Namespace) -> None:
|
||||||
|
print(show_key())
|
||||||
|
|
||||||
|
|
||||||
|
def register_key_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
|
subparser = parser.add_subparsers(
|
||||||
|
title="command",
|
||||||
|
description="the command to run",
|
||||||
|
help="the command to run",
|
||||||
|
required=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
parser_generate = subparser.add_parser("generate", help="generate age key")
|
||||||
|
parser_generate.set_defaults(func=generate_command)
|
||||||
|
|
||||||
|
parser_show = subparser.add_parser("show", help="show age public key")
|
||||||
|
parser_show.set_defaults(func=show_command)
|
||||||
@@ -7,11 +7,10 @@ from pathlib import Path
|
|||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from typing import IO, Iterator, Union
|
from typing import IO, Iterator, Union
|
||||||
|
|
||||||
from .. import tty
|
|
||||||
from ..dirs import user_config_dir
|
from ..dirs import user_config_dir
|
||||||
from ..errors import ClanError
|
from ..errors import ClanError
|
||||||
from ..nix import nix_shell
|
from ..nix import nix_shell
|
||||||
from .folders import sops_users_folder
|
from .folders import sops_machines_folder, sops_users_folder
|
||||||
|
|
||||||
|
|
||||||
class SopsKey:
|
class SopsKey:
|
||||||
@@ -31,15 +30,10 @@ def get_public_key(privkey: str) -> str:
|
|||||||
return res.stdout.strip()
|
return res.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
def get_unique_user(users_folder: Path, user: str) -> str:
|
def generate_private_key(path: Path) -> None:
|
||||||
"""Return a unique path in the users_folder for the given user."""
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
i = 0
|
cmd = nix_shell(["age"], ["age-keygen", "-o", str(path)])
|
||||||
path = users_folder / user
|
subprocess.run(cmd, check=True)
|
||||||
while path.exists():
|
|
||||||
i += 1
|
|
||||||
user = user + str(i)
|
|
||||||
path = users_folder / user
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
def get_user_name(user: str) -> str:
|
def get_user_name(user: str) -> str:
|
||||||
@@ -55,57 +49,42 @@ def get_user_name(user: str) -> str:
|
|||||||
print(f"{sops_users_folder() / user} already exists")
|
print(f"{sops_users_folder() / user} already exists")
|
||||||
|
|
||||||
|
|
||||||
def ensure_user(pub_key: str) -> SopsKey:
|
def ensure_user_or_machine(pub_key: str) -> SopsKey:
|
||||||
key = SopsKey(pub_key, username="")
|
key = SopsKey(pub_key, username="")
|
||||||
users_folder = sops_users_folder()
|
folders = [sops_users_folder(), sops_machines_folder()]
|
||||||
|
for folder in folders:
|
||||||
|
if folder.exists():
|
||||||
|
for user in folder.iterdir():
|
||||||
|
if not user.is_dir():
|
||||||
|
continue
|
||||||
|
if read_key(user) == pub_key:
|
||||||
|
key.username = user.name
|
||||||
|
return key
|
||||||
|
|
||||||
# Check if the public key already exists for any user
|
raise ClanError(
|
||||||
if users_folder.exists():
|
f"Your sops key is not yet added to the repository. Please add it with 'clan secrets users add youruser {pub_key}' (replace youruser with your user name)"
|
||||||
for user in users_folder.iterdir():
|
)
|
||||||
if not user.is_dir():
|
|
||||||
continue
|
|
||||||
if read_key(user) == pub_key:
|
|
||||||
key.username = user.name
|
|
||||||
return key
|
|
||||||
|
|
||||||
# Find a unique user name if the public key is not found
|
|
||||||
try:
|
|
||||||
loginname = os.getlogin()
|
|
||||||
except OSError:
|
|
||||||
loginname = os.environ.get("USER", "nobody")
|
|
||||||
username = get_unique_user(users_folder, loginname)
|
|
||||||
|
|
||||||
if tty.is_interactive():
|
def default_sops_key_path() -> Path:
|
||||||
# Ask the user for their name until a unique one is provided
|
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
||||||
username = get_user_name(username)
|
if raw_path:
|
||||||
|
return Path(raw_path)
|
||||||
# Add the public key for the user
|
else:
|
||||||
write_key(users_folder / username, pub_key, False)
|
return user_config_dir() / "sops" / "age" / "keys.txt"
|
||||||
|
|
||||||
key.username = username
|
|
||||||
|
|
||||||
return key
|
|
||||||
|
|
||||||
|
|
||||||
def ensure_sops_key() -> SopsKey:
|
def ensure_sops_key() -> SopsKey:
|
||||||
key = os.environ.get("SOPS_AGE_KEY")
|
key = os.environ.get("SOPS_AGE_KEY")
|
||||||
if key:
|
if key:
|
||||||
return ensure_user(get_public_key(key))
|
return ensure_user_or_machine(get_public_key(key))
|
||||||
raw_path = os.environ.get("SOPS_AGE_KEY_FILE")
|
path = default_sops_key_path()
|
||||||
if raw_path:
|
|
||||||
path = Path(raw_path)
|
|
||||||
else:
|
|
||||||
path = user_config_dir() / "sops" / "age" / "keys.txt"
|
|
||||||
if path.exists():
|
if path.exists():
|
||||||
return ensure_user(get_public_key(path.read_text()))
|
return ensure_user_or_machine(get_public_key(path.read_text()))
|
||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
else:
|
||||||
cmd = nix_shell(["age"], ["age-keygen", "-o", str(path)])
|
raise ClanError(
|
||||||
subprocess.run(cmd, check=True)
|
"No sops key found. Please generate one with 'clan secrets key generate'."
|
||||||
|
)
|
||||||
tty.info(
|
|
||||||
f"Generated age key at '{path}' for your user. Please back it up on a secure location or you will lose access to your secrets."
|
|
||||||
)
|
|
||||||
return ensure_user(get_public_key(path.read_text()))
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
|||||||
@@ -126,6 +126,12 @@ def test_secrets(
|
|||||||
|
|
||||||
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
|
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
|
||||||
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(clan_flake / ".." / "age.key"))
|
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(clan_flake / ".." / "age.key"))
|
||||||
|
cli.run(["secrets", "key", "generate"])
|
||||||
|
capsys.readouterr() # empty the buffer
|
||||||
|
cli.run(["secrets", "key", "show"])
|
||||||
|
key = capsys.readouterr().out
|
||||||
|
assert key.startswith("age1")
|
||||||
|
cli.run(["secrets", "users", "add", "testuser", key])
|
||||||
|
|
||||||
with pytest.raises(ClanError): # does not exist yet
|
with pytest.raises(ClanError): # does not exist yet
|
||||||
cli.run(["secrets", "get", "nonexisting"])
|
cli.run(["secrets", "get", "nonexisting"])
|
||||||
@@ -151,6 +157,7 @@ def test_secrets(
|
|||||||
with use_key(age_keys[0].privkey, monkeypatch):
|
with use_key(age_keys[0].privkey, monkeypatch):
|
||||||
capsys.readouterr()
|
capsys.readouterr()
|
||||||
cli.run(["secrets", "get", "key"])
|
cli.run(["secrets", "get", "key"])
|
||||||
|
|
||||||
assert capsys.readouterr().out == "foo"
|
assert capsys.readouterr().out == "foo"
|
||||||
|
|
||||||
cli.run(["secrets", "machines", "remove-secret", "machine1", "key"])
|
cli.run(["secrets", "machines", "remove-secret", "machine1", "key"])
|
||||||
|
|||||||
Reference in New Issue
Block a user