Fixing a multitude of tests

This commit is contained in:
Qubasa
2023-10-23 22:31:12 +02:00
parent adffdc14f6
commit 8ff80b025c
25 changed files with 154 additions and 113 deletions

View File

@@ -39,6 +39,7 @@ in
uploadSecrets = pkgs.writeScript "upload-secrets" '' uploadSecrets = pkgs.writeScript "upload-secrets" ''
#!${pkgs.python3}/bin/python #!${pkgs.python3}/bin/python
import json import json
import sys
from clan_cli.secrets.sops_generate import upload_age_key_from_nix from clan_cli.secrets.sops_generate import upload_age_key_from_nix
# the second toJSON is needed to escape the string for the python # the second toJSON is needed to escape the string for the python
args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })}) args = json.loads(${builtins.toJSON (builtins.toJSON { machine_name = config.clanCore.machineName; })})

View File

@@ -60,11 +60,17 @@ By default tests run in parallel using pytest-parallel.
pytest-parallel however breaks `breakpoint()`. To disable it, use this: pytest-parallel however breaks `breakpoint()`. To disable it, use this:
```console ```console
pytest --workers "" -s pytest -n0 -s
``` ```
You can also run a single test like this: You can also run a single test like this:
```console ```console
pytest --workers "" -s tests/test_secrets_cli.py::test_users pytest -n0 -s tests/test_secrets_cli.py::test_users
``` ```
## Debugging functions
Debugging functions can be found under `src/debug.py`
quite interesting is the function repro_env_break() which drops you into a shell
with the test environment loaded.

View File

@@ -23,10 +23,6 @@ def repro_env_break(work_dir: Path, env: Optional[Dict[str, str]] = None, cmd: O
else: else:
env = env.copy() env = env.copy()
# Error checking
if "bash" in env["SHELL"]:
raise Exception("I assumed you use zsh, not bash")
# Cmd appending # Cmd appending
args = ["xterm", "-e", "zsh", "-df"] args = ["xterm", "-e", "zsh", "-df"]
if cmd is not None: if cmd is not None:
@@ -48,7 +44,9 @@ def write_command(command: str, loc:Path) -> None:
os.chmod(loc, st.st_mode | stat.S_IEXEC) os.chmod(loc, st.st_mode | stat.S_IEXEC)
def spawn_process(func: Callable, **kwargs:Any) -> mp.Process: def spawn_process(func: Callable, **kwargs:Any) -> mp.Process:
mp.set_start_method(method="spawn") if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="spawn")
proc = mp.Process(target=func, kwargs=kwargs) proc = mp.Process(target=func, kwargs=kwargs)
proc.start() proc.start()
return proc return proc

View File

@@ -4,7 +4,6 @@ import argparse
from .create import register_create_parser from .create import register_create_parser
from .list import register_list_parser from .list import register_list_parser
# takes a (sub)parser and configures it # takes a (sub)parser and configures it
def register_parser(parser: argparse.ArgumentParser) -> None: def register_parser(parser: argparse.ArgumentParser) -> None:
subparser = parser.add_subparsers( subparser = parser.add_subparsers(

View File

@@ -71,7 +71,7 @@ class Machine:
env["SECRETS_DIR"] = str(secrets_dir) env["SECRETS_DIR"] = str(secrets_dir)
print(f"uploading secrets... {self.upload_secrets}") print(f"uploading secrets... {self.upload_secrets}")
proc = subprocess.run( proc = subprocess.run(
[self.upload_secrets], [self.upload_secrets, self.flake_dir.name],
env=env, env=env,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
text=True, text=True,

View File

@@ -8,18 +8,19 @@ from clan_cli.errors import ClanError
from ..dirs import specific_flake_dir from ..dirs import specific_flake_dir
from ..machines.machines import Machine from ..machines.machines import Machine
from ..types import FlakeName
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def generate_secrets(machine: Machine) -> None: def generate_secrets(machine: Machine, flake_name: FlakeName) -> None:
env = os.environ.copy() env = os.environ.copy()
env["CLAN_DIR"] = str(machine.flake_dir) env["CLAN_DIR"] = str(machine.flake_dir)
env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module env["PYTHONPATH"] = ":".join(sys.path) # TODO do this in the clanCore module
print(f"generating secrets... {machine.generate_secrets}") print(f"generating secrets... {machine.generate_secrets}")
proc = subprocess.run( proc = subprocess.run(
[machine.generate_secrets], [machine.generate_secrets, flake_name],
env=env, env=env,
) )
@@ -31,7 +32,7 @@ def generate_secrets(machine: Machine) -> None:
def generate_command(args: argparse.Namespace) -> None: def generate_command(args: argparse.Namespace) -> None:
machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake)) machine = Machine(name=args.machine, flake_dir=specific_flake_dir(args.flake))
generate_secrets(machine) generate_secrets(machine, args.flake)
def register_generate_parser(parser: argparse.ArgumentParser) -> None: def register_generate_parser(parser: argparse.ArgumentParser) -> None:

View File

@@ -91,4 +91,10 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
type=str, type=str,
help="the sops file to import (- for stdin)", help="the sops file to import (- for stdin)",
) )
parser.add_argument(
"flake",
type=str,
help="name of the flake",
)
parser.set_defaults(func=import_sops) parser.set_defaults(func=import_sops)

