clan-cli secrets: flake_name -> flake_dir
This commit is contained in:
@@ -8,7 +8,6 @@ from typing import IO
|
||||
|
||||
from .. import tty
|
||||
from ..errors import ClanError
|
||||
from ..types import FlakeName
|
||||
from .folders import (
|
||||
list_objects,
|
||||
sops_groups_folder,
|
||||
@@ -54,36 +53,36 @@ def collect_keys_for_path(path: Path) -> set[str]:
|
||||
|
||||
|
||||
def encrypt_secret(
|
||||
flake_name: FlakeName,
|
||||
flake_dir: Path,
|
||||
secret: Path,
|
||||
value: IO[str] | str | None,
|
||||
add_users: list[str] = [],
|
||||
add_machines: list[str] = [],
|
||||
add_groups: list[str] = [],
|
||||
) -> None:
|
||||
key = ensure_sops_key(flake_name)
|
||||
key = ensure_sops_key(flake_dir)
|
||||
keys = set([])
|
||||
|
||||
for user in add_users:
|
||||
allow_member(
|
||||
users_folder(flake_name, secret.name),
|
||||
sops_users_folder(flake_name),
|
||||
users_folder(flake_dir, secret.name),
|
||||
sops_users_folder(flake_dir),
|
||||
user,
|
||||
False,
|
||||
)
|
||||
|
||||
for machine in add_machines:
|
||||
allow_member(
|
||||
machines_folder(flake_name, secret.name),
|
||||
sops_machines_folder(flake_name),
|
||||
machines_folder(flake_dir, secret.name),
|
||||
sops_machines_folder(flake_dir),
|
||||
machine,
|
||||
False,
|
||||
)
|
||||
|
||||
for group in add_groups:
|
||||
allow_member(
|
||||
groups_folder(flake_name, secret.name),
|
||||
sops_groups_folder(flake_name),
|
||||
groups_folder(flake_dir, secret.name),
|
||||
sops_groups_folder(flake_dir),
|
||||
group,
|
||||
False,
|
||||
)
|
||||
@@ -93,8 +92,8 @@ def encrypt_secret(
|
||||
if key.pubkey not in keys:
|
||||
keys.add(key.pubkey)
|
||||
allow_member(
|
||||
users_folder(flake_name, secret.name),
|
||||
sops_users_folder(flake_name),
|
||||
users_folder(flake_dir, secret.name),
|
||||
sops_users_folder(flake_dir),
|
||||
key.username,
|
||||
False,
|
||||
)
|
||||
@@ -102,31 +101,31 @@ def encrypt_secret(
|
||||
encrypt_file(secret / "secret", value, list(sorted(keys)))
|
||||
|
||||
|
||||
def remove_secret(flake_name: FlakeName, secret: str) -> None:
|
||||
path = sops_secrets_folder(flake_name) / secret
|
||||
def remove_secret(flake_dir: Path, secret: str) -> None:
|
||||
path = sops_secrets_folder(flake_dir) / secret
|
||||
if not path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
shutil.rmtree(path)
|
||||
|
||||
|
||||
def remove_command(args: argparse.Namespace) -> None:
|
||||
remove_secret(args.flake, args.secret)
|
||||
remove_secret(Path(args.flake), args.secret)
|
||||
|
||||
|
||||
def add_secret_argument(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("secret", help="the name of the secret", type=secret_name_type)
|
||||
|
||||
|
||||
def machines_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "machines"
|
||||
def machines_folder(flake_dir: Path, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_dir) / group / "machines"
|
||||
|
||||
|
||||
def users_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "users"
|
||||
def users_folder(flake_dir: Path, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_dir) / group / "users"
|
||||
|
||||
|
||||
def groups_folder(flake_name: FlakeName, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_name) / group / "groups"
|
||||
def groups_folder(flake_dir: Path, group: str) -> Path:
|
||||
return sops_secrets_folder(flake_dir) / group / "groups"
|
||||
|
||||
|
||||
def list_directory(directory: Path) -> str:
|
||||
@@ -189,37 +188,35 @@ def disallow_member(group_folder: Path, name: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def has_secret(flake_name: FlakeName, secret: str) -> bool:
|
||||
return (sops_secrets_folder(flake_name) / secret / "secret").exists()
|
||||
def has_secret(flake_dir: Path, secret: str) -> bool:
|
||||
return (sops_secrets_folder(flake_dir) / secret / "secret").exists()
|
||||
|
||||
|
||||
def list_secrets(flake_name: FlakeName) -> list[str]:
|
||||
path = sops_secrets_folder(flake_name)
|
||||
def list_secrets(flake_dir: Path) -> list[str]:
|
||||
path = sops_secrets_folder(flake_dir)
|
||||
|
||||
def validate(name: str) -> bool:
|
||||
return VALID_SECRET_NAME.match(name) is not None and has_secret(
|
||||
flake_name, name
|
||||
)
|
||||
return VALID_SECRET_NAME.match(name) is not None and has_secret(flake_dir, name)
|
||||
|
||||
return list_objects(path, validate)
|
||||
|
||||
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
lst = list_secrets(args.flake)
|
||||
lst = list_secrets(Path(args.flake))
|
||||
if len(lst) > 0:
|
||||
print("\n".join(lst))
|
||||
|
||||
|
||||
def decrypt_secret(flake_name: FlakeName, secret: str) -> str:
|
||||
ensure_sops_key(flake_name)
|
||||
secret_path = sops_secrets_folder(flake_name) / secret / "secret"
|
||||
def decrypt_secret(flake_dir: Path, secret: str) -> str:
|
||||
ensure_sops_key(flake_dir)
|
||||
secret_path = sops_secrets_folder(flake_dir) / secret / "secret"
|
||||
if not secret_path.exists():
|
||||
raise ClanError(f"Secret '{secret}' does not exist")
|
||||
return decrypt_file(secret_path)
|
||||
|
||||
|
||||
def get_command(args: argparse.Namespace) -> None:
|
||||
print(decrypt_secret(args.flake, args.secret), end="")
|
||||
print(decrypt_secret(Path(args.flake), args.secret), end="")
|
||||
|
||||
|
||||
def set_command(args: argparse.Namespace) -> None:
|
||||
@@ -232,8 +229,8 @@ def set_command(args: argparse.Namespace) -> None:
|
||||
elif tty.is_interactive():
|
||||
secret_value = getpass.getpass(prompt="Paste your secret: ")
|
||||
encrypt_secret(
|
||||
args.flake,
|
||||
sops_secrets_folder(args.flake) / args.secret,
|
||||
Path(args.flake),
|
||||
sops_secrets_folder(Path(args.flake)) / args.secret,
|
||||
secret_value,
|
||||
args.user,
|
||||
args.machine,
|
||||
@@ -242,8 +239,8 @@ def set_command(args: argparse.Namespace) -> None:
|
||||
|
||||
|
||||
def rename_command(args: argparse.Namespace) -> None:
|
||||
old_path = sops_secrets_folder(args.flake) / args.secret
|
||||
new_path = sops_secrets_folder(args.flake) / args.new_name
|
||||
old_path = sops_secrets_folder(Path(args.flake)) / args.secret
|
||||
new_path = sops_secrets_folder(Path(args.flake)) / args.new_name
|
||||
if not old_path.exists():
|
||||
raise ClanError(f"Secret '{args.secret}' does not exist")
|
||||
if new_path.exists():
|
||||
@@ -253,20 +250,10 @@ def rename_command(args: argparse.Namespace) -> None:
|
||||
|
||||
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
parser_list = subparser.add_parser("list", help="list secrets")
|
||||
parser_list.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_list.set_defaults(func=list_command)
|
||||
|
||||
parser_get = subparser.add_parser("get", help="get a secret")
|
||||
add_secret_argument(parser_get)
|
||||
parser_get.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_get.set_defaults(func=get_command)
|
||||
|
||||
parser_set = subparser.add_parser("set", help="set a secret")
|
||||
@@ -299,28 +286,13 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
default=False,
|
||||
help="edit the secret with $EDITOR instead of pasting it",
|
||||
)
|
||||
parser_set.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_set.set_defaults(func=set_command)
|
||||
|
||||
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
||||
add_secret_argument(parser_rename)
|
||||
parser_rename.add_argument("new_name", type=str, help="the new name of the secret")
|
||||
parser_rename.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_rename.set_defaults(func=rename_command)
|
||||
|
||||
parser_remove = subparser.add_parser("remove", help="remove a secret")
|
||||
add_secret_argument(parser_remove)
|
||||
parser_remove.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_remove.set_defaults(func=remove_command)
|
||||
|
||||
Reference in New Issue
Block a user