clan-cli/tests: improve test helpers for VMs
This commit is contained in:
11
pkgs/clan-cli/tests/helpers/nixos_config.py
Normal file
11
pkgs/clan-cli/tests/helpers/nixos_config.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
from collections import defaultdict
|
||||||
|
from collections.abc import Callable
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
def def_value() -> defaultdict:
|
||||||
|
return defaultdict(def_value)
|
||||||
|
|
||||||
|
|
||||||
|
# allows defining nested dictionary in a single line
|
||||||
|
nested_dict: Callable[[], dict[str, Any]] = lambda: defaultdict(def_value)
|
||||||
@@ -1,16 +1,14 @@
|
|||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
from collections import defaultdict
|
|
||||||
from collections.abc import Callable
|
|
||||||
from io import StringIO
|
from io import StringIO
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from age_keys import SopsSetup
|
from age_keys import SopsSetup
|
||||||
from fixtures_flakes import generate_flake
|
from fixtures_flakes import generate_flake
|
||||||
from helpers import cli
|
from helpers import cli
|
||||||
|
from helpers.nixos_config import nested_dict
|
||||||
from root import CLAN_CORE
|
from root import CLAN_CORE
|
||||||
|
|
||||||
from clan_cli.clan_uri import FlakeId
|
from clan_cli.clan_uri import FlakeId
|
||||||
@@ -20,14 +18,6 @@ from clan_cli.vars.public_modules import in_repo
|
|||||||
from clan_cli.vars.secret_modules import password_store, sops
|
from clan_cli.vars.secret_modules import password_store, sops
|
||||||
|
|
||||||
|
|
||||||
def def_value() -> defaultdict:
|
|
||||||
return defaultdict(def_value)
|
|
||||||
|
|
||||||
|
|
||||||
# allows defining nested dictionary in a single line
|
|
||||||
nested_dict: Callable[[], dict[str, Any]] = lambda: defaultdict(def_value)
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_subgraph() -> None:
|
def test_get_subgraph() -> None:
|
||||||
from clan_cli.vars.generate import _get_subgraph
|
from clan_cli.vars.generate import _get_subgraph
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from typing import TYPE_CHECKING
|
|||||||
import pytest
|
import pytest
|
||||||
from fixtures_flakes import FlakeForTest, generate_flake
|
from fixtures_flakes import FlakeForTest, generate_flake
|
||||||
from helpers import cli
|
from helpers import cli
|
||||||
|
from helpers.nixos_config import nested_dict
|
||||||
from root import CLAN_CORE
|
from root import CLAN_CORE
|
||||||
|
|
||||||
from clan_cli.dirs import vm_state_dir
|
from clan_cli.dirs import vm_state_dir
|
||||||
@@ -35,16 +36,19 @@ def run_vm_in_thread(machine_name: str) -> None:
|
|||||||
t = threading.Thread(target=run, name="run")
|
t = threading.Thread(target=run, name="run")
|
||||||
t.daemon = True
|
t.daemon = True
|
||||||
t.start()
|
t.start()
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
# wait for qmp socket to exist
|
# wait for qmp socket to exist
|
||||||
def wait_vm_up(state_dir: Path) -> None:
|
def wait_vm_up(machine_name: str, flake_url: str | None = None) -> None:
|
||||||
socket_file = state_dir / "qga.sock"
|
if flake_url is None:
|
||||||
|
flake_url = str(Path.cwd())
|
||||||
|
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||||
timeout: float = 100
|
timeout: float = 100
|
||||||
while True:
|
while True:
|
||||||
if timeout <= 0:
|
if timeout <= 0:
|
||||||
raise TimeoutError(
|
raise TimeoutError(
|
||||||
f"qga socket {socket_file} not found. Is the VM running?"
|
f"qmp socket {socket_file} not found. Is the VM running?"
|
||||||
)
|
)
|
||||||
if socket_file.exists():
|
if socket_file.exists():
|
||||||
break
|
break
|
||||||
@@ -52,22 +56,27 @@ def wait_vm_up(state_dir: Path) -> None:
|
|||||||
timeout -= 0.1
|
timeout -= 0.1
|
||||||
|
|
||||||
|
|
||||||
# wait for vm to be down by checking if qga socket is down
|
# wait for vm to be down by checking if qmp socket is down
|
||||||
def wait_vm_down(state_dir: Path) -> None:
|
def wait_vm_down(machine_name: str, flake_url: str | None = None) -> None:
|
||||||
socket_file = state_dir / "qga.sock"
|
if flake_url is None:
|
||||||
|
flake_url = str(Path.cwd())
|
||||||
|
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
|
||||||
timeout: float = 300
|
timeout: float = 300
|
||||||
while socket_file.exists():
|
while socket_file.exists():
|
||||||
if timeout <= 0:
|
if timeout <= 0:
|
||||||
raise TimeoutError(
|
raise TimeoutError(
|
||||||
f"qga socket {socket_file} still exists. Is the VM down?"
|
f"qmp socket {socket_file} still exists. Is the VM down?"
|
||||||
)
|
)
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
timeout -= 0.1
|
timeout -= 0.1
|
||||||
|
|
||||||
|
|
||||||
# wait for vm to be up then connect and return qmp instance
|
# wait for vm to be up then connect and return qmp instance
|
||||||
def qmp_connect(state_dir: Path) -> QEMUMonitorProtocol:
|
def qmp_connect(machine_name: str, flake_url: str | None = None) -> QEMUMonitorProtocol:
|
||||||
wait_vm_up(state_dir)
|
if flake_url is None:
|
||||||
|
flake_url = str(Path.cwd())
|
||||||
|
state_dir = vm_state_dir(flake_url, machine_name)
|
||||||
|
wait_vm_up(machine_name, flake_url)
|
||||||
qmp = QEMUMonitorProtocol(
|
qmp = QEMUMonitorProtocol(
|
||||||
address=str(os.path.realpath(state_dir / "qmp.sock")),
|
address=str(os.path.realpath(state_dir / "qmp.sock")),
|
||||||
)
|
)
|
||||||
@@ -76,8 +85,11 @@ def qmp_connect(state_dir: Path) -> QEMUMonitorProtocol:
|
|||||||
|
|
||||||
|
|
||||||
# wait for vm to be up then connect and return qga instance
|
# wait for vm to be up then connect and return qga instance
|
||||||
def qga_connect(state_dir: Path) -> QgaSession:
|
def qga_connect(machine_name: str, flake_url: str | None = None) -> QgaSession:
|
||||||
wait_vm_up(state_dir)
|
if flake_url is None:
|
||||||
|
flake_url = str(Path.cwd())
|
||||||
|
state_dir = vm_state_dir(flake_url, machine_name)
|
||||||
|
wait_vm_up(machine_name, flake_url)
|
||||||
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
|
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
|
||||||
|
|
||||||
|
|
||||||
@@ -144,14 +156,11 @@ def test_vm_qmp(
|
|||||||
# 'clan vms run' must be executed from within the flake
|
# 'clan vms run' must be executed from within the flake
|
||||||
monkeypatch.chdir(flake.path)
|
monkeypatch.chdir(flake.path)
|
||||||
|
|
||||||
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
|
|
||||||
state_dir = vm_state_dir(str(flake.path), "my_machine")
|
|
||||||
|
|
||||||
# start the VM
|
# start the VM
|
||||||
run_vm_in_thread("my_machine")
|
run_vm_in_thread("my_machine")
|
||||||
|
|
||||||
# connect with qmp
|
# connect with qmp
|
||||||
qmp = qmp_connect(state_dir)
|
qmp = qmp_connect("my_machine")
|
||||||
|
|
||||||
# verify that issuing a command works
|
# verify that issuing a command works
|
||||||
# result = qmp.cmd_obj({"execute": "query-status"})
|
# result = qmp.cmd_obj({"execute": "query-status"})
|
||||||
@@ -169,121 +178,60 @@ def test_vm_persistence(
|
|||||||
temporary_home: Path,
|
temporary_home: Path,
|
||||||
) -> None:
|
) -> None:
|
||||||
# set up a clan flake with some systemd services to test persistence
|
# set up a clan flake with some systemd services to test persistence
|
||||||
|
config = nested_dict()
|
||||||
|
# logrotate-checkconf doesn't work in VM because /nix/store is owned by nobody
|
||||||
|
config["my_machine"]["systemd"]["services"]["logrotate-checkconf"]["enable"] = False
|
||||||
|
config["my_machine"]["services"]["getty"]["autologinUser"] = "root"
|
||||||
|
config["my_machine"]["clan"]["virtualisation"] = {"graphics": False}
|
||||||
|
config["my_machine"]["clan"]["networking"] = {"targetHost": "client"}
|
||||||
|
config["my_machine"]["clan"]["core"]["state"]["my_state"]["folders"] = [
|
||||||
|
# to be owned by root
|
||||||
|
"/var/my-state",
|
||||||
|
# to be owned by user 'test'
|
||||||
|
"/var/user-state",
|
||||||
|
]
|
||||||
|
config["my_machine"]["users"]["users"] = {
|
||||||
|
"test": {"password": "test", "isNormalUser": True},
|
||||||
|
"root": {"password": "root"},
|
||||||
|
}
|
||||||
|
|
||||||
flake = generate_flake(
|
flake = generate_flake(
|
||||||
temporary_home,
|
temporary_home,
|
||||||
flake_template=CLAN_CORE / "templates" / "new-clan",
|
flake_template=CLAN_CORE / "templates" / "new-clan",
|
||||||
machine_configs=dict(
|
machine_configs=config,
|
||||||
my_machine=dict(
|
|
||||||
services=dict(getty=dict(autologinUser="root")),
|
|
||||||
clanCore=dict(
|
|
||||||
state=dict(
|
|
||||||
my_state=dict(
|
|
||||||
folders=[
|
|
||||||
# to be owned by root
|
|
||||||
"/var/my-state",
|
|
||||||
# to be owned by user 'test'
|
|
||||||
"/var/user-state",
|
|
||||||
]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
),
|
|
||||||
# create test user to test if state can be owned by user
|
|
||||||
users=dict(
|
|
||||||
users=dict(
|
|
||||||
test=dict(
|
|
||||||
password="test",
|
|
||||||
isNormalUser=True,
|
|
||||||
),
|
|
||||||
root=dict(password="root"),
|
|
||||||
)
|
|
||||||
),
|
|
||||||
# create a systemd service to create a file in the state folder
|
|
||||||
# and another to read it after reboot
|
|
||||||
systemd=dict(
|
|
||||||
services=dict(
|
|
||||||
create_state=dict(
|
|
||||||
description="Create a file in the state folder",
|
|
||||||
wantedBy=["multi-user.target"],
|
|
||||||
script="""
|
|
||||||
if [ ! -f /var/my-state/root ]; then
|
|
||||||
echo "Creating a file in the state folder"
|
|
||||||
echo "dream2nix" > /var/my-state/root
|
|
||||||
# create /var/my-state/test owned by user test
|
|
||||||
echo "dream2nix" > /var/my-state/test
|
|
||||||
chown test /var/my-state/test
|
|
||||||
# make sure /var/user-state is owned by test
|
|
||||||
chown test /var/user-state
|
|
||||||
fi
|
|
||||||
""",
|
|
||||||
serviceConfig=dict(
|
|
||||||
Type="oneshot",
|
|
||||||
),
|
|
||||||
),
|
|
||||||
reboot=dict(
|
|
||||||
description="Reboot the machine",
|
|
||||||
wantedBy=["multi-user.target"],
|
|
||||||
after=["my-state.service"],
|
|
||||||
script="""
|
|
||||||
if [ ! -f /var/my-state/rebooting ]; then
|
|
||||||
echo "Rebooting the machine"
|
|
||||||
touch /var/my-state/rebooting
|
|
||||||
poweroff
|
|
||||||
else
|
|
||||||
touch /var/my-state/rebooted
|
|
||||||
fi
|
|
||||||
""",
|
|
||||||
),
|
|
||||||
read_after_reboot=dict(
|
|
||||||
description="Read a file in the state folder",
|
|
||||||
wantedBy=["multi-user.target"],
|
|
||||||
after=["reboot.service"],
|
|
||||||
# TODO: currently state folders itself cannot be owned by users
|
|
||||||
script="""
|
|
||||||
if ! cat /var/my-state/test; then
|
|
||||||
echo "cannot read from state file" > /var/my-state/error
|
|
||||||
# ensure root file is owned by root
|
|
||||||
elif [ "$(stat -c '%U' /var/my-state/root)" != "root" ]; then
|
|
||||||
echo "state file /var/my-state/root is not owned by user root" > /var/my-state/error
|
|
||||||
# ensure test file is owned by test
|
|
||||||
elif [ "$(stat -c '%U' /var/my-state/test)" != "test" ]; then
|
|
||||||
echo "state file /var/my-state/test is not owned by user test" > /var/my-state/error
|
|
||||||
# ensure /var/user-state is owned by test
|
|
||||||
elif [ "$(stat -c '%U' /var/user-state)" != "test" ]; then
|
|
||||||
echo "state folder /var/user-state is not owned by user test" > /var/my-state/error
|
|
||||||
fi
|
|
||||||
|
|
||||||
""",
|
|
||||||
serviceConfig=dict(
|
|
||||||
Type="oneshot",
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
),
|
|
||||||
clan=dict(
|
|
||||||
virtualisation=dict(graphics=False),
|
|
||||||
networking=dict(targetHost="client"),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
monkeypatch.chdir(flake.path)
|
|
||||||
|
|
||||||
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
|
monkeypatch.chdir(flake.path)
|
||||||
state_dir = vm_state_dir(str(flake.path), "my_machine")
|
|
||||||
|
|
||||||
run_vm_in_thread("my_machine")
|
run_vm_in_thread("my_machine")
|
||||||
|
|
||||||
# wait for the VM to start
|
# wait for the VM to start and connect qga
|
||||||
wait_vm_up(state_dir)
|
qga = qga_connect("my_machine")
|
||||||
|
|
||||||
|
# create state via qmp command instead of systemd service
|
||||||
|
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
|
||||||
|
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
|
||||||
|
qga.run("chown test /var/my-state/test", check=True)
|
||||||
|
qga.run("chown test /var/user-state", check=True)
|
||||||
|
qga.run("touch /var/my-state/rebooting", check=True)
|
||||||
|
qga.exec_cmd("poweroff")
|
||||||
|
|
||||||
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
|
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
|
||||||
wait_vm_down(state_dir)
|
wait_vm_down("my_machine")
|
||||||
|
|
||||||
# start vm again
|
# start vm again
|
||||||
run_vm_in_thread("my_machine")
|
run_vm_in_thread("my_machine")
|
||||||
|
|
||||||
# connect second time
|
# connect second time
|
||||||
qga = qga_connect(state_dir)
|
qga = qga_connect("my_machine")
|
||||||
|
# check state exists
|
||||||
|
qga.run("cat /var/my-state/test", check=True)
|
||||||
|
# ensure root file is owned by root
|
||||||
|
qga.run("stat -c '%U' /var/my-state/root", check=True)
|
||||||
|
# ensure test file is owned by test
|
||||||
|
qga.run("stat -c '%U' /var/my-state/test", check=True)
|
||||||
|
# ensure /var/user-state is owned by test
|
||||||
|
qga.run("stat -c '%U' /var/user-state", check=True)
|
||||||
|
|
||||||
# ensure that the file created by the service is still there and has the expected content
|
# ensure that the file created by the service is still there and has the expected content
|
||||||
exitcode, out, err = qga.run("cat /var/my-state/test")
|
exitcode, out, err = qga.run("cat /var/my-state/test")
|
||||||
@@ -301,5 +249,5 @@ def test_vm_persistence(
|
|||||||
assert exitcode == 0, out
|
assert exitcode == 0, out
|
||||||
|
|
||||||
# use qmp to shutdown the machine (prevent zombie qemu processes)
|
# use qmp to shutdown the machine (prevent zombie qemu processes)
|
||||||
qmp = qmp_connect(state_dir)
|
qmp = qmp_connect("my_machine")
|
||||||
qmp.command("system_powerdown")
|
qmp.command("system_powerdown")
|
||||||
|
|||||||
Reference in New Issue
Block a user