View File

@@ -4,6 +4,7 @@ import json
import os import os
import shlex import shlex
import sys import sys
import re
from pathlib import Path from pathlib import Path
from typing import Iterator, Dict from typing import Iterator, Dict
from uuid import UUID from uuid import UUID
@@ -17,6 +18,17 @@ from ..errors import ClanError
from ..debug import repro_env_break from ..debug import repro_env_break
def is_path_or_url(s: str) -> str | None:
# check if s is a valid path
if os.path.exists(s):
return "path"
# check if s is a valid URL
elif re.match(r"^https?://[a-zA-Z0-9.-]+/[a-zA-Z0-9.-]+", s):
return "URL"
# otherwise, return None
else:
return None
class BuildVmTask(BaseTask): class BuildVmTask(BaseTask):
def __init__(self, uuid: UUID, vm: VmConfig) -> None: def __init__(self, uuid: UUID, vm: VmConfig) -> None:
super().__init__(uuid, num_cmds=7) super().__init__(uuid, num_cmds=7)
@@ -78,19 +90,25 @@ class BuildVmTask(BaseTask):
) # TODO do this in the clanCore module ) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir) env["SECRETS_DIR"] = str(secrets_dir)
cmd = next(cmds) res = is_path_or_url(str(self.vm.flake_url))
repro_env_break(work_dir=flake_dir, env=env, cmd=[vm_config["generateSecrets"], clan_name]) if res is None:
if Path(self.vm.flake_url).is_dir(): raise ClanError(
cmd.run( f"flake_url must be a valid path or URL, got {self.vm.flake_url}"
[vm_config["generateSecrets"], clan_name],
env=env,
) )
else: elif res == "path": # Only generate secrets for local clans
self.log.warning("won't generate secrets for non local clan") cmd = next(cmds)
if Path(self.vm.flake_url).is_dir():
cmd.run(
[vm_config["generateSecrets"], clan_name],
env=env,
)
else:
self.log.warning("won't generate secrets for non local clan")
cmd = next(cmds) cmd = next(cmds)
cmd.run( cmd.run(
[vm_config["uploadSecrets"]], [vm_config["uploadSecrets"], clan_name],
env=env, env=env,
) )

View File

