Merge pull request 'Fix leaks in vm tests' (#2192) from fix-warning into main

This commit is contained in:
clan-bot
2024-10-02 06:42:45 +00:00
12 changed files with 121 additions and 142 deletions

View File

@@ -194,7 +194,7 @@ def generate_facts(
prompt: Callable[[str, str], str] = prompt_func,
) -> bool:
was_regenerated = False
with TemporaryDirectory() as tmp:
with TemporaryDirectory(prefix="facts-generate-") as tmp:
tmpdir = Path(tmp)
for machine in machines:

View File

@@ -19,7 +19,7 @@ def upload_secrets(machine: Machine) -> None:
if secret_facts_store.update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
with TemporaryDirectory(prefix="facts-upload-") as tempdir:
secret_facts_store.upload(Path(tempdir))
host = machine.target_host

View File

@@ -86,7 +86,7 @@ def create_machine(opts: CreateOptions) -> None:
)
raise ClanError(msg, description=description)
with TemporaryDirectory() as tmpdir:
with TemporaryDirectory(prefix="machine-template-") as tmpdir:
tmpdirp = Path(tmpdir)
command = nix_command(
[

View File

@@ -1,7 +1,6 @@
import base64
import json
import socket
from pathlib import Path
from time import sleep
from clan_cli.errors import ClanError
@@ -12,17 +11,8 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
class QgaSession:
def __init__(self, socket_file: Path | str) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# try to reconnect a couple of times if connection refused
for _ in range(100):
try:
self.sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
return
self.sock.connect(str(socket_file))
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
def get_response(self) -> dict:
result = self.sock.recv(9999999)

View File

@@ -19,7 +19,7 @@ def upload_secrets(machine: Machine) -> None:
if secret_store.update_check():
log.info("Secrets already up to date")
return
with TemporaryDirectory() as tempdir:
with TemporaryDirectory(prefix="vars-upload-") as tempdir:
secret_store.upload(Path(tempdir))
host = machine.target_host

View File

@@ -134,7 +134,7 @@ python3.pkgs.buildPythonApplication {
chmod +w -R ./src
cd ./src
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1 PYTHONWARNINGS=error
${pythonWithTestDeps}/bin/python -m pytest -m "not impure and not with_core" ./tests
touch $out
'';
@@ -147,7 +147,7 @@ python3.pkgs.buildPythonApplication {
cd ./src
export CLAN_CORE=${clan-core-path}
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1 PYTHONWARNINGS=error
${pythonWithTestDeps}/bin/python -m pytest -m "not impure and with_core" ./tests
touch $out
'';

View File

@@ -33,6 +33,7 @@ mkShell {
shellHook = ''
export GIT_ROOT="$(git rev-parse --show-toplevel)"
export PKG_ROOT="$GIT_ROOT/pkgs/clan-cli"
export PYTHONWARNINGS=error
# Add current package to PYTHONPATH
export PYTHONPATH="$PKG_ROOT''${PYTHONPATH:+:$PYTHONPATH:}"

View File

@@ -4,6 +4,7 @@ import socket
import sys
import threading
import traceback
from collections.abc import Iterator
from pathlib import Path
from time import sleep
@@ -89,26 +90,43 @@ def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None)
# wait for vm to be up then connect and return qmp instance
@contextlib.contextmanager
def qmp_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QEMUMonitorProtocol:
) -> Iterator[QEMUMonitorProtocol]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
qmp = QEMUMonitorProtocol(
with QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")),
)
qmp.connect()
return qmp
) as qmp:
qmp.connect()
yield qmp
# wait for vm to be up then connect and return qga instance
@contextlib.contextmanager
def qga_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QgaSession:
) -> Iterator[QgaSession]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
# try to reconnect a couple of times if connection refused
socket_file = os.path.realpath(state_dir / "qga.sock")
for _ in range(100):
try:
sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
break
sock.connect(str(socket_file))
yield QgaSession(sock)
finally:
sock.close()

View File

@@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
@pytest.fixture
def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
with tempfile.TemporaryDirectory(prefix="pytest-home-") as dirpath:
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
monkeypatch.setenv("HOME", str(dirpath))
monkeypatch.setenv("XDG_CONFIG_HOME", str(Path(dirpath) / ".config"))
@@ -34,5 +34,10 @@ def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
monkeypatch.setenv("XDG_RUNTIME_DIR", str(runtime_dir))
monkeypatch.chdir(str(dirpath))
log.debug("Temp HOME directory: %s", str(dirpath))
yield Path(dirpath)
@pytest.fixture
def temp_dir() -> Iterator[Path]:
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
yield Path(dirpath)

View File

@@ -3,7 +3,6 @@ import subprocess
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from age_keys import SopsSetup
@@ -24,7 +23,7 @@ from root import CLAN_CORE
from stdout import CaptureOutput
def test_dependencies_as_files() -> None:
def test_dependencies_as_files(temp_dir: Path) -> None:
from clan_cli.vars.generate import dependencies_as_dir
decrypted_dependencies = {
@@ -37,19 +36,17 @@ def test_dependencies_as_files() -> None:
"var_2b": b"var_2b",
},
}
with TemporaryDirectory() as tmpdir:
dep_tmpdir = Path(tmpdir)
dependencies_as_dir(decrypted_dependencies, dep_tmpdir)
assert dep_tmpdir.is_dir()
assert (dep_tmpdir / "gen_1" / "var_1a").read_bytes() == b"var_1a"
assert (dep_tmpdir / "gen_1" / "var_1b").read_bytes() == b"var_1b"
assert (dep_tmpdir / "gen_2" / "var_2a").read_bytes() == b"var_2a"
assert (dep_tmpdir / "gen_2" / "var_2b").read_bytes() == b"var_2b"
# ensure the files are not world readable
assert (dep_tmpdir / "gen_1" / "var_1a").stat().st_mode & 0o777 == 0o600
assert (dep_tmpdir / "gen_1" / "var_1b").stat().st_mode & 0o777 == 0o600
assert (dep_tmpdir / "gen_2" / "var_2a").stat().st_mode & 0o777 == 0o600
assert (dep_tmpdir / "gen_2" / "var_2b").stat().st_mode & 0o777 == 0o600
dependencies_as_dir(decrypted_dependencies, temp_dir)
assert temp_dir.is_dir()
assert (temp_dir / "gen_1" / "var_1a").read_bytes() == b"var_1a"
assert (temp_dir / "gen_1" / "var_1b").read_bytes() == b"var_1b"
assert (temp_dir / "gen_2" / "var_2a").read_bytes() == b"var_2a"
assert (temp_dir / "gen_2" / "var_2b").read_bytes() == b"var_2b"
# ensure the files are not world readable
assert (temp_dir / "gen_1" / "var_1a").stat().st_mode & 0o777 == 0o600
assert (temp_dir / "gen_1" / "var_1b").stat().st_mode & 0o777 == 0o600
assert (temp_dir / "gen_2" / "var_2a").stat().st_mode & 0o777 == 0o600
assert (temp_dir / "gen_2" / "var_2b").stat().st_mode & 0o777 == 0o600
def test_required_generators() -> None:

