From 082c3c1416d9b1853ba35c01c4cdd46a94788b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 1 Oct 2024 19:59:58 +0200 Subject: [PATCH] fix resource leaks in qmp tests --- pkgs/clan-cli/clan_cli/qemu/qga.py | 14 +--- pkgs/clan-cli/tests/helpers/vms.py | 32 ++++++-- pkgs/clan-cli/tests/test_vars_deployment.py | 55 +++++++------ pkgs/clan-cli/tests/test_vms_cli.py | 89 +++++++++++---------- 4 files changed, 103 insertions(+), 87 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/qemu/qga.py b/pkgs/clan-cli/clan_cli/qemu/qga.py index 9cc8b254c..0cae576a1 100644 --- a/pkgs/clan-cli/clan_cli/qemu/qga.py +++ b/pkgs/clan-cli/clan_cli/qemu/qga.py @@ -1,7 +1,6 @@ import base64 import json import socket -from pathlib import Path from time import sleep from clan_cli.errors import ClanError @@ -12,17 +11,8 @@ from clan_cli.errors import ClanError # - no need to initialize by asking for capabilities # - results need to be base64 decoded class QgaSession: - def __init__(self, socket_file: Path | str) -> None: - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # try to reconnect a couple of times if connection refused - for _ in range(100): - try: - self.sock.connect(str(socket_file)) - except ConnectionRefusedError: - sleep(0.1) - else: - return - self.sock.connect(str(socket_file)) + def __init__(self, sock: socket.socket) -> None: + self.sock = sock def get_response(self) -> dict: result = self.sock.recv(9999999) diff --git a/pkgs/clan-cli/tests/helpers/vms.py b/pkgs/clan-cli/tests/helpers/vms.py index 35f065e8f..0b0247ea6 100644 --- a/pkgs/clan-cli/tests/helpers/vms.py +++ b/pkgs/clan-cli/tests/helpers/vms.py @@ -4,6 +4,7 @@ import socket import sys import threading import traceback +from collections.abc import Iterator from pathlib import Path from time import sleep @@ -89,26 +90,43 @@ def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None) # wait for vm to be up then connect and return qmp instance +@contextlib.contextmanager def qmp_connect( machine_name: str, vm: VmThread, flake_url: str | None = None -) -> QEMUMonitorProtocol: +) -> Iterator[QEMUMonitorProtocol]: if flake_url is None: flake_url = str(Path.cwd()) state_dir = vm_state_dir(flake_url, machine_name) wait_vm_up(machine_name, vm, flake_url) - qmp = QEMUMonitorProtocol( + with QEMUMonitorProtocol( address=str(os.path.realpath(state_dir / "qmp.sock")), - ) - qmp.connect() - return qmp + ) as qmp: + qmp.connect() + yield qmp # wait for vm to be up then connect and return qga instance +@contextlib.contextmanager def qga_connect( machine_name: str, vm: VmThread, flake_url: str | None = None -) -> QgaSession: +) -> Iterator[QgaSession]: if flake_url is None: flake_url = str(Path.cwd()) state_dir = vm_state_dir(flake_url, machine_name) wait_vm_up(machine_name, vm, flake_url) - return QgaSession(os.path.realpath(state_dir / "qga.sock")) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + # try to reconnect a couple of times if connection refused + socket_file = os.path.realpath(state_dir / "qga.sock") + for _ in range(100): + try: + sock.connect(str(socket_file)) + except ConnectionRefusedError: + sleep(0.1) + else: + break + sock.connect(str(socket_file)) + yield QgaSession(sock) + finally: + sock.close() diff --git a/pkgs/clan-cli/tests/test_vars_deployment.py b/pkgs/clan-cli/tests/test_vars_deployment.py index 7164e5e06..5cfb9d527 100644 --- a/pkgs/clan-cli/tests/test_vars_deployment.py +++ b/pkgs/clan-cli/tests/test_vars_deployment.py @@ -98,27 +98,34 @@ def test_vm_deployment( cmd.run(["nix", "flake", "lock"]) vm_m1 = run_vm_in_thread("m1_machine") vm_m2 = run_vm_in_thread("m2_machine") - qga_m1 = qga_connect("m1_machine", vm_m1) - qga_m2 = qga_connect("m2_machine", vm_m2) - # check my_secret is deployed - _, out, _ = qga_m1.run("cat /run/secrets/vars/m1_generator/my_secret", check=True) - assert out == "hello\n" - # check shared_secret is deployed on m1 - _, out, _ = qga_m1.run( - "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True - ) - assert out == "hello\n" - # check shared_secret is deployed on m2 - _, out, _ = qga_m2.run( - "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True - ) - assert out == "hello\n" - # check no_deploy_secret is not deployed - returncode, out, _ = qga_m1.run( - "test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", check=False - ) - assert returncode != 0 - qga_m1.exec_cmd("poweroff") - qga_m2.exec_cmd("poweroff") - wait_vm_down("m1_machine", vm_m1) - wait_vm_down("m2_machine", vm_m2) + with ( + qga_connect("m1_machine", vm_m1) as qga_m1, + qga_connect("m2_machine", vm_m2) as qga_m2, + ): + # check my_secret is deployed + _, out, _ = qga_m1.run( + "cat /run/secrets/vars/m1_generator/my_secret", check=True + ) + assert out == "hello\n" + # check shared_secret is deployed on m1 + _, out, _ = qga_m1.run( + "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True + ) + assert out == "hello\n" + # check shared_secret is deployed on m2 + _, out, _ = qga_m2.run( + "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True + ) + assert out == "hello\n" + # check no_deploy_secret is not deployed + returncode, out, _ = qga_m1.run( + "test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", + check=False, + ) + assert returncode != 0 + qga_m1.exec_cmd("poweroff") + qga_m2.exec_cmd("poweroff") + wait_vm_down("m1_machine", vm_m1) + wait_vm_down("m2_machine", vm_m2) + vm_m1.join() + vm_m2.join() diff --git a/pkgs/clan-cli/tests/test_vms_cli.py b/pkgs/clan-cli/tests/test_vms_cli.py index 756a89322..529923fc2 100644 --- a/pkgs/clan-cli/tests/test_vms_cli.py +++ b/pkgs/clan-cli/tests/test_vms_cli.py @@ -83,15 +83,15 @@ def test_vm_qmp( vm = run_vm_in_thread("my_machine") # connect with qmp - qmp = qmp_connect("my_machine", vm) + with qmp_connect("my_machine", vm) as qmp: + # verify that issuing a command works + # result = qmp.cmd_obj({"execute": "query-status"}) + result = qmp.command("query-status") + assert result["status"] == "running", result - # verify that issuing a command works - # result = qmp.cmd_obj({"execute": "query-status"}) - result = qmp.command("query-status") - assert result["status"] == "running", result - - # shutdown machine (prevent zombie qemu processes) - qmp.command("system_powerdown") + # shutdown machine (prevent zombie qemu processes) + qmp.command("system_powerdown") + vm.join() @pytest.mark.skipif(no_kvm, reason="Requires KVM") @@ -130,48 +130,49 @@ def test_vm_persistence( vm = run_vm_in_thread("my_machine") # wait for the VM to start and connect qga - qga = qga_connect("my_machine", vm) + with qga_connect("my_machine", vm) as qga: + # create state via qmp command instead of systemd service + qga.run("echo 'dream2nix' > /var/my-state/root", check=True) + qga.run("echo 'dream2nix' > /var/my-state/test", check=True) + qga.run("chown test /var/my-state/test", check=True) + qga.run("chown test /var/user-state", check=True) + qga.run("touch /var/my-state/rebooting", check=True) + qga.exec_cmd("poweroff") - # create state via qmp command instead of systemd service - qga.run("echo 'dream2nix' > /var/my-state/root", check=True) - qga.run("echo 'dream2nix' > /var/my-state/test", check=True) - qga.run("chown test /var/my-state/test", check=True) - qga.run("chown test /var/user-state", check=True) - qga.run("touch /var/my-state/rebooting", check=True) - qga.exec_cmd("poweroff") + # wait for socket to be down (systemd service 'poweroff' rebooting machine) + wait_vm_down("my_machine", vm) - # wait for socket to be down (systemd service 'poweroff' rebooting machine) - wait_vm_down("my_machine", vm) + vm.join() - # start vm again + ## start vm again vm = run_vm_in_thread("my_machine") - # connect second time - qga = qga_connect("my_machine", vm) - # check state exists - qga.run("cat /var/my-state/test", check=True) - # ensure root file is owned by root - qga.run("stat -c '%U' /var/my-state/root", check=True) - # ensure test file is owned by test - qga.run("stat -c '%U' /var/my-state/test", check=True) - # ensure /var/user-state is owned by test - qga.run("stat -c '%U' /var/user-state", check=True) + ## connect second time + with qga_connect("my_machine", vm) as qga: + # check state exists + qga.run("cat /var/my-state/test", check=True) + # ensure root file is owned by root + qga.run("stat -c '%U' /var/my-state/root", check=True) + # ensure test file is owned by test + qga.run("stat -c '%U' /var/my-state/test", check=True) + # ensure /var/user-state is owned by test + qga.run("stat -c '%U' /var/user-state", check=True) - # ensure that the file created by the service is still there and has the expected content - exitcode, out, err = qga.run("cat /var/my-state/test") - assert exitcode == 0, err - assert out == "dream2nix\n", out + # ensure that the file created by the service is still there and has the expected content + exitcode, out, err = qga.run("cat /var/my-state/test") + assert exitcode == 0, err + assert out == "dream2nix\n", out - # check for errors - exitcode, out, err = qga.run("cat /var/my-state/error") - assert exitcode == 1, out + # check for errors + exitcode, out, err = qga.run("cat /var/my-state/error") + assert exitcode == 1, out - # check all systemd services are OK, or print details - exitcode, out, err = qga.run( - "systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )" - ) - assert exitcode == 0, out + # check all systemd services are OK, or print details + exitcode, out, err = qga.run( + "systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )" + ) + assert exitcode == 0, out - # use qmp to shutdown the machine (prevent zombie qemu processes) - qmp = qmp_connect("my_machine", vm) - qmp.command("system_powerdown") + with qmp_connect("my_machine", vm) as qmp: + qmp.command("system_powerdown") + vm.join()