Files
clan-core/pkgs/clan-cli/tests/helpers/vms.py
2024-10-01 20:54:19 +02:00

133 lines
4.1 KiB
Python

import contextlib
import os
import socket
import sys
import threading
import traceback
from collections.abc import Iterator
from pathlib import Path
from time import sleep
from clan_cli.dirs import vm_state_dir
from clan_cli.errors import ClanError
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from . import cli
def find_free_port() -> int:
"""Find an unused localhost port from 1024-65535 and return it."""
with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]
class VmThread(threading.Thread):
def __init__(self, machine_name: str, ssh_port: int | None = None) -> None:
super().__init__()
self.machine_name = machine_name
self.ssh_port = ssh_port
self.exception: Exception | None = None
self.daemon = True
def run(self) -> None:
try:
cli.run(
["vms", "run", self.machine_name, "--publish", f"{self.ssh_port}:22"]
)
except Exception as ex:
# print exception details
print(traceback.format_exc(), file=sys.stderr)
print(sys.exc_info()[2], file=sys.stderr)
self.exception = ex
def run_vm_in_thread(machine_name: str, ssh_port: int | None = None) -> VmThread:
# runs machine and prints exceptions
if ssh_port is None:
ssh_port = find_free_port()
vm_thread = VmThread(machine_name, ssh_port)
vm_thread.start()
return vm_thread
# wait for qmp socket to exist
def wait_vm_up(machine_name: str, vm: VmThread, flake_url: str | None = None) -> None:
if flake_url is None:
flake_url = str(Path.cwd())
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
timeout: float = 600
while True:
if vm.exception:
msg = "VM failed to start"
raise ClanError(msg) from vm.exception
if timeout <= 0:
msg = f"qmp socket {socket_file} not found. Is the VM running?"
raise TimeoutError(msg)
if socket_file.exists():
break
sleep(0.1)
timeout -= 0.1
# wait for vm to be down by checking if qmp socket is down
def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None) -> None:
if flake_url is None:
flake_url = str(Path.cwd())
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
timeout: float = 300
while socket_file.exists():
if vm.exception:
msg = "VM failed to start"
raise ClanError(msg) from vm.exception
if timeout <= 0:
msg = f"qmp socket {socket_file} still exists. Is the VM down?"
raise TimeoutError(msg)
sleep(0.1)
timeout -= 0.1
# wait for vm to be up then connect and return qmp instance
@contextlib.contextmanager
def qmp_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> Iterator[QEMUMonitorProtocol]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
with QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")),
) as qmp:
qmp.connect()
yield qmp
# wait for vm to be up then connect and return qga instance
@contextlib.contextmanager
def qga_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> Iterator[QgaSession]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
# try to reconnect a couple of times if connection refused
socket_file = os.path.realpath(state_dir / "qga.sock")
for _ in range(100):
try:
sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
break
sock.connect(str(socket_file))
yield QgaSession(sock)
finally:
sock.close()