@@ -3,7 +3,7 @@ import logging
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Body from fastapi import APIRouter, Body
from clan_cli.debug import repro_env_break
from ...config.machine import ( from ...config.machine import (
config_for_machine, config_for_machine,
schema_for_machine, schema_for_machine,
@@ -33,6 +33,7 @@ async def list_machines(flake_name: FlakeName) -> MachinesResponse:
machines = [] machines = []
for m in _list_machines(flake_name): for m in _list_machines(flake_name):
machines.append(Machine(name=m, status=Status.UNKNOWN)) machines.append(Machine(name=m, status=Status.UNKNOWN))
return MachinesResponse(machines=machines) return MachinesResponse(machines=machines)

View File

@@ -12,7 +12,7 @@ from typing import Iterator
# XXX: can we dynamically load this using nix develop? # XXX: can we dynamically load this using nix develop?
import uvicorn import uvicorn
from pydantic import AnyUrl, IPvAnyAddress from pydantic import AnyUrl, IPvAnyAddress
from pydantic.tools import parse_obj_as
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -25,7 +25,8 @@ def open_browser(base_url: AnyUrl, sub_url: str) -> None:
break break
except OSError: except OSError:
time.sleep(i) time.sleep(i)
url = AnyUrl(f"{base_url}/{sub_url.removeprefix('/')}") url = parse_obj_as(
AnyUrl,f"{base_url}/{sub_url.removeprefix('/')}")
_open_browser(url) _open_browser(url)

View File

@@ -38,7 +38,7 @@ class FlakeForTest(NamedTuple):
def create_flake( def create_flake(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
temporary_dir: Path, temporary_home: Path,
flake_name: FlakeName, flake_name: FlakeName,
clan_core_flake: Path | None = None, clan_core_flake: Path | None = None,
machines: list[str] = [], machines: list[str] = [],
@@ -51,7 +51,7 @@ def create_flake(
template = Path(__file__).parent / flake_name template = Path(__file__).parent / flake_name
# copy the template to a new temporary location # copy the template to a new temporary location
home = Path(temporary_dir) home = Path(temporary_home)
flake = home / ".local/state/clan/flake" / flake_name flake = home / ".local/state/clan/flake" / flake_name
shutil.copytree(template, flake) shutil.copytree(template, flake)
@@ -87,21 +87,21 @@ def test_flake(
@pytest.fixture @pytest.fixture
def test_flake_with_core( def test_flake_with_core(
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( raise Exception(
"clan-core flake not found. This test requires the clan-core flake to be present" "clan-core flake not found. This test requires the clan-core flake to be present"
) )
yield from create_flake( yield from create_flake(
monkeypatch, temporary_dir, FlakeName("test_flake_with_core"), CLAN_CORE monkeypatch, temporary_home, FlakeName("test_flake_with_core"), CLAN_CORE
) )
@pytest.fixture @pytest.fixture
def test_flake_with_core_and_pass( def test_flake_with_core_and_pass(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
temporary_dir: Path, temporary_home: Path,
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
if not (CLAN_CORE / "flake.nix").exists(): if not (CLAN_CORE / "flake.nix").exists():
raise Exception( raise Exception(
@@ -109,7 +109,8 @@ def test_flake_with_core_and_pass(
) )
yield from create_flake( yield from create_flake(
monkeypatch, monkeypatch,
temporary_dir, temporary_home,
FlakeName("test_flake_with_core_and_pass"), FlakeName("test_flake_with_core_and_pass"),
CLAN_CORE, CLAN_CORE,
) )

View File

@@ -7,15 +7,15 @@ from clan_cli.errors import ClanError
def test_get_clan_flake_toplevel( def test_get_clan_flake_toplevel(
monkeypatch: pytest.MonkeyPatch, temporary_dir: Path monkeypatch: pytest.MonkeyPatch, temporary_home: Path
) -> None: ) -> None:
monkeypatch.chdir(temporary_dir) monkeypatch.chdir(temporary_home)
with pytest.raises(ClanError): with pytest.raises(ClanError):
print(_get_clan_flake_toplevel()) print(_get_clan_flake_toplevel())
(temporary_dir / ".git").touch() (temporary_home / ".git").touch()
assert _get_clan_flake_toplevel() == temporary_dir assert _get_clan_flake_toplevel() == temporary_home
subdir = temporary_dir / "subdir" subdir = temporary_home / "subdir"
subdir.mkdir() subdir.mkdir()
monkeypatch.chdir(subdir) monkeypatch.chdir(subdir)
(subdir / ".clan-flake").touch() (subdir / ".clan-flake").touch()

View File

@@ -1,13 +1,13 @@
import json import json
from pathlib import Path from pathlib import Path
from fixtures_flakes import FlakeForTest
import pytest import pytest
from api import TestClient from api import TestClient
@pytest.mark.impure @pytest.mark.impure
def test_inspect_ok(api: TestClient, test_flake_with_core: Path) -> None: def test_inspect_ok(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
params = {"url": str(test_flake_with_core)} params = {"url": str(test_flake_with_core.path)}
response = api.get( response = api.get(
"/api/flake/attrs", "/api/flake/attrs",
params=params, params=params,
@@ -32,8 +32,8 @@ def test_inspect_err(api: TestClient) -> None:
@pytest.mark.impure @pytest.mark.impure
def test_inspect_flake(api: TestClient, test_flake_with_core: Path) -> None: def test_inspect_flake(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
params = {"url": str(test_flake_with_core)} params = {"url": str(test_flake_with_core.path)}
response = api.get( response = api.get(
"/api/flake", "/api/flake",
params=params, params=params,

View File

@@ -9,7 +9,7 @@
let let
clan = clan-core.lib.buildClan { clan = clan-core.lib.buildClan {
directory = self; directory = self;
clanName = "test_with_core_clan"; clanName = "test_flake_with_core";
machines = { machines = {
vm1 = { lib, ... }: { vm1 = { lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__"; clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";

View File

@@ -9,7 +9,7 @@
let let
clan = clan-core.lib.buildClan { clan = clan-core.lib.buildClan {
directory = self; directory = self;
clanName = "test_with_core_and_pass_clan"; clanName = "test_flake_with_core_and_pass";
machines = { machines = {
vm1 = { lib, ... }: { vm1 = { lib, ... }: {
clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__"; clan.networking.deploymentAddress = "__CLAN_DEPLOYMENT_ADDRESS__";

View File

@@ -9,7 +9,7 @@
let let
clan = clan-core.lib.buildClan { clan = clan-core.lib.buildClan {
directory = self; directory = self;
clanName = "core_dynamic_machine_clan"; clanName = "test_flake_with_core_dynamic_machines";
machines = machines =
let let
machineModules = builtins.readDir (self + "/machines"); machineModules = builtins.readDir (self + "/machines");

View File

@@ -1,5 +1,7 @@
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from fixtures_flakes import FlakeForTest
from clan_cli.debug import repro_env_break
import pytest import pytest
from cli import Cli from cli import Cli
@@ -10,7 +12,7 @@ if TYPE_CHECKING:
def test_import_sops( def test_import_sops(
test_root: Path, test_root: Path,
test_flake: Path, test_flake: FlakeForTest,
capsys: pytest.CaptureFixture, capsys: pytest.CaptureFixture,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
@@ -18,16 +20,15 @@ def test_import_sops(
cli = Cli() cli = Cli()
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[1].privkey)
cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey]) cli.run(["secrets", "machines", "add", "machine1", age_keys[0].pubkey, test_flake.name])
cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey]) cli.run(["secrets", "users", "add", "user1", age_keys[1].pubkey, test_flake.name])
cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey]) cli.run(["secrets", "users", "add", "user2", age_keys[2].pubkey, test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user1"]) cli.run(["secrets", "groups", "add-user", "group1", "user1", test_flake.name])
cli.run(["secrets", "groups", "add-user", "group1", "user2"]) cli.run(["secrets", "groups", "add-user", "group1", "user2", test_flake.name])
# To edit: # To edit:
# SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml # SOPS_AGE_KEY=AGE-SECRET-KEY-1U5ENXZQAY62NC78Y2WC0SEGRRMAEEKH79EYY5TH4GPFWJKEAY0USZ6X7YQ sops --age age14tva0txcrl0zes05x7gkx56qd6wd9q3nwecjac74xxzz4l47r44sv3fz62 ./data/secrets.yaml
cli.run( cmd = [
[
"secrets", "secrets",
"import-sops", "import-sops",
"--group", "--group",
@@ -35,13 +36,17 @@ def test_import_sops(
"--machine", "--machine",
"machine1", "machine1",
str(test_root.joinpath("data", "secrets.yaml")), str(test_root.joinpath("data", "secrets.yaml")),
test_flake.name
] ]
repro_env_break(work_dir=test_flake.path, cmd=cmd)
cli.run(
cmd
) )
capsys.readouterr() capsys.readouterr()
cli.run(["secrets", "users", "list"]) cli.run(["secrets", "users", "list", test_flake.name])
users = sorted(capsys.readouterr().out.rstrip().split()) users = sorted(capsys.readouterr().out.rstrip().split())
assert users == ["user1", "user2"] assert users == ["user1", "user2"]
capsys.readouterr() capsys.readouterr()
cli.run(["secrets", "get", "secret-key"]) cli.run(["secrets", "get", "secret-key", test_flake.name])
assert capsys.readouterr().out == "secret-value" assert capsys.readouterr().out == "secret-value"

View File

@@ -1,46 +1,47 @@
from pathlib import Path from pathlib import Path
from api import TestClient from api import TestClient
from fixtures_flakes import FlakeForTest
from clan_cli.debug import repro_env_break
def test_machines(api: TestClient, test_flake: FlakeForTest) -> None:
def test_machines(api: TestClient, test_flake: Path) -> None: response = api.get(f"/api/{test_flake.name}/machines")
response = api.get("/api/machines")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"machines": []} assert response.json() == {"machines": []}
response = api.post("/api/machines", json={"name": "test"}) response = api.post(f"/api/{test_flake.name}/machines", json={"name": "test"})
assert response.status_code == 201 assert response.status_code == 201
assert response.json() == {"machine": {"name": "test", "status": "unknown"}} assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
response = api.get("/api/machines/test") response = api.get(f"/api/{test_flake.name}/machines/test")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"machine": {"name": "test", "status": "unknown"}} assert response.json() == {"machine": {"name": "test", "status": "unknown"}}
response = api.get("/api/machines") response = api.get(f"/api/{test_flake.name}/machines")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"machines": [{"name": "test", "status": "unknown"}]} assert response.json() == {"machines": [{"name": "test", "status": "unknown"}]}
def test_configure_machine(api: TestClient, test_flake: Path) -> None: def test_configure_machine(api: TestClient, test_flake: FlakeForTest) -> None:
# ensure error 404 if machine does not exist when accessing the config # ensure error 404 if machine does not exist when accessing the config
response = api.get("/api/machines/machine1/config") response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
assert response.status_code == 404 assert response.status_code == 404
# ensure error 404 if machine does not exist when writing to the config # ensure error 404 if machine does not exist when writing to the config
response = api.put("/api/machines/machine1/config", json={}) response = api.put(f"/api/{test_flake.name}/machines/machine1/config", json={})
assert response.status_code == 404 assert response.status_code == 404
# create the machine # create the machine
response = api.post("/api/machines", json={"name": "machine1"}) response = api.post(f"/api/{test_flake.name}/machines", json={"name": "machine1"})
assert response.status_code == 201 assert response.status_code == 201
# ensure an empty config is returned by default for a new machine # ensure an empty config is returned by default for a new machine
response = api.get("/api/machines/machine1/config") response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"config": {}} assert response.json() == {"config": {}}
# get jsonschema for machine # get jsonschema for machine
response = api.get("/api/machines/machine1/schema") response = api.get(f"/api/{test_flake.name}/machines/machine1/schema")
assert response.status_code == 200 assert response.status_code == 200
json_response = response.json() json_response = response.json()
assert "schema" in json_response and "properties" in json_response["schema"] assert "schema" in json_response and "properties" in json_response["schema"]
@@ -91,6 +92,11 @@ def test_configure_machine(api: TestClient, test_flake: Path) -> None:
devices=["/dev/fake_disk"], devices=["/dev/fake_disk"],
), ),
), ),
f"/api/{test_flake.name}machines/machine1/config",
json=dict(
clan=dict(
jitsi=True,
)
), ),
) )
@@ -110,8 +116,8 @@ def test_configure_machine(api: TestClient, test_flake: Path) -> None:
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"config": config2} assert response.json() == {"config": config2}
# ensure that the config has actually been updated # get the config again
response = api.get("/api/machines/machine1/config") response = api.get(f"/api/{test_flake.name}/machines/machine1/config")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"config": config2} assert response.json() == {"config": config2}

View File

@@ -21,8 +21,8 @@ def test_generate_secret(
monkeypatch.chdir(test_flake_with_core.path) monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli() cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey]) cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
has_secret(test_flake_with_core.name, "vm1-age.key") has_secret(test_flake_with_core.name, "vm1-age.key")
has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret") has_secret(test_flake_with_core.name, "vm1-zerotier-identity-secret")
network_id = machine_get_fact( network_id = machine_get_fact(
@@ -43,7 +43,7 @@ def test_generate_secret(
secret1_mtime = identity_secret.lstat().st_mtime_ns secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency # test idempotency
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1", test_flake_with_core.name])
assert age_key.lstat().st_mtime_ns == age_key_mtime assert age_key.lstat().st_mtime_ns == age_key_mtime
assert identity_secret.lstat().st_mtime_ns == secret1_mtime assert identity_secret.lstat().st_mtime_ns == secret1_mtime

View File

@@ -14,15 +14,15 @@ from clan_cli.ssh import HostGroup
def test_upload_secret( def test_upload_secret(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
test_flake_with_core_and_pass: FlakeForTest, test_flake_with_core_and_pass: FlakeForTest,
temporary_dir: Path, temporary_home: Path,
host_group: HostGroup, host_group: HostGroup,
) -> None: ) -> None:
monkeypatch.chdir(test_flake_with_core_and_pass.path) monkeypatch.chdir(test_flake_with_core_and_pass.path)
gnupghome = temporary_dir / "gpg" gnupghome = temporary_home / "gpg"
gnupghome.mkdir(mode=0o700) gnupghome.mkdir(mode=0o700)
monkeypatch.setenv("GNUPGHOME", str(gnupghome)) monkeypatch.setenv("GNUPGHOME", str(gnupghome))
monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_dir / "pass")) monkeypatch.setenv("PASSWORD_STORE_DIR", str(temporary_home / "pass"))
gpg_key_spec = temporary_dir / "gpg_key_spec" gpg_key_spec = temporary_home / "gpg_key_spec"
gpg_key_spec.write_text( gpg_key_spec.write_text(
""" """
Key-Type: 1 Key-Type: 1
@@ -39,18 +39,18 @@ def test_upload_secret(
check=True, check=True,
) )
subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True) subprocess.run(nix_shell(["pass"], ["pass", "init", "test@local"]), check=True)
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
network_id = machine_get_fact( network_id = machine_get_fact(
test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id" test_flake_with_core_and_pass.name, "vm1", "zerotier-network-id"
) )
assert len(network_id) == 16 assert len(network_id) == 16
identity_secret = ( identity_secret = (
temporary_dir / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg" temporary_home / "pass" / "machines" / "vm1" / "zerotier-identity-secret.gpg"
) )
secret1_mtime = identity_secret.lstat().st_mtime_ns secret1_mtime = identity_secret.lstat().st_mtime_ns
# test idempotency # test idempotency
cli.run(["secrets", "generate", "vm1"]) cli.run(["secrets", "generate", "vm1", test_flake_with_core_and_pass.name])
assert identity_secret.lstat().st_mtime_ns == secret1_mtime assert identity_secret.lstat().st_mtime_ns == secret1_mtime
flake = test_flake_with_core_and_pass.path.joinpath("flake.nix") flake = test_flake_with_core_and_pass.path.joinpath("flake.nix")
@@ -58,7 +58,7 @@ def test_upload_secret(
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}" addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr) new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text) flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1"]) cli.run(["secrets", "upload", "vm1", test_flake_with_core_and_pass.name])
zerotier_identity_secret = ( zerotier_identity_secret = (
test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret" test_flake_with_core_and_pass.path / "secrets" / "zerotier-identity-secret"
) )

View File

@@ -3,7 +3,7 @@ from typing import TYPE_CHECKING
import pytest import pytest
from cli import Cli from cli import Cli
from fixtures_flakes import FlakeForTest
from clan_cli.ssh import HostGroup from clan_cli.ssh import HostGroup
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -13,29 +13,29 @@ if TYPE_CHECKING:
@pytest.mark.impure @pytest.mark.impure
def test_secrets_upload( def test_secrets_upload(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: Path, test_flake_with_core: FlakeForTest,
host_group: HostGroup, host_group: HostGroup,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
) -> None: ) -> None:
monkeypatch.chdir(test_flake_with_core) monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli() cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey]) cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
cli.run(["secrets", "machines", "add", "vm1", age_keys[1].pubkey]) cli.run(["secrets", "machines", "add", "vm1", age_keys[1].pubkey, test_flake_with_core.name])
monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey) monkeypatch.setenv("SOPS_NIX_SECRET", age_keys[0].privkey)
cli.run(["secrets", "set", "vm1-age.key"]) cli.run(["secrets", "set", "vm1-age.key", test_flake_with_core.name])
flake = test_flake_with_core.joinpath("flake.nix") flake = test_flake_with_core.path.joinpath("flake.nix")
host = host_group.hosts[0] host = host_group.hosts[0]
addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}" addr = f"{host.user}@{host.host}:{host.port}?StrictHostKeyChecking=no&UserKnownHostsFile=/dev/null&IdentityFile={host.key}"
new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr) new_text = flake.read_text().replace("__CLAN_DEPLOYMENT_ADDRESS__", addr)
flake.write_text(new_text) flake.write_text(new_text)
cli.run(["secrets", "upload", "vm1"]) cli.run(["secrets", "upload", "vm1", test_flake_with_core.name])
# the flake defines this path as the location where the sops key should be installed # the flake defines this path as the location where the sops key should be installed
sops_key = test_flake_with_core.joinpath("key.txt") sops_key = test_flake_with_core.path.joinpath("key.txt")
assert sops_key.exists() assert sops_key.exists()
assert sops_key.read_text() == age_keys[0].privkey assert sops_key.read_text() == age_keys[0].privkey

View File

@@ -2,13 +2,13 @@ from pathlib import Path
import pytest import pytest
from api import TestClient from api import TestClient
from fixtures_flakes import FlakeForTest
@pytest.mark.impure @pytest.mark.impure
def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: def test_inspect(api: TestClient, test_flake_with_core: FlakeForTest) -> None:
response = api.post( response = api.post(
"/api/vms/inspect", "/api/vms/inspect",
json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), json=dict(flake_url=str(test_flake_with_core.path), flake_attr="vm1"),
) )
assert response.status_code == 200, f"Failed to inspect vm: {response.text}" assert response.status_code == 200, f"Failed to inspect vm: {response.text}"

View File

@@ -9,6 +9,7 @@ from fixtures_flakes import FlakeForTest, create_flake
from httpx import SyncByteStream from httpx import SyncByteStream
from root import CLAN_CORE from root import CLAN_CORE
from clan_cli.debug import repro_env_break
from clan_cli.types import FlakeName from clan_cli.types import FlakeName
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -17,7 +18,8 @@ if TYPE_CHECKING:
@pytest.fixture @pytest.fixture
def flake_with_vm_with_secrets( def flake_with_vm_with_secrets(
monkeypatch: pytest.MonkeyPatch, temporary_home: Path monkeypatch: pytest.MonkeyPatch,
temporary_home: Path
) -> Iterator[FlakeForTest]: ) -> Iterator[FlakeForTest]:
yield from create_flake( yield from create_flake(
monkeypatch, monkeypatch,
@@ -42,15 +44,6 @@ def remote_flake_with_vm_without_secrets(
) )
@pytest.fixture
def create_user_with_age_key(
monkeypatch: pytest.MonkeyPatch,
test_flake: FlakeForTest,
age_keys: list["KeyPair"],
) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake.name])
def generic_create_vm_test(api: TestClient, flake: Path, vm: str) -> None: def generic_create_vm_test(api: TestClient, flake: Path, vm: str) -> None:
@@ -97,8 +90,13 @@ def test_create_local(
api: TestClient, api: TestClient,
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
flake_with_vm_with_secrets: FlakeForTest, flake_with_vm_with_secrets: FlakeForTest,
create_user_with_age_key: None, age_keys: list["KeyPair"],
) -> None: ) -> None:
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli()
cmd = ["secrets", "users", "add", "user1", age_keys[0].pubkey, flake_with_vm_with_secrets.name]
cli.run(cmd)
generic_create_vm_test(api, flake_with_vm_with_secrets.path, "vm_with_secrets") generic_create_vm_test(api, flake_with_vm_with_secrets.path, "vm_with_secrets")

View File

@@ -1,7 +1,7 @@
import os import os
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from fixtures_flakes import FlakeForTest
import pytest import pytest
from cli import Cli from cli import Cli
@@ -12,9 +12,9 @@ no_kvm = not os.path.exists("/dev/kvm")
@pytest.mark.impure @pytest.mark.impure
def test_inspect(test_flake_with_core: Path, capsys: pytest.CaptureFixture) -> None: def test_inspect(test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture) -> None:
cli = Cli() cli = Cli()
cli.run(["vms", "inspect", "vm1"]) cli.run(["vms", "inspect", "vm1", test_flake_with_core.name])
out = capsys.readouterr() # empty the buffer out = capsys.readouterr() # empty the buffer
assert "Cores" in out.out assert "Cores" in out.out
@@ -23,11 +23,11 @@ def test_inspect(test_flake_with_core: Path, capsys: pytest.CaptureFixture) -> N
@pytest.mark.impure @pytest.mark.impure
def test_create( def test_create(
monkeypatch: pytest.MonkeyPatch, monkeypatch: pytest.MonkeyPatch,
test_flake_with_core: Path, test_flake_with_core: FlakeForTest,
age_keys: list["KeyPair"], age_keys: list["KeyPair"],
) -> None: ) -> None:
monkeypatch.chdir(test_flake_with_core) monkeypatch.chdir(test_flake_with_core.path)
monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey) monkeypatch.setenv("SOPS_AGE_KEY", age_keys[0].privkey)
cli = Cli() cli = Cli()
cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey]) cli.run(["secrets", "users", "add", "user1", age_keys[0].pubkey, test_flake_with_core.name])
cli.run(["vms", "create", "vm1"]) cli.run(["vms", "create", "vm1", test_flake_with_core.name])

View File

@@ -10,12 +10,12 @@ from ports import PortFunction
@pytest.mark.timeout(10) @pytest.mark.timeout(10)
def test_start_server(unused_tcp_port: PortFunction, temporary_dir: Path) -> None: def test_start_server(unused_tcp_port: PortFunction, temporary_home: Path) -> None:
port = unused_tcp_port() port = unused_tcp_port()
fifo = temporary_dir / "fifo" fifo = temporary_home / "fifo"
os.mkfifo(fifo) os.mkfifo(fifo)
notify_script = temporary_dir / "firefox" notify_script = temporary_home / "firefox"
bash = shutil.which("bash") bash = shutil.which("bash")
assert bash is not None assert bash is not None
notify_script.write_text( notify_script.write_text(
@@ -27,8 +27,8 @@ echo "1" > {fifo}
notify_script.chmod(0o700) notify_script.chmod(0o700)
env = os.environ.copy() env = os.environ.copy()
print(str(temporary_dir.absolute())) print(str(temporary_home.absolute()))
env["PATH"] = ":".join([str(temporary_dir.absolute())] + env["PATH"].split(":")) env["PATH"] = ":".join([str(temporary_home.absolute())] + env["PATH"].split(":"))
with subprocess.Popen( with subprocess.Popen(
[sys.executable, "-m", "clan_cli.webui", "--port", str(port)], env=env [sys.executable, "-m", "clan_cli.webui", "--port", str(port)], env=env
) as p: ) as p: