fix resource leaks in qmp tests

This commit is contained in:
Jörg Thalheim
2024-10-01 19:59:58 +02:00
parent 91408f4c72
commit 082c3c1416
4 changed files with 103 additions and 87 deletions

View File

@@ -1,7 +1,6 @@
import base64
import json
import socket
from pathlib import Path
from time import sleep
from clan_cli.errors import ClanError
@@ -12,17 +11,8 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
class QgaSession:
def __init__(self, socket_file: Path | str) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# try to reconnect a couple of times if connection refused
for _ in range(100):
try:
self.sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
return
self.sock.connect(str(socket_file))
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
def get_response(self) -> dict:
result = self.sock.recv(9999999)

View File

@@ -4,6 +4,7 @@ import socket
import sys
import threading
import traceback
from collections.abc import Iterator
from pathlib import Path
from time import sleep
@@ -89,26 +90,43 @@ def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None)
# wait for vm to be up then connect and return qmp instance
@contextlib.contextmanager
def qmp_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QEMUMonitorProtocol:
) -> Iterator[QEMUMonitorProtocol]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
qmp = QEMUMonitorProtocol(
with QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")),
)
qmp.connect()
return qmp
) as qmp:
qmp.connect()
yield qmp
# wait for vm to be up then connect and return qga instance
@contextlib.contextmanager
def qga_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QgaSession:
) -> Iterator[QgaSession]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
# try to reconnect a couple of times if connection refused
socket_file = os.path.realpath(state_dir / "qga.sock")
for _ in range(100):
try:
sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
break
sock.connect(str(socket_file))
yield QgaSession(sock)
finally:
sock.close()

View File

@@ -98,27 +98,34 @@ def test_vm_deployment(
cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine")
qga_m1 = qga_connect("m1_machine", vm_m1)
qga_m2 = qga_connect("m2_machine", vm_m2)
# check my_secret is deployed
_, out, _ = qga_m1.run("cat /run/secrets/vars/m1_generator/my_secret", check=True)
assert out == "hello\n"
# check shared_secret is deployed on m1
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m2
_, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", check=False
)
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
with (
qga_connect("m1_machine", vm_m1) as qga_m1,
qga_connect("m2_machine", vm_m2) as qga_m2,
):
# check my_secret is deployed
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/m1_generator/my_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m1
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check shared_secret is deployed on m2
_, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
)
assert out == "hello\n"
# check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret",
check=False,
)
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
vm_m1.join()
vm_m2.join()

View File

@@ -83,15 +83,15 @@ def test_vm_qmp(
vm = run_vm_in_thread("my_machine")
# connect with qmp
qmp = qmp_connect("my_machine", vm)
with qmp_connect("my_machine", vm) as qmp:
# verify that issuing a command works
# result = qmp.cmd_obj({"execute": "query-status"})
result = qmp.command("query-status")
assert result["status"] == "running", result
# verify that issuing a command works
# result = qmp.cmd_obj({"execute": "query-status"})
result = qmp.command("query-status")
assert result["status"] == "running", result
# shutdown machine (prevent zombie qemu processes)
qmp.command("system_powerdown")
# shutdown machine (prevent zombie qemu processes)
qmp.command("system_powerdown")
vm.join()
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@@ -130,48 +130,49 @@ def test_vm_persistence(
vm = run_vm_in_thread("my_machine")
# wait for the VM to start and connect qga
qga = qga_connect("my_machine", vm)
with qga_connect("my_machine", vm) as qga:
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down("my_machine", vm)
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down("my_machine", vm)
vm.join()
# start vm again
## start vm again
vm = run_vm_in_thread("my_machine")
# connect second time
qga = qga_connect("my_machine", vm)
# check state exists
qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
## connect second time
with qga_connect("my_machine", vm) as qga:
# check state exists
qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err
assert out == "dream2nix\n", out
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err
assert out == "dream2nix\n", out
# check for errors
exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out
# check for errors
exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out
# check all systemd services are OK, or print details
exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
)
assert exitcode == 0, out
# check all systemd services are OK, or print details
exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
)
assert exitcode == 0, out
# use qmp to shutdown the machine (prevent zombie qemu processes)
qmp = qmp_connect("my_machine", vm)
qmp.command("system_powerdown")
with qmp_connect("my_machine", vm) as qmp:
qmp.command("system_powerdown")
vm.join()