View File

@@ -98,27 +98,34 @@ def test_vm_deployment(
cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine")
qga_m1 = qga_connect("m1_machine", vm_m1)
qga_m2 = qga_connect("m2_machine", vm_m2)
# check my_secret is deployed
_, out, _ = qga_m1.run("cat /run/secrets/vars/m1_generator/my_secret", check=True)
assert out == "hello\n"
# check shared_secret is deployed on m1
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m2
_, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", check=False
)
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
with (
qga_connect("m1_machine", vm_m1) as qga_m1,
qga_connect("m2_machine", vm_m2) as qga_m2,
):
# check my_secret is deployed
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/m1_generator/my_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m1
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m2
_, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret",
check=False,
)
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
vm_m1.join()
vm_m2.join()

View File

@@ -54,46 +54,6 @@ def test_run(
cli.run(["vms", "run", "vm1"])
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@pytest.mark.impure
def test_vm_qmp(
monkeypatch: pytest.MonkeyPatch,
temporary_home: Path,
) -> None:
# set up a simple clan flake
flake = generate_flake(
temporary_home,
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={
"my_machine": {
"clan": {
"virtualisation": {"graphics": False},
"networking": {"targetHost": "client"},
},
"services": {"getty": {"autologinUser": "root"}},
}
},
monkeypatch=monkeypatch,
)
# 'clan vms run' must be executed from within the flake
monkeypatch.chdir(flake.path)
# start the VM
vm = run_vm_in_thread("my_machine")
# connect with qmp
qmp = qmp_connect("my_machine", vm)
# verify that issuing a command works
# result = qmp.cmd_obj({"execute": "query-status"})
result = qmp.command("query-status")
assert result["status"] == "running", result
# shutdown machine (prevent zombie qemu processes)
qmp.command("system_powerdown")
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@pytest.mark.impure
def test_vm_persistence(
@@ -130,48 +90,49 @@ def test_vm_persistence(
vm = run_vm_in_thread("my_machine")
# wait for the VM to start and connect qga
qga = qga_connect("my_machine", vm)
with qga_connect("my_machine", vm) as qga:
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down("my_machine", vm)
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down("my_machine", vm)
vm.join()
# start vm again
## start vm again
vm = run_vm_in_thread("my_machine")
# connect second time
qga = qga_connect("my_machine", vm)
# check state exists
qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
## connect second time
with qga_connect("my_machine", vm) as qga:
# check state exists
qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err
assert out == "dream2nix\n", out
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err
assert out == "dream2nix\n", out
# check for errors
exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out
# check for errors
exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out
# check all systemd services are OK, or print details
exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
)
assert exitcode == 0, out
# check all systemd services are OK, or print details
exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
)
assert exitcode == 0, out
# use qmp to shutdown the machine (prevent zombie qemu processes)
qmp = qmp_connect("my_machine", vm)
qmp.command("system_powerdown")
with qmp_connect("my_machine", vm) as qmp:
qmp.command("system_powerdown")
vm.join()