diff --git a/pkgs/clan-cli/clan_cli/qemu/qga.py b/pkgs/clan-cli/clan_cli/qemu/qga.py new file mode 100644 index 000000000..ff409466a --- /dev/null +++ b/pkgs/clan-cli/clan_cli/qemu/qga.py @@ -0,0 +1,77 @@ +import base64 +import json +import socket +from pathlib import Path +from time import sleep + + +# qga is almost like qmp, but not quite, because: +# - server doesn't send initial message +# - no need to initialize by asking for capabilities +# - results need to be base64 decoded +class QgaSession: + def __init__(self, socket_file: Path | str) -> None: + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # try to reconnect a couple of times if connetion refused + for _ in range(100): + try: + self.sock.connect(str(socket_file)) + return + except ConnectionRefusedError: + sleep(0.1) + self.sock.connect(str(socket_file)) + + def get_response(self) -> dict: + result = self.sock.recv(9999999) + return json.loads(result) + + # only execute, don't wait for response + def exec_cmd(self, cmd: str) -> None: + self.sock.send( + json.dumps( + { + "execute": "guest-exec", + "arguments": { + "path": "/bin/sh", + "arg": ["-l", "-c", cmd], + "capture-output": True, + }, + } + ).encode("utf-8") + ) + + # run, wait for result, return exitcode and output + def run(self, cmd: str) -> tuple[int, str, str]: + self.exec_cmd(cmd) + result_pid = self.get_response() + pid = result_pid["return"]["pid"] + # loop until exited=true + status_payload = json.dumps( + { + "execute": "guest-exec-status", + "arguments": { + "pid": pid, + }, + } + ).encode("utf-8") + while True: + self.sock.send(status_payload) + result = self.get_response() + if "error" in result and result["error"]["desc"].startswith("PID"): + raise Exception("PID could not be found") + if result["return"]["exited"]: + break + sleep(0.1) + + exitcode = result["return"]["exitcode"] + stdout = ( + "" + if "out-data" not in result["return"] + else base64.b64decode(result["return"]["out-data"]).decode("utf-8") + ) + stderr = ( + "" + if "err-data" not in result["return"] + else base64.b64decode(result["return"]["err-data"]).decode("utf-8") + ) + return exitcode, stdout, stderr diff --git a/pkgs/clan-cli/clan_cli/qemu/qmp.py b/pkgs/clan-cli/clan_cli/qemu/qmp.py new file mode 100644 index 000000000..08c13a953 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/qemu/qmp.py @@ -0,0 +1,317 @@ +# mypy: ignore-errors + +""" QEMU Monitor Protocol Python class """ +# Copyright (C) 2009, 2010 Red Hat Inc. +# +# Authors: +# Luiz Capitulino +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import errno +import json +import logging +import socket +from typing import Any + + +class QMPError(Exception): + """ + QMP base exception + """ + + +class QMPConnectError(QMPError): + """ + QMP connection exception + """ + + +class QMPCapabilitiesError(QMPError): + """ + QMP negotiate capabilities exception + """ + + +class QMPTimeoutError(QMPError): + """ + QMP timeout exception + """ + + +class QEMUMonitorProtocol: + """ + Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then + allow to handle commands and events. + """ + + #: Logger object for debugging messages + logger: logging.Logger = logging.getLogger("QMP") + + def __init__( + self, + address: str | tuple[str, int], + server: bool = False, + nickname: str | None = None, + ) -> None: + """ + Create a QEMUMonitorProtocol class. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode listens on the socket (bool) + @raise OSError on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self.__events: list[dict[str, Any]] = [] + self.__address: str | tuple[str, int] = address + self.__sock: socket.socket = self.__get_sock() + self.__sockfile: socket.SocketIO | None = None + self._nickname: str | None = nickname + if self._nickname: + self.logger = logging.getLogger("QMP").getChild(self._nickname) + if server: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.bind(self.__address) + self.__sock.listen(1) + + def __get_sock(self) -> socket.socket: + if isinstance(self.__address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def __negotiate_capabilities(self) -> dict[str, Any]: + greeting = self.__json_read() + if greeting is None or "QMP" not in greeting: + raise QMPConnectError + # Greeting seems ok, negotiate capabilities + resp = self.cmd("qmp_capabilities") + if resp and "return" in resp: + return greeting + raise QMPCapabilitiesError + + def __json_read(self, only_event: bool = False) -> dict[str, Any] | None: + while True: + data = self.__sockfile.readline() + if not data: + return None + resp = json.loads(data) + if "event" in resp: + self.logger.debug("<<< %s", resp) + self.__events.append(resp) + if not only_event: + continue + return resp + + def __get_events(self, wait: bool | float = False) -> None: + """ + Check for new events in the stream and cache them in __events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + """ + + # Check for new events regardless and pull them into the cache: + self.__sock.setblocking(0) + try: + self.__json_read() + except OSError as err: + if err.errno == errno.EAGAIN: + # No data available + pass + self.__sock.setblocking(1) + + # Wait for new events, if needed. + # if wait is 0.0, this means "no wait" and is also implicitly false. + if not self.__events and wait: + if isinstance(wait, float): + self.__sock.settimeout(wait) + try: + ret = self.__json_read(only_event=True) + except socket.timeout: + raise QMPTimeoutError("Timeout waiting for event") + except Exception: + raise QMPConnectError("Error while reading from socket") + if ret is None: + raise QMPConnectError("Error while reading from socket") + self.__sock.settimeout(None) + + def __enter__(self) -> "QEMUMonitorProtocol": + # Implement context manager enter function. + return self + + def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> bool: + # Implement context manager exit function. + self.close() + return False + + def connect(self, negotiate: bool = True) -> dict[str, Any] | None: + """ + Connect to the QMP Monitor and perform capabilities negotiation. + + @return QMP greeting dict, or None if negotiate is false + @raise OSError on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + """ + self.__sock.connect(self.__address) + self.__sockfile = self.__sock.makefile() + if negotiate: + return self.__negotiate_capabilities() + return None + + def accept(self, timeout: float | None = 15.0) -> dict[str, Any]: + """ + Await connection from QMP Monitor and perform capabilities negotiation. + + @param timeout: timeout in seconds (nonnegative float number, or + None). The value passed will set the behavior of the + underneath QMP socket as described in [1]. Default value + is set to 15.0. + @return QMP greeting dict + @raise OSError on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + + [1] + https://docs.python.org/3/library/socket.html#socket.socket.settimeout + """ + self.__sock.settimeout(timeout) + self.__sock, _ = self.__sock.accept() + self.__sockfile = self.__sock.makefile() + return self.__negotiate_capabilities() + + def cmd_obj(self, qmp_cmd: dict[str, Any]) -> dict[str, Any] | None: + """ + Send a QMP command to the QMP Monitor. + + @param qmp_cmd: QMP command to be sent as a Python dict + @return QMP response as a Python dict or None if the connection has + been closed + """ + self.logger.debug(">>> %s", qmp_cmd) + try: + self.__sock.sendall(json.dumps(qmp_cmd).encode("utf-8")) + except OSError as err: + if err.errno == errno.EPIPE: + return None + raise err + resp = self.__json_read() + self.logger.debug("<<< %s", resp) + return resp + + def cmd( + self, + name: str, + args: dict[str, Any] | None = None, + cmd_id: dict[str, Any] | list[Any] | str | int | None = None, + ) -> dict[str, Any] | None: + """ + Build a QMP command and send it to the QMP Monitor. + + @param name: command name (string) + @param args: command arguments (dict) + @param cmd_id: command id (dict, list, string or int) + """ + qmp_cmd: dict[str, Any] = {"execute": name} + if args: + qmp_cmd["arguments"] = args + if cmd_id: + qmp_cmd["id"] = cmd_id + return self.cmd_obj(qmp_cmd) + + def command(self, cmd: str, **kwds: Any) -> Any: + """ + Build and send a QMP command to the monitor, report errors if any + """ + ret = self.cmd(cmd, kwds) + if "error" in ret: + raise Exception(ret["error"]["desc"]) + return ret["return"] + + def pull_event(self, wait: bool | float = False) -> dict[str, Any] | None: + """ + Pulls a single event. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The first available QMP event, or None. + """ + self.__get_events(wait) + + if self.__events: + return self.__events.pop(0) + return None + + def get_events(self, wait: bool | float = False) -> list[dict[str, Any]]: + """ + Get a list of available QMP events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The list of available QMP events. + """ + self.__get_events(wait) + return self.__events + + def clear_events(self) -> None: + """ + Clear current list of pending events. + """ + self.__events = [] + + def close(self) -> None: + """ + Close the socket and socket file. + """ + if self.__sock: + self.__sock.close() + if self.__sockfile: + self.__sockfile.close() + + def settimeout(self, timeout: float | None) -> None: + """ + Set the socket timeout. + + @param timeout (float): timeout in seconds, or None. + @note This is a wrap around socket.settimeout + """ + self.__sock.settimeout(timeout) + + def get_sock_fd(self) -> int: + """ + Get the socket file descriptor. + + @return The file descriptor number. + """ + return self.__sock.fileno() + + def is_scm_available(self) -> bool: + """ + Check if the socket allows for SCM_RIGHTS. + + @return True if SCM_RIGHTS is available, otherwise False. + """ + return self.__sock.family == socket.AF_UNIX diff --git a/pkgs/clan-cli/tests/test_vms_cli.py b/pkgs/clan-cli/tests/test_vms_cli.py index 33658da4e..5e2d75b01 100644 --- a/pkgs/clan-cli/tests/test_vms_cli.py +++ b/pkgs/clan-cli/tests/test_vms_cli.py @@ -1,7 +1,4 @@ -import base64 -import json import os -import socket import sys import threading import traceback @@ -15,6 +12,8 @@ from fixtures_flakes import FlakeForTest, generate_flake from root import CLAN_CORE from clan_cli.dirs import vm_state_dir +from clan_cli.qemu.qga import QgaSession +from clan_cli.qemu.qmp import QEMUMonitorProtocol if TYPE_CHECKING: from age_keys import KeyPair @@ -22,79 +21,6 @@ if TYPE_CHECKING: no_kvm = not os.path.exists("/dev/kvm") -# qga is almost like qmp, but not quite, because: -# - server doesn't send initial message -# - no need to initialize by asking for capabilities -# - results need to be base64 decoded -# TODO: move this to an extra file and make it available to other parts like GUI -class QgaSession: - def __init__(self, socket_file: Path | str) -> None: - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # try to reconnect a couple of times if connetion refused - for _ in range(100): - try: - self.sock.connect(str(socket_file)) - return - except ConnectionRefusedError: - sleep(0.1) - self.sock.connect(str(socket_file)) - - def get_response(self) -> dict: - result = self.sock.recv(9999999) - return json.loads(result) - - # only execute, don't wait for response - def exec_cmd(self, cmd: str) -> None: - self.sock.send( - json.dumps( - { - "execute": "guest-exec", - "arguments": { - "path": "/bin/sh", - "arg": ["-l", "-c", cmd], - "capture-output": True, - }, - } - ).encode("utf-8") - ) - - # run, wait for result, return exitcode and output - def run(self, cmd: str) -> tuple[int, str, str]: - self.exec_cmd(cmd) - result_pid = self.get_response() - pid = result_pid["return"]["pid"] - # loop until exited=true - status_payload = json.dumps( - { - "execute": "guest-exec-status", - "arguments": { - "pid": pid, - }, - } - ).encode("utf-8") - while True: - self.sock.send(status_payload) - result = self.get_response() - if "error" in result and result["error"]["desc"].startswith("PID"): - raise Exception("PID could not be found") - if result["return"]["exited"]: - break - sleep(0.1) - - exitcode = result["return"]["exitcode"] - stdout = ( - "" - if "out-data" not in result["return"] - else base64.b64decode(result["return"]["out-data"]).decode("utf-8") - ) - stderr = ( - "" - if "err-data" not in result["return"] - else base64.b64decode(result["return"]["err-data"]).decode("utf-8") - ) - return exitcode, stdout, stderr - - @pytest.mark.impure def test_inspect( test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture @@ -226,19 +152,6 @@ def test_vm_persistence( Type="oneshot", ), ), - # TODO: implement shutdown via qmp instead of this hack - poweroff=dict( - description="Poweroff the machine", - wantedBy=["multi-user.target"], - after=["read_after_reboot.service"], - script=""" - while [ ! -f /var/my-state/poweroff ]; do - sleep 0.1 - done - sleep 0.1 - poweroff - """, - ), ) ), clan=dict(virtualisation=dict(graphics=False)), @@ -322,3 +235,9 @@ def test_vm_persistence( ) print(out) assert exitcode == 0, out + + qmp = QEMUMonitorProtocol( + address=str(os.path.realpath(state_dir / "qmp.sock")), + ) + qmp.connect() + qmp.cmd_obj({"execute": "system_powerdown"})