qemu: init python modules for qmp and qga

This commit is contained in:
DavHau
2024-02-01 12:32:21 +07:00
parent 57bca781b0
commit 5bdbb43490
3 changed files with 402 additions and 89 deletions

View File

@@ -1,7 +1,4 @@
import base64
import json
import os
import socket
import sys
import threading
import traceback
@@ -15,6 +12,8 @@ from fixtures_flakes import FlakeForTest, generate_flake
from root import CLAN_CORE
from clan_cli.dirs import vm_state_dir
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
if TYPE_CHECKING:
from age_keys import KeyPair
@@ -22,79 +21,6 @@ if TYPE_CHECKING:
no_kvm = not os.path.exists("/dev/kvm")
# qga is almost like qmp, but not quite, because:
# - server doesn't send initial message
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
# TODO: move this to an extra file and make it available to other parts like GUI
class QgaSession:
def __init__(self, socket_file: Path | str) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# try to reconnect a couple of times if connetion refused
for _ in range(100):
try:
self.sock.connect(str(socket_file))
return
except ConnectionRefusedError:
sleep(0.1)
self.sock.connect(str(socket_file))
def get_response(self) -> dict:
result = self.sock.recv(9999999)
return json.loads(result)
# only execute, don't wait for response
def exec_cmd(self, cmd: str) -> None:
self.sock.send(
json.dumps(
{
"execute": "guest-exec",
"arguments": {
"path": "/bin/sh",
"arg": ["-l", "-c", cmd],
"capture-output": True,
},
}
).encode("utf-8")
)
# run, wait for result, return exitcode and output
def run(self, cmd: str) -> tuple[int, str, str]:
self.exec_cmd(cmd)
result_pid = self.get_response()
pid = result_pid["return"]["pid"]
# loop until exited=true
status_payload = json.dumps(
{
"execute": "guest-exec-status",
"arguments": {
"pid": pid,
},
}
).encode("utf-8")
while True:
self.sock.send(status_payload)
result = self.get_response()
if "error" in result and result["error"]["desc"].startswith("PID"):
raise Exception("PID could not be found")
if result["return"]["exited"]:
break
sleep(0.1)
exitcode = result["return"]["exitcode"]
stdout = (
""
if "out-data" not in result["return"]
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
)
stderr = (
""
if "err-data" not in result["return"]
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
)
return exitcode, stdout, stderr
@pytest.mark.impure
def test_inspect(
test_flake_with_core: FlakeForTest, capsys: pytest.CaptureFixture
@@ -226,19 +152,6 @@ def test_vm_persistence(
Type="oneshot",
),
),
# TODO: implement shutdown via qmp instead of this hack
poweroff=dict(
description="Poweroff the machine",
wantedBy=["multi-user.target"],
after=["read_after_reboot.service"],
script="""
while [ ! -f /var/my-state/poweroff ]; do
sleep 0.1
done
sleep 0.1
poweroff
""",
),
)
),
clan=dict(virtualisation=dict(graphics=False)),
@@ -322,3 +235,9 @@ def test_vm_persistence(
)
print(out)
assert exitcode == 0, out
qmp = QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")),
)
qmp.connect()
qmp.cmd_obj({"execute": "system_powerdown"})