fix resource leaks in qmp tests

This commit is contained in:
Jörg Thalheim
2024-10-01 19:59:58 +02:00
parent 2f38955066
commit 8b205c78bf
4 changed files with 103 additions and 87 deletions

View File

@@ -1,7 +1,6 @@
import base64 import base64
import json import json
import socket import socket
from pathlib import Path
from time import sleep from time import sleep
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -12,17 +11,8 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities # - no need to initialize by asking for capabilities
# - results need to be base64 decoded # - results need to be base64 decoded
class QgaSession: class QgaSession:
def __init__(self, socket_file: Path | str) -> None: def __init__(self, sock: socket.socket) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock = sock
# try to reconnect a couple of times if connection refused
for _ in range(100):
try:
self.sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
return
self.sock.connect(str(socket_file))
def get_response(self) -> dict: def get_response(self) -> dict:
result = self.sock.recv(9999999) result = self.sock.recv(9999999)

View File

@@ -4,6 +4,7 @@ import socket
import sys import sys
import threading import threading
import traceback import traceback
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
@@ -89,26 +90,43 @@ def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None)
# wait for vm to be up then connect and return qmp instance # wait for vm to be up then connect and return qmp instance
@contextlib.contextmanager
def qmp_connect( def qmp_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QEMUMonitorProtocol: ) -> Iterator[QEMUMonitorProtocol]:
if flake_url is None: if flake_url is None:
flake_url = str(Path.cwd()) flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name) state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url) wait_vm_up(machine_name, vm, flake_url)
qmp = QEMUMonitorProtocol( with QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")), address=str(os.path.realpath(state_dir / "qmp.sock")),
) ) as qmp:
qmp.connect() qmp.connect()
return qmp yield qmp
# wait for vm to be up then connect and return qga instance # wait for vm to be up then connect and return qga instance
@contextlib.contextmanager
def qga_connect( def qga_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None machine_name: str, vm: VmThread, flake_url: str | None = None
) -> QgaSession: ) -> Iterator[QgaSession]:
if flake_url is None: if flake_url is None:
flake_url = str(Path.cwd()) flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name) state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url) wait_vm_up(machine_name, vm, flake_url)
return QgaSession(os.path.realpath(state_dir / "qga.sock"))
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
# try to reconnect a couple of times if connection refused
socket_file = os.path.realpath(state_dir / "qga.sock")
for _ in range(100):
try:
sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
break
sock.connect(str(socket_file))
yield QgaSession(sock)
finally:
sock.close()

View File

@@ -98,27 +98,34 @@ def test_vm_deployment(
cmd.run(["nix", "flake", "lock"]) cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine") vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine") vm_m2 = run_vm_in_thread("m2_machine")
qga_m1 = qga_connect("m1_machine", vm_m1) with (
qga_m2 = qga_connect("m2_machine", vm_m2) qga_connect("m1_machine", vm_m1) as qga_m1,
# check my_secret is deployed qga_connect("m2_machine", vm_m2) as qga_m2,
_, out, _ = qga_m1.run("cat /run/secrets/vars/m1_generator/my_secret", check=True) ):
assert out == "hello\n" # check my_secret is deployed
# check shared_secret is deployed on m1 _, out, _ = qga_m1.run(
_, out, _ = qga_m1.run( "cat /run/secrets/vars/m1_generator/my_secret", check=True
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True )
) assert out == "hello\n"
assert out == "hello\n" # check shared_secret is deployed on m1
# check shared_secret is deployed on m2 _, out, _ = qga_m1.run(
_, out, _ = qga_m2.run( "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True )
) assert out == "hello\n"
assert out == "hello\n" # check shared_secret is deployed on m2
# check no_deploy_secret is not deployed _, out, _ = qga_m2.run(
returncode, out, _ = qga_m1.run( "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", check=False )
) assert out == "hello\n"
assert returncode != 0 # check no_deploy_secret is not deployed
qga_m1.exec_cmd("poweroff") returncode, out, _ = qga_m1.run(
qga_m2.exec_cmd("poweroff") "test -e /run/secrets/vars/my_shared_generator/no_deploy_secret",
wait_vm_down("m1_machine", vm_m1) check=False,
wait_vm_down("m2_machine", vm_m2) )
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
vm_m1.join()
vm_m2.join()

View File

@@ -83,15 +83,15 @@ def test_vm_qmp(
vm = run_vm_in_thread("my_machine") vm = run_vm_in_thread("my_machine")
# connect with qmp # connect with qmp
qmp = qmp_connect("my_machine", vm) with qmp_connect("my_machine", vm) as qmp:
# verify that issuing a command works
# result = qmp.cmd_obj({"execute": "query-status"})
result = qmp.command("query-status")
assert result["status"] == "running", result
# verify that issuing a command works # shutdown machine (prevent zombie qemu processes)
# result = qmp.cmd_obj({"execute": "query-status"}) qmp.command("system_powerdown")
result = qmp.command("query-status") vm.join()
assert result["status"] == "running", result
# shutdown machine (prevent zombie qemu processes)
qmp.command("system_powerdown")
@pytest.mark.skipif(no_kvm, reason="Requires KVM") @pytest.mark.skipif(no_kvm, reason="Requires KVM")
@@ -130,48 +130,49 @@ def test_vm_persistence(
vm = run_vm_in_thread("my_machine") vm = run_vm_in_thread("my_machine")
# wait for the VM to start and connect qga # wait for the VM to start and connect qga
qga = qga_connect("my_machine", vm) with qga_connect("my_machine", vm) as qga:
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# create state via qmp command instead of systemd service # wait for socket to be down (systemd service 'poweroff' rebooting machine)
qga.run("echo 'dream2nix' > /var/my-state/root", check=True) wait_vm_down("my_machine", vm)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# wait for socket to be down (systemd service 'poweroff' rebooting machine) vm.join()
wait_vm_down("my_machine", vm)
# start vm again ## start vm again
vm = run_vm_in_thread("my_machine") vm = run_vm_in_thread("my_machine")
# connect second time ## connect second time
qga = qga_connect("my_machine", vm) with qga_connect("my_machine", vm) as qga:
# check state exists # check state exists
qga.run("cat /var/my-state/test", check=True) qga.run("cat /var/my-state/test", check=True)
# ensure root file is owned by root # ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True) qga.run("stat -c '%U' /var/my-state/root", check=True)
# ensure test file is owned by test # ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True) qga.run("stat -c '%U' /var/my-state/test", check=True)
# ensure /var/user-state is owned by test # ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True) qga.run("stat -c '%U' /var/user-state", check=True)
# ensure that the file created by the service is still there and has the expected content # ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test") exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err assert exitcode == 0, err
assert out == "dream2nix\n", out assert out == "dream2nix\n", out
# check for errors # check for errors
exitcode, out, err = qga.run("cat /var/my-state/error") exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out assert exitcode == 1, out
# check all systemd services are OK, or print details # check all systemd services are OK, or print details
exitcode, out, err = qga.run( exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )" "systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
) )
assert exitcode == 0, out assert exitcode == 0, out
# use qmp to shutdown the machine (prevent zombie qemu processes) with qmp_connect("my_machine", vm) as qmp:
qmp = qmp_connect("my_machine", vm) qmp.command("system_powerdown")
qmp.command("system_powerdown") vm.join()