test_vars_deployment: simplify test to just start one vm

This commit is contained in:
Jörg Thalheim
2025-08-15 13:30:30 +02:00
parent b69ad0eca5
commit 36812d5f95
3 changed files with 21 additions and 71 deletions

View File

@@ -11,5 +11,4 @@ pytest_plugins = [
"clan_cli.tests.runtime",
"clan_cli.tests.fixtures_flakes",
"clan_cli.tests.stdout",
"clan_cli.tests.nix_config",
]

View File

@@ -1,24 +0,0 @@
import json
import subprocess
from dataclasses import dataclass
import pytest
@dataclass
class ConfigItem:
aliases: list[str]
defaultValue: bool # noqa: N815
description: str
documentDefault: bool # noqa: N815
experimentalFeature: str # noqa: N815
value: str | bool | list[str] | dict[str, str]
@pytest.fixture(scope="session")
def nix_config() -> dict[str, ConfigItem]:
proc = subprocess.run(
["nix", "config", "show", "--json"], check=True, stdout=subprocess.PIPE
)
data = json.loads(proc.stdout)
return {name: ConfigItem(**c) for name, c in data.items()}

View File

@@ -7,24 +7,23 @@ import pytest
from clan_cli.tests.age_keys import SopsSetup
from clan_cli.tests.fixtures_flakes import ClanFlake
from clan_cli.tests.helpers import cli
from clan_cli.tests.nix_config import ConfigItem
from clan_cli.vms.run import inspect_vm, spawn_vm
from clan_lib import cmd
from clan_lib.flake import Flake
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_eval, run
from clan_lib.nix import nix_config, nix_eval, run
@pytest.mark.impure
@pytest.mark.skipif(sys.platform == "darwin", reason="preload doesn't work on darwin")
def test_vm_deployment(
flake: ClanFlake,
nix_config: dict[str, ConfigItem],
sops_setup: SopsSetup,
) -> None:
# machine 1
config = nix_config()
machine1_config = flake.machines["m1_machine"]
machine1_config["nixpkgs"]["hostPlatform"] = nix_config["system"].value
machine1_config["nixpkgs"]["hostPlatform"] = config["system"]
machine1_config["clan"]["virtualisation"]["graphics"] = False
machine1_config["services"]["getty"]["autologinUser"] = "root"
machine1_config["services"]["openssh"]["enable"] = True
@@ -48,19 +47,6 @@ def test_vm_deployment(
echo hello > "$out"/shared_secret
echo hello > "$out"/no_deploy_secret
"""
# machine 2
machine2_config = flake.machines["m2_machine"]
machine2_config["nixpkgs"]["hostPlatform"] = nix_config["system"].value
machine2_config["clan"]["virtualisation"]["graphics"] = False
machine2_config["services"]["getty"]["autologinUser"] = "root"
machine2_config["services"]["openssh"]["enable"] = True
machine2_config["users"]["users"]["root"]["openssh"]["authorizedKeys"]["keys"] = [
# put your key here when debugging and pass ssh_port in run_vm_in_thread call below
]
machine2_config["networking"]["firewall"]["enable"] = False
machine2_config["clan"]["core"]["vars"]["generators"]["my_shared_generator"] = (
m1_shared_generator.copy()
)
flake.refresh()
@@ -68,12 +54,11 @@ def test_vm_deployment(
cli.run(["vars", "generate", "--flake", str(flake.path)])
# check sops secrets not empty
for machine in ["m1_machine", "m2_machine"]:
sops_secrets = json.loads(
run(
nix_eval(
[
f"{flake.path}#nixosConfigurations.{machine}.config.sops.secrets",
f"{flake.path}#nixosConfigurations.m1_machine.config.sops.secrets",
]
)
).stdout.strip()
@@ -87,11 +72,10 @@ def test_vm_deployment(
)
).stdout.strip()
assert "no-such-path" not in my_secret_path
for machine in ["m1_machine", "m2_machine"]:
shared_secret_path = run(
nix_eval(
[
f"{flake.path}#nixosConfigurations.{machine}.config.clan.core.vars.generators.my_shared_generator.files.shared_secret.path",
f"{flake.path}#nixosConfigurations.m1_machine.config.clan.core.vars.generators.my_shared_generator.files.shared_secret.path",
]
)
).stdout.strip()
@@ -100,15 +84,11 @@ def test_vm_deployment(
cmd.run(["nix", "flake", "lock"], cmd.RunOpts(cwd=flake.path))
vm1_config = inspect_vm(machine=Machine("m1_machine", Flake(str(flake.path))))
vm2_config = inspect_vm(machine=Machine("m2_machine", Flake(str(flake.path))))
with ExitStack() as stack:
vm1 = stack.enter_context(spawn_vm(vm1_config, stdin=subprocess.DEVNULL))
vm2 = stack.enter_context(spawn_vm(vm2_config, stdin=subprocess.DEVNULL))
qga_m1 = stack.enter_context(vm1.qga_connect())
qga_m2 = stack.enter_context(vm2.qga_connect())
# run these always successful commands to make sure all vms have started before continuing
qga_m1.run(["echo"])
qga_m2.run(["echo"])
# check my_secret is deployed
result = qga_m1.run(["cat", "/run/secrets/vars/m1_generator/my_secret"])
assert result.stdout == "hello\n"
@@ -117,11 +97,6 @@ def test_vm_deployment(
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
)
assert result.stdout == "hello\n"
# check shared_secret is deployed on m2
result = qga_m2.run(
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
)
assert result.stdout == "hello\n"
# check no_deploy_secret is not deployed
result = qga_m1.run(
["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"],