Secrets tests passing. nix fmt doesn't complain
This commit is contained in:
@@ -6,6 +6,11 @@ from typing import Optional
|
||||
from . import config, flakes, join, machines, secrets, vms, webui
|
||||
from .ssh import cli as ssh_cli
|
||||
|
||||
import logging
|
||||
from .custom_logger import register
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
argcomplete: Optional[ModuleType] = None
|
||||
try:
|
||||
import argcomplete # type: ignore[no-redef]
|
||||
@@ -52,6 +57,10 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser:
|
||||
parser_vms = subparsers.add_parser("vms", help="manage virtual machines")
|
||||
vms.register_parser(parser_vms)
|
||||
|
||||
# if args.debug:
|
||||
register(logging.DEBUG)
|
||||
log.debug("Debug log activated")
|
||||
|
||||
if argcomplete:
|
||||
argcomplete.autocomplete(parser)
|
||||
|
||||
@@ -65,6 +74,8 @@ def main() -> None:
|
||||
parser = create_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
|
||||
if not hasattr(args, "func"):
|
||||
return
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
from pathlib import Path
|
||||
|
||||
grey = "\x1b[38;20m"
|
||||
yellow = "\x1b[33;20m"
|
||||
@@ -9,11 +10,14 @@ green = "\u001b[32m"
|
||||
blue = "\u001b[34m"
|
||||
|
||||
|
||||
def get_formatter(color: str) -> logging.Formatter:
|
||||
reset = "\x1b[0m"
|
||||
return logging.Formatter(
|
||||
f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s"
|
||||
)
|
||||
def get_formatter(color: str) -> Callable[[logging.LogRecord], logging.Formatter]:
|
||||
def myformatter(record: logging.LogRecord) -> logging.Formatter:
|
||||
reset = "\x1b[0m"
|
||||
filepath = Path(record.pathname).resolve()
|
||||
return logging.Formatter(
|
||||
f"{filepath}:%(lineno)d::%(funcName)s\n{color}%(levelname)s{reset}: %(message)s"
|
||||
)
|
||||
return myformatter
|
||||
|
||||
|
||||
FORMATTER = {
|
||||
@@ -26,8 +30,8 @@ FORMATTER = {
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
def format(self, record: Any) -> str:
|
||||
return FORMATTER[record.levelno].format(record)
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
return FORMATTER[record.levelno](record).format(record)
|
||||
|
||||
|
||||
def register(level: Any) -> None:
|
||||
|
||||
@@ -2,10 +2,12 @@ import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
import logging
|
||||
|
||||
from .errors import ClanError
|
||||
from .types import FlakeName
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def _get_clan_flake_toplevel() -> Path:
|
||||
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
|
||||
@@ -51,28 +53,31 @@ def user_data_dir() -> Path:
|
||||
def clan_data_dir() -> Path:
|
||||
path = user_data_dir() / "clan"
|
||||
if not path.exists():
|
||||
path.mkdir()
|
||||
log.debug(f"Creating path with parents {path}")
|
||||
path.mkdir(parents=True)
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def clan_config_dir() -> Path:
|
||||
path = user_config_dir() / "clan"
|
||||
if not path.exists():
|
||||
path.mkdir()
|
||||
log.debug(f"Creating path with parents {path}")
|
||||
path.mkdir(parents=True)
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def clan_flakes_dir() -> Path:
|
||||
path = clan_data_dir() / "flake"
|
||||
if not path.exists():
|
||||
path.mkdir()
|
||||
log.debug(f"Creating path with parents {path}")
|
||||
path.mkdir(parents=True)
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def specific_flake_dir(flake_name: FlakeName) -> Path:
|
||||
flake_dir = clan_flakes_dir() / flake_name
|
||||
if not flake_dir.exists():
|
||||
raise ClanError(f"Flake {flake_name} does not exist")
|
||||
raise ClanError(f"Flake '{flake_name}' does not exist")
|
||||
return flake_dir
|
||||
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ async def create_flake(directory: Path, url: AnyUrl) -> Dict[str, CmdOut]:
|
||||
|
||||
def create_flake_command(args: argparse.Namespace) -> None:
|
||||
flake_dir = clan_flakes_dir() / args.name
|
||||
runforcli(create_flake, flake_dir, DEFAULT_URL)
|
||||
runforcli(create_flake, flake_dir, args.url)
|
||||
|
||||
|
||||
# takes a (sub)parser and configures it
|
||||
@@ -65,5 +65,11 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
|
||||
type=str,
|
||||
help="name for the flake",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--url",
|
||||
type=AnyUrl,
|
||||
help="url for the flake",
|
||||
default=DEFAULT_URL,
|
||||
)
|
||||
# parser.add_argument("name", type=str, help="name of the flake")
|
||||
parser.set_defaults(func=create_flake_command)
|
||||
|
||||
@@ -204,9 +204,17 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
help="the command to run",
|
||||
required=True,
|
||||
)
|
||||
|
||||
# List groups
|
||||
list_parser = subparser.add_parser("list", help="list groups")
|
||||
list_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
list_parser.set_defaults(func=list_command)
|
||||
|
||||
# Add user
|
||||
add_machine_parser = subparser.add_parser(
|
||||
"add-machine", help="add a machine to group"
|
||||
)
|
||||
@@ -214,8 +222,14 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
add_machine_parser.add_argument(
|
||||
"machine", help="the name of the machines to add", type=machine_name_type
|
||||
)
|
||||
add_machine_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_machine_parser.set_defaults(func=add_machine_command)
|
||||
|
||||
# Remove machine
|
||||
remove_machine_parser = subparser.add_parser(
|
||||
"remove-machine", help="remove a machine from group"
|
||||
)
|
||||
@@ -223,15 +237,27 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
remove_machine_parser.add_argument(
|
||||
"machine", help="the name of the machines to remove", type=machine_name_type
|
||||
)
|
||||
remove_machine_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_machine_parser.set_defaults(func=remove_machine_command)
|
||||
|
||||
# Add user
|
||||
add_user_parser = subparser.add_parser("add-user", help="add a user to group")
|
||||
add_group_argument(add_user_parser)
|
||||
add_user_parser.add_argument(
|
||||
"user", help="the name of the user to add", type=user_name_type
|
||||
)
|
||||
add_user_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_user_parser.set_defaults(func=add_user_command)
|
||||
|
||||
# Remove user
|
||||
remove_user_parser = subparser.add_parser(
|
||||
"remove-user", help="remove a user from group"
|
||||
)
|
||||
@@ -239,8 +265,14 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
remove_user_parser.add_argument(
|
||||
"user", help="the name of the user to remove", type=user_name_type
|
||||
)
|
||||
remove_user_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_user_parser.set_defaults(func=remove_user_command)
|
||||
|
||||
# Add secret
|
||||
add_secret_parser = subparser.add_parser(
|
||||
"add-secret", help="allow a user to access a secret"
|
||||
)
|
||||
@@ -250,8 +282,14 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
add_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
# Remove secret
|
||||
remove_secret_parser = subparser.add_parser(
|
||||
"remove-secret", help="remove a group's access to a secret"
|
||||
)
|
||||
@@ -261,4 +299,9 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
|
||||
remove_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||
|
||||
@@ -96,11 +96,6 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
add_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
@@ -109,6 +104,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
help="public key or private key of the user",
|
||||
type=public_or_private_age_key_type,
|
||||
)
|
||||
add_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_parser.set_defaults(func=add_command)
|
||||
|
||||
# Parser
|
||||
@@ -125,46 +125,46 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
||||
# Parser
|
||||
remove_parser = subparser.add_parser("remove", help="remove a machine")
|
||||
remove_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
remove_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
remove_parser.set_defaults(func=remove_command)
|
||||
|
||||
# Parser
|
||||
add_secret_parser = subparser.add_parser(
|
||||
"add-secret", help="allow a machine to access a secret"
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"machine", help="the name of the machine", type=machine_name_type
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
# Parser
|
||||
remove_secret_parser = subparser.add_parser(
|
||||
"remove-secret", help="remove a group's access to a secret"
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"machine", help="the name of the group", type=machine_name_type
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||
|
||||
@@ -253,24 +253,25 @@ def rename_command(args: argparse.Namespace) -> None:
|
||||
|
||||
def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
parser_list = subparser.add_parser("list", help="list secrets")
|
||||
parser_list.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_list.set_defaults(func=list_command)
|
||||
|
||||
parser_get = subparser.add_parser("get", help="get a secret")
|
||||
add_secret_argument(parser_get)
|
||||
parser_get.set_defaults(func=get_command)
|
||||
parser_get.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_get.set_defaults(func=get_command)
|
||||
|
||||
|
||||
parser_set = subparser.add_parser("set", help="set a secret")
|
||||
add_secret_argument(parser_set)
|
||||
parser_set.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_set.add_argument(
|
||||
"--group",
|
||||
type=str,
|
||||
@@ -299,13 +300,29 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
|
||||
default=False,
|
||||
help="edit the secret with $EDITOR instead of pasting it",
|
||||
)
|
||||
parser_set.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_set.set_defaults(func=set_command)
|
||||
|
||||
parser_rename = subparser.add_parser("rename", help="rename a secret")
|
||||
add_secret_argument(parser_rename)
|
||||
parser_rename.add_argument("new_name", type=str, help="the new name of the secret")
|
||||
parser_rename.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_rename.set_defaults(func=rename_command)
|
||||
|
||||
|
||||
parser_remove = subparser.add_parser("remove", help="remove a secret")
|
||||
add_secret_argument(parser_remove)
|
||||
parser_remove.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
parser_remove.set_defaults(func=remove_command)
|
||||
|
||||
@@ -131,6 +131,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
add_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
add_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
add_secret_parser.set_defaults(func=add_secret_command)
|
||||
|
||||
remove_secret_parser = subparser.add_parser(
|
||||
@@ -142,4 +147,9 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
|
||||
remove_secret_parser.add_argument(
|
||||
"secret", help="the name of the secret", type=secret_name_type
|
||||
)
|
||||
remove_secret_parser.add_argument(
|
||||
"flake",
|
||||
type=str,
|
||||
help="name of the flake to create machine for",
|
||||
)
|
||||
remove_secret_parser.set_defaults(func=remove_secret_command)
|
||||
|
||||
@@ -20,7 +20,8 @@ clan_cli = [ "config/jsonschema/*", "webui/assets/**/*"]
|
||||
testpaths = "tests"
|
||||
faulthandler_timeout = 60
|
||||
log_level = "DEBUG"
|
||||
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail -n auto --durations 5 --maxfail=1 --new-first"
|
||||
log_format = "%(pathname)s:%(lineno)d::%(funcName)s\n %(levelname)s: %(message)s\n"
|
||||
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --maxfail=1 --new-first -nauto" # Add --pdb for debugging
|
||||
norecursedirs = "tests/helpers"
|
||||
markers = [ "impure" ]
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import fileinput
|
||||
import logging
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
@@ -10,6 +11,8 @@ from root import CLAN_CORE
|
||||
from clan_cli.dirs import nixpkgs_source
|
||||
from clan_cli.types import FlakeName
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# substitutes string sin a file.
|
||||
# This can be used on the flake.nix or default.nix of a machine
|
||||
@@ -35,6 +38,7 @@ class FlakeForTest(NamedTuple):
|
||||
|
||||
def create_flake(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_dir: Path,
|
||||
flake_name: FlakeName,
|
||||
clan_core_flake: Path | None = None,
|
||||
machines: list[str] = [],
|
||||
@@ -45,56 +49,67 @@ def create_flake(
|
||||
The machine names map to the machines in ./test_machines
|
||||
"""
|
||||
template = Path(__file__).parent / flake_name
|
||||
|
||||
# copy the template to a new temporary location
|
||||
with tempfile.TemporaryDirectory() as tmpdir_:
|
||||
home = Path(tmpdir_)
|
||||
flake = home / flake_name
|
||||
shutil.copytree(template, flake)
|
||||
# lookup the requested machines in ./test_machines and include them
|
||||
if machines:
|
||||
(flake / "machines").mkdir(parents=True, exist_ok=True)
|
||||
for machine_name in machines:
|
||||
machine_path = Path(__file__).parent / "machines" / machine_name
|
||||
shutil.copytree(machine_path, flake / "machines" / machine_name)
|
||||
substitute(flake / "machines" / machine_name / "default.nix", flake)
|
||||
# in the flake.nix file replace the string __CLAN_URL__ with the the clan flake
|
||||
# provided by get_test_flake_toplevel
|
||||
flake_nix = flake / "flake.nix"
|
||||
# this is where we would install the sops key to, when updating
|
||||
substitute(flake_nix, clan_core_flake, flake)
|
||||
if remote:
|
||||
with tempfile.TemporaryDirectory() as workdir:
|
||||
monkeypatch.chdir(workdir)
|
||||
monkeypatch.setenv("HOME", str(home))
|
||||
yield FlakeForTest(flake_name, flake)
|
||||
else:
|
||||
monkeypatch.chdir(flake)
|
||||
home = Path(temporary_dir)
|
||||
flake = home / ".local/state/clan/flake" / flake_name
|
||||
shutil.copytree(template, flake)
|
||||
|
||||
# lookup the requested machines in ./test_machines and include them
|
||||
if machines:
|
||||
(flake / "machines").mkdir(parents=True, exist_ok=True)
|
||||
for machine_name in machines:
|
||||
machine_path = Path(__file__).parent / "machines" / machine_name
|
||||
shutil.copytree(machine_path, flake / "machines" / machine_name)
|
||||
substitute(flake / "machines" / machine_name / "default.nix", flake)
|
||||
# in the flake.nix file replace the string __CLAN_URL__ with the the clan flake
|
||||
# provided by get_test_flake_toplevel
|
||||
flake_nix = flake / "flake.nix"
|
||||
# this is where we would install the sops key to, when updating
|
||||
substitute(flake_nix, clan_core_flake, flake)
|
||||
if remote:
|
||||
with tempfile.TemporaryDirectory() as workdir:
|
||||
monkeypatch.chdir(workdir)
|
||||
monkeypatch.setenv("HOME", str(home))
|
||||
yield FlakeForTest(flake_name, flake)
|
||||
else:
|
||||
monkeypatch.chdir(flake)
|
||||
monkeypatch.setenv("HOME", str(home))
|
||||
yield FlakeForTest(flake_name, flake)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flake(monkeypatch: pytest.MonkeyPatch) -> Iterator[FlakeForTest]:
|
||||
yield from create_flake(monkeypatch, FlakeName("test_flake"))
|
||||
def test_flake(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
yield from create_flake(monkeypatch, temporary_dir, FlakeName("test_flake"))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flake_with_core(monkeypatch: pytest.MonkeyPatch) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
raise Exception(
|
||||
"clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
)
|
||||
yield from create_flake(monkeypatch, FlakeName("test_flake_with_core"), CLAN_CORE)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flake_with_core_and_pass(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
def test_flake_with_core(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
raise Exception(
|
||||
"clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
)
|
||||
yield from create_flake(
|
||||
monkeypatch, FlakeName("test_flake_with_core_and_pass"), CLAN_CORE
|
||||
monkeypatch, temporary_dir, FlakeName("test_flake_with_core"), CLAN_CORE
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_flake_with_core_and_pass(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
temporary_dir: Path,
|
||||
) -> Iterator[FlakeForTest]:
|
||||
if not (CLAN_CORE / "flake.nix").exists():
|
||||
raise Exception(
|
||||
"clan-core flake not found. This test requires the clan-core flake to be present"
|
||||
)
|
||||
yield from create_flake(
|
||||
monkeypatch,
|
||||
temporary_dir,
|
||||
FlakeName("test_flake_with_core_and_pass"),
|
||||
CLAN_CORE,
|
||||
)
|
||||
|
||||
@@ -1,16 +1,23 @@
|
||||
import argparse
|
||||
import inspect
|
||||
import logging
|
||||
import shlex
|
||||
|
||||
from clan_cli import create_parser
|
||||
import logging
|
||||
import sys
|
||||
import shlex
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
import inspect
|
||||
|
||||
def get_caller() -> str:
|
||||
frame = inspect.currentframe()
|
||||
caller_frame = frame.f_back.f_back
|
||||
if frame is None:
|
||||
return "unknown"
|
||||
caller_frame = frame.f_back
|
||||
if caller_frame is None:
|
||||
return "unknown"
|
||||
caller_frame = caller_frame.f_back
|
||||
if caller_frame is None:
|
||||
return "unknown"
|
||||
frame_info = inspect.getframeinfo(caller_frame)
|
||||
ret = f"{frame_info.filename}:{frame_info.lineno}::{frame_info.function}"
|
||||
return ret
|
||||
@@ -22,7 +29,7 @@ class Cli:
|
||||
|
||||
def run(self, args: list[str]) -> argparse.Namespace:
|
||||
cmd = shlex.join(["clan"] + args)
|
||||
log.debug(f"Command: {cmd}")
|
||||
log.debug(f"$ {cmd}")
|
||||
log.debug(f"Caller {get_caller()}")
|
||||
parsed = self.parser.parse_args(args)
|
||||
if hasattr(parsed, "func"):
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
@@ -5,14 +6,17 @@ from typing import Iterator
|
||||
|
||||
import pytest
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temporary_dir() -> Iterator[Path]:
|
||||
if os.getenv("TEST_KEEP_TEMPORARY_DIR"):
|
||||
if os.getenv("TEST_KEEP_TEMPORARY_DIR") is not None:
|
||||
temp_dir = tempfile.mkdtemp(prefix="pytest-")
|
||||
path = Path(temp_dir)
|
||||
log.info("Keeping temporary test directory: ", path)
|
||||
yield path
|
||||
print("=========> Keeping temporary directory: ", path)
|
||||
else:
|
||||
log.debug("TEST_KEEP_TEMPORARY_DIR not set, using TemporaryDirectory")
|
||||
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
|
||||
yield Path(dirpath)
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Iterator
|
||||
|
||||
import logging
|
||||
import pytest
|
||||
from cli import Cli
|
||||
from fixtures_flakes import FlakeForTest
|
||||
@@ -28,7 +27,7 @@ def _test_identities(
|
||||
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
|
||||
assert (sops_folder / what / "foo" / "key.json").exists()
|
||||
with pytest.raises(ClanError):
|
||||
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey])
|
||||
cli.run(["secrets", what, "add", "foo", age_keys[0].pubkey, test_flake.name])
|
||||
|
||||
cli.run(
|
||||
[
|
||||
@@ -43,23 +42,23 @@ def _test_identities(
|
||||
)
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", what, "get", "foo"])
|
||||
cli.run(["secrets", what, "get", "foo", test_flake.name])
|
||||
out = capsys.readouterr() # empty the buffer
|
||||
assert age_keys[0].pubkey in out.out
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", what, "list"])
|
||||
cli.run(["secrets", what, "list", test_flake.name])
|
||||
out = capsys.readouterr() # empty the buffer
|
||||
assert "foo" in out.out
|
||||
|
||||
cli.run(["secrets", what, "remove", "foo"])
|
||||
cli.run(["secrets", what, "remove", "foo", test_flake.name])
|
||||
assert not (sops_folder / what / "foo" / "key.json").exists()
|
||||
|
||||
with pytest.raises(ClanError): # already removed
|
||||
cli.run(["secrets", what, "remove", "foo"])
|
||||
cli.run(["secrets", what, "remove", "foo", test_flake.name])
|
||||
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", what, "list"])
|
||||
cli.run(["secrets", what, "list", test_flake.name])
|
||||
out = capsys.readouterr()
|
||||
assert "foo" not in out.out
|
||||
|
||||
@@ -81,30 +80,36 @@ def test_groups(
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "groups", "list"])
|
||||
cli.run(["secrets", "groups", "list", test_flake.name])
|
||||
assert capsys.readouterr().out == ""
|
||||
|
||||
with pytest.raises(ClanError): # machine does not exist yet
|
||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
|
||||
cli.run(
|
||||
["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name]
|
||||
)
|
||||
with pytest.raises(ClanError): # user does not exist yet
|
||||
cli.run(["secrets", "groups", "add-user", "groupb1", "user1"])
|
||||
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
|
||||
cli.run(["secrets", "groups", "add-user", "groupb1", "user1", test_flake.name])
|
||||
cli.run(
|
||||
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
|
||||
)
|
||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
|
||||
|
||||
# Should this fail?
|
||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1"])
|
||||
cli.run(["secrets", "groups", "add-machine", "group1", "machine1", test_flake.name])
|
||||
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user1"])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "groups", "list"])
|
||||
cli.run(["secrets", "groups", "list", test_flake.name])
|
||||
out = capsys.readouterr().out
|
||||
assert "user1" in out
|
||||
assert "machine1" in out
|
||||
|
||||
cli.run(["secrets", "groups", "remove-user", "group1", "user1"])
|
||||
cli.run(["secrets", "groups", "remove-machine", "group1", "machine1"])
|
||||
cli.run(["secrets", "groups", "remove-user", "group1", "user1", test_flake.name])
|
||||
cli.run(
|
||||
["secrets", "groups", "remove-machine", "group1", "machine1", test_flake.name]
|
||||
)
|
||||
groups = os.listdir(test_flake.path / "sops" / "groups")
|
||||
assert len(groups) == 0
|
||||
|
||||
@@ -122,104 +127,114 @@ def use_key(key: str, monkeypatch: pytest.MonkeyPatch) -> Iterator[None]:
|
||||
|
||||
|
||||
def test_secrets(
|
||||
test_flake: Path,
|
||||
test_flake: FlakeForTest,
|
||||
capsys: pytest.CaptureFixture,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
age_keys: list["KeyPair"],
|
||||
) -> None:
|
||||
cli = Cli()
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "list"])
|
||||
cli.run(["secrets", "list", test_flake.name])
|
||||
assert capsys.readouterr().out == ""
|
||||
|
||||
monkeypatch.setenv("SOPS_NIX_SECRET", "foo")
|
||||
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake / ".." / "age.key"))
|
||||
monkeypatch.setenv("SOPS_AGE_KEY_FILE", str(test_flake.path / ".." / "age.key"))
|
||||
cli.run(["secrets", "key", "generate"])
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "key", "show"])
|
||||
key = capsys.readouterr().out
|
||||
assert key.startswith("age1")
|
||||
cli.run(["secrets", "users", "add", "testuser", key])
|
||||
cli.run(["secrets", "users", "add", "testuser", key, test_flake.name])
|
||||
|
||||
with pytest.raises(ClanError): # does not exist yet
|
||||
cli.run(["secrets", "get", "nonexisting"])
|
||||
cli.run(["secrets", "set", "initialkey"])
|
||||
cli.run(["secrets", "get", "nonexisting", test_flake.name])
|
||||
cli.run(["secrets", "set", "initialkey", test_flake.name])
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "initialkey"])
|
||||
cli.run(["secrets", "get", "initialkey", test_flake.name])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "users", "list"])
|
||||
cli.run(["secrets", "users", "list", test_flake.name])
|
||||
users = capsys.readouterr().out.rstrip().split("\n")
|
||||
assert len(users) == 1, f"users: {users}"
|
||||
owner = users[0]
|
||||
|
||||
monkeypatch.setenv("EDITOR", "cat")
|
||||
cli.run(["secrets", "set", "--edit", "initialkey"])
|
||||
cli.run(["secrets", "set", "--edit", "initialkey", test_flake.name])
|
||||
monkeypatch.delenv("EDITOR")
|
||||
|
||||
cli.run(["secrets", "rename", "initialkey", "key"])
|
||||
cli.run(["secrets", "rename", "initialkey", "key", test_flake.name])
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "list"])
|
||||
cli.run(["secrets", "list", test_flake.name])
|
||||
assert capsys.readouterr().out == "key\n"
|
||||
|
||||
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey])
|
||||
cli.run(["secrets", "machines", "add-secret", "machine1", "key"])
|
||||
cli.run(
|
||||
["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name]
|
||||
)
|
||||
cli.run(["secrets", "machines", "add-secret", "machine1", "key", test_flake.name])
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "machines", "list"])
|
||||
cli.run(["secrets", "machines", "list", test_flake.name])
|
||||
assert capsys.readouterr().out == "machine1\n"
|
||||
|
||||
with use_key(age_keys[0].privkey, monkeypatch):
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
cli.run(["secrets", "get", "key", test_flake.name])
|
||||
|
||||
assert capsys.readouterr().out == "foo"
|
||||
|
||||
cli.run(["secrets", "machines", "remove-secret", "machine1", "key"])
|
||||
cli.run(
|
||||
["secrets", "machines", "remove-secret", "machine1", "key", test_flake.name]
|
||||
)
|
||||
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey])
|
||||
cli.run(["secrets", "users", "add-secret", "user1", "key"])
|
||||
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "users", "add-secret", "user1", "key", test_flake.name])
|
||||
capsys.readouterr()
|
||||
with use_key(age_keys[1].privkey, monkeypatch):
|
||||
cli.run(["secrets", "get", "key"])
|
||||
cli.run(["secrets", "get", "key", test_flake.name])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
cli.run(["secrets", "users", "remove-secret", "user1", "key"])
|
||||
cli.run(["secrets", "users", "remove-secret", "user1", "key", test_flake.name])
|
||||
|
||||
with pytest.raises(ClanError): # does not exist yet
|
||||
cli.run(["secrets", "groups", "add-secret", "admin-group", "key"])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user1"])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", owner])
|
||||
cli.run(["secrets", "groups", "add-secret", "admin-group", "key"])
|
||||
cli.run(
|
||||
["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name]
|
||||
)
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user1", test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", owner, test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-secret", "admin-group", "key", test_flake.name])
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "set", "--group", "admin-group", "key2"])
|
||||
cli.run(["secrets", "set", "--group", "admin-group", "key2", test_flake.name])
|
||||
|
||||
with use_key(age_keys[1].privkey, monkeypatch):
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
cli.run(["secrets", "get", "key", test_flake.name])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
|
||||
# extend group will update secrets
|
||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user2"])
|
||||
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
|
||||
cli.run(["secrets", "groups", "add-user", "admin-group", "user2", test_flake.name])
|
||||
|
||||
with use_key(age_keys[2].privkey, monkeypatch): # user2
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
cli.run(["secrets", "get", "key", test_flake.name])
|
||||
assert capsys.readouterr().out == "foo"
|
||||
|
||||
cli.run(["secrets", "groups", "remove-user", "admin-group", "user2"])
|
||||
cli.run(
|
||||
["secrets", "groups", "remove-user", "admin-group", "user2", test_flake.name]
|
||||
)
|
||||
with pytest.raises(ClanError), use_key(age_keys[2].privkey, monkeypatch):
|
||||
# user2 is not in the group anymore
|
||||
capsys.readouterr()
|
||||
cli.run(["secrets", "get", "key"])
|
||||
cli.run(["secrets", "get", "key", test_flake.name])
|
||||
print(capsys.readouterr().out)
|
||||
|
||||
cli.run(["secrets", "groups", "remove-secret", "admin-group", "key"])
|
||||
cli.run(
|
||||
["secrets", "groups", "remove-secret", "admin-group", "key", test_flake.name]
|
||||
)
|
||||
|
||||
cli.run(["secrets", "remove", "key"])
|
||||
cli.run(["secrets", "remove", "key2"])
|
||||
cli.run(["secrets", "remove", "key", test_flake.name])
|
||||
cli.run(["secrets", "remove", "key2", test_flake.name])
|
||||
|
||||
capsys.readouterr() # empty the buffer
|
||||
cli.run(["secrets", "list"])
|
||||
cli.run(["secrets", "list", test_flake.name])
|
||||
assert capsys.readouterr().out == ""
|
||||
|
||||
@@ -16,9 +16,12 @@ if TYPE_CHECKING:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[FlakeForTest]:
|
||||
def flake_with_vm_with_secrets(
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
yield from create_flake(
|
||||
monkeypatch,
|
||||
temporary_dir,
|
||||
FlakeName("test_flake_with_core_dynamic_machines"),
|
||||
CLAN_CORE,
|
||||
machines=["vm_with_secrets"],
|
||||
@@ -27,10 +30,11 @@ def flake_with_vm_with_secrets(monkeypatch: pytest.MonkeyPatch) -> Iterator[Flak
|
||||
|
||||
@pytest.fixture
|
||||
def remote_flake_with_vm_without_secrets(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path
|
||||
) -> Iterator[FlakeForTest]:
|
||||
yield from create_flake(
|
||||
monkeypatch,
|
||||
temporary_dir,
|
||||
FlakeName("test_flake_with_core_dynamic_machines"),
|
||||
CLAN_CORE,
|
||||
machines=["vm_without_secrets"],
|
||||
|
||||
Reference in New Issue
Block a user