clan_cli: Added lazy qmp
This commit is contained in:
@@ -2,10 +2,10 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
from collections.abc import Generator
|
from collections.abc import Generator
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from os import path
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
from tempfile import NamedTemporaryFile
|
||||||
|
|
||||||
|
from clan_cli.dirs import vm_state_dir
|
||||||
from qemu.qmp import QEMUMonitorProtocol
|
from qemu.qmp import QEMUMonitorProtocol
|
||||||
|
|
||||||
from ..cmd import run
|
from ..cmd import run
|
||||||
@@ -17,37 +17,25 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
class VMAttr:
|
class VMAttr:
|
||||||
def __init__(self, machine_name: str) -> None:
|
def __init__(self, state_dir: Path) -> None:
|
||||||
self.temp_dir = TemporaryDirectory(prefix="clan_vm-", suffix=f"-{machine_name}")
|
self._qmp_socket: Path = state_dir / "qmp.sock"
|
||||||
self._qmp_socket: Path = Path(self.temp_dir.name) / "qmp.sock"
|
self._qga_socket: Path = state_dir / "qga.sock"
|
||||||
self._qga_socket: Path = Path(self.temp_dir.name) / "qga.sock"
|
|
||||||
self._qmp: QEMUMonitorProtocol | None = None
|
self._qmp: QEMUMonitorProtocol | None = None
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def qmp(self) -> Generator[QEMUMonitorProtocol, None, None]:
|
def qmp(self) -> Generator[QEMUMonitorProtocol, None, None]:
|
||||||
if self._qmp is None:
|
if self._qmp is None:
|
||||||
log.debug(f"qmp_socket: {self._qmp_socket}")
|
log.debug(f"qmp_socket: {self._qmp_socket}")
|
||||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
rpath = self._qmp_socket.resolve()
|
||||||
|
if not rpath.exists():
|
||||||
|
raise ClanError(f"qmp socket {rpath} does not exist")
|
||||||
|
self._qmp = QEMUMonitorProtocol(str(rpath))
|
||||||
self._qmp.connect()
|
self._qmp.connect()
|
||||||
try:
|
try:
|
||||||
yield self._qmp
|
yield self._qmp
|
||||||
finally:
|
finally:
|
||||||
self._qmp.close()
|
self._qmp.close()
|
||||||
|
|
||||||
@property
|
|
||||||
def qmp_socket(self) -> Path:
|
|
||||||
if self._qmp is None:
|
|
||||||
log.debug(f"qmp_socket: {self._qmp_socket}")
|
|
||||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
|
||||||
return self._qmp_socket
|
|
||||||
|
|
||||||
@property
|
|
||||||
def qga_socket(self) -> Path:
|
|
||||||
if self._qmp is None:
|
|
||||||
log.debug(f"qmp_socket: {self.qga_socket}")
|
|
||||||
self._qmp = QEMUMonitorProtocol(path.realpath(self._qmp_socket))
|
|
||||||
return self._qga_socket
|
|
||||||
|
|
||||||
|
|
||||||
class Machine:
|
class Machine:
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -70,7 +58,9 @@ class Machine:
|
|||||||
|
|
||||||
self._deployment_info: None | dict[str, str] = deployment_info
|
self._deployment_info: None | dict[str, str] = deployment_info
|
||||||
|
|
||||||
self.vm: VMAttr = VMAttr(name)
|
state_dir = vm_state_dir(flake_url=str(self.flake), vm_name=self.name)
|
||||||
|
|
||||||
|
self.vm: VMAttr = VMAttr(state_dir)
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return f"Machine(name={self.name}, flake={self.flake})"
|
return f"Machine(name={self.name}, flake={self.flake})"
|
||||||
|
|||||||
@@ -1,10 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
from collections.abc import Generator
|
|
||||||
from contextlib import contextmanager
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
@@ -24,34 +21,6 @@ if TYPE_CHECKING:
|
|||||||
no_kvm = not os.path.exists("/dev/kvm")
|
no_kvm = not os.path.exists("/dev/kvm")
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def monkeypatch_tempdir_with_custom_path(
|
|
||||||
*, monkeypatch: pytest.MonkeyPatch, custom_path: str, prefix_condition: str
|
|
||||||
) -> Generator[None, None, None]:
|
|
||||||
# Custom wrapper function that checks the prefix and either modifies the behavior or falls back to the original
|
|
||||||
class CustomTemporaryDirectory(tempfile.TemporaryDirectory):
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
suffix: str | None = None,
|
|
||||||
prefix: str | None = None,
|
|
||||||
dir: str | None = None, # noqa: A002
|
|
||||||
) -> None:
|
|
||||||
if prefix == prefix_condition:
|
|
||||||
self.name = custom_path # Use the custom path
|
|
||||||
self._finalizer = None # Prevent cleanup attempts on the custom path by the original finalizer
|
|
||||||
else:
|
|
||||||
super().__init__(suffix=suffix, prefix=prefix, dir=dir)
|
|
||||||
|
|
||||||
# Use ExitStack to ensure unpatching
|
|
||||||
try:
|
|
||||||
# Patch the TemporaryDirectory with our custom class
|
|
||||||
monkeypatch.setattr(tempfile, "TemporaryDirectory", CustomTemporaryDirectory)
|
|
||||||
yield # This allows the code within the 'with' block of this context manager to run
|
|
||||||
finally:
|
|
||||||
# Unpatch the TemporaryDirectory
|
|
||||||
monkeypatch.undo()
|
|
||||||
|
|
||||||
|
|
||||||
def run_vm_in_thread(machine_name: str) -> None:
|
def run_vm_in_thread(machine_name: str) -> None:
|
||||||
# runs machine and prints exceptions
|
# runs machine and prints exceptions
|
||||||
def run() -> None:
|
def run() -> None:
|
||||||
@@ -71,14 +40,14 @@ def run_vm_in_thread(machine_name: str) -> None:
|
|||||||
# wait for qmp socket to exist
|
# wait for qmp socket to exist
|
||||||
def wait_vm_up(state_dir: Path) -> None:
|
def wait_vm_up(state_dir: Path) -> None:
|
||||||
socket_file = state_dir / "qga.sock"
|
socket_file = state_dir / "qga.sock"
|
||||||
timeout: float = 50
|
timeout = 5.0
|
||||||
while timeout > 0:
|
while True:
|
||||||
|
if timeout <= 0:
|
||||||
|
raise TimeoutError(f"qga socket {socket_file} not found")
|
||||||
if socket_file.exists():
|
if socket_file.exists():
|
||||||
break
|
break
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
timeout -= 0.1
|
timeout -= 0.1
|
||||||
if timeout <= 0:
|
|
||||||
raise TimeoutError(f"{socket_file} did not appear")
|
|
||||||
|
|
||||||
|
|
||||||
# wait for vm to be down by checking if qga socket is down
|
# wait for vm to be down by checking if qga socket is down
|
||||||
@@ -161,18 +130,18 @@ def test_vm_qmp(
|
|||||||
monkeypatch.chdir(flake.path)
|
monkeypatch.chdir(flake.path)
|
||||||
|
|
||||||
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
|
# the state dir is a point of reference for qemu interactions as it links to the qga/qmp sockets
|
||||||
qmp_state_dir = temporary_home / "vm-tmp"
|
state_dir = vm_state_dir(str(flake.path), "my_machine")
|
||||||
|
|
||||||
with monkeypatch_tempdir_with_custom_path(
|
|
||||||
monkeypatch=monkeypatch,
|
|
||||||
custom_path=str(qmp_state_dir),
|
|
||||||
prefix_condition="machine_vm-",
|
|
||||||
):
|
|
||||||
# start the VM
|
# start the VM
|
||||||
run_vm_in_thread("my_machine")
|
run_vm_in_thread("my_machine")
|
||||||
|
|
||||||
# connect with qmp
|
# connect with qmp
|
||||||
qmp = qmp_connect(qmp_state_dir)
|
qmp = qmp_connect(state_dir)
|
||||||
|
|
||||||
|
# verify that issuing a command works
|
||||||
|
# result = qmp.cmd_obj({"execute": "query-status"})
|
||||||
|
result = qmp.command("query-status")
|
||||||
|
assert result["status"] == "running", result
|
||||||
|
|
||||||
# shutdown machine (prevent zombie qemu processes)
|
# shutdown machine (prevent zombie qemu processes)
|
||||||
qmp.command("system_powerdown")
|
qmp.command("system_powerdown")
|
||||||
|
|||||||
@@ -169,8 +169,13 @@ class VM(GObject.Object):
|
|||||||
vm=vm,
|
vm=vm,
|
||||||
)
|
)
|
||||||
self.process.proc.join()
|
self.process.proc.join()
|
||||||
|
|
||||||
GLib.idle_add(self.emit, "build_vm", self, False)
|
GLib.idle_add(self.emit, "build_vm", self, False)
|
||||||
|
|
||||||
|
if self.process.proc.exitcode != 0:
|
||||||
|
log.error(f"Failed to build VM {self.get_id()}")
|
||||||
|
return
|
||||||
|
|
||||||
self.process = spawn(
|
self.process = spawn(
|
||||||
on_except=None,
|
on_except=None,
|
||||||
log_dir=Path(str(self.log_dir.name)),
|
log_dir=Path(str(self.log_dir.name)),
|
||||||
@@ -189,8 +194,6 @@ class VM(GObject.Object):
|
|||||||
if self._watcher_id == 0:
|
if self._watcher_id == 0:
|
||||||
raise ClanError("Failed to add watcher")
|
raise ClanError("Failed to add watcher")
|
||||||
|
|
||||||
self.machine.qmp_connect()
|
|
||||||
|
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
if self.is_running():
|
if self.is_running():
|
||||||
log.warn("VM is already running")
|
log.warn("VM is already running")
|
||||||
@@ -249,7 +252,12 @@ class VM(GObject.Object):
|
|||||||
def __stop(self) -> None:
|
def __stop(self) -> None:
|
||||||
log.info(f"Stopping VM {self.get_id()}")
|
log.info(f"Stopping VM {self.get_id()}")
|
||||||
|
|
||||||
self.machine.qmp_command("system_powerdown")
|
try:
|
||||||
|
with self.machine.vm.qmp() as qmp:
|
||||||
|
qmp.command("system_powerdown")
|
||||||
|
except ClanError as e:
|
||||||
|
log.debug(e)
|
||||||
|
|
||||||
self._stop_timer_init = datetime.now()
|
self._stop_timer_init = datetime.now()
|
||||||
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog)
|
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog)
|
||||||
if self._stop_watcher_id == 0:
|
if self._stop_watcher_id == 0:
|
||||||
|
|||||||
Reference in New Issue
Block a user