expose the option to run commands in virtual machines

This commit is contained in:
Jörg Thalheim
2024-10-08 18:51:58 +02:00
committed by Mic92
parent 6cae6341c9
commit cf3c67d830
4 changed files with 145 additions and 99 deletions

View File

@@ -1,9 +1,9 @@
import base64
import json
import socket
from time import sleep
import time
import types
from clan_cli.errors import ClanError
from clan_cli.qemu.qmp import QEMUMonitorProtocol
# qga is almost like qmp, but not quite, because:
@@ -11,51 +11,47 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
class QgaSession:
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
def __init__(self, address: str) -> None:
self.client = QEMUMonitorProtocol(address)
self.client.connect(negotiate=False)
def get_response(self) -> dict:
result = self.sock.recv(9999999)
return json.loads(result)
def __enter__(self) -> "QgaSession":
# Implement context manager enter function.
return self
# only execute, don't wait for response
def exec_cmd(self, cmd: str) -> None:
self.sock.send(
json.dumps(
{
"execute": "guest-exec",
"arguments": {
"path": "/bin/sh",
"arg": ["-l", "-c", cmd],
"capture-output": True,
},
}
).encode("utf-8")
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
# Implement context manager exit function.
self.client.close()
def run_nonblocking(self, cmd: list[str]) -> int:
result_pid = self.client.cmd(
"guest-exec", {"path": cmd[0], "arg": cmd[1:], "capture-output": True}
)
if result_pid is None:
msg = "Could not get PID from QGA"
raise ClanError(msg)
return result_pid["return"]["pid"]
# run, wait for result, return exitcode and output
def run(self, cmd: str, check: bool = False) -> tuple[int, str, str]:
self.exec_cmd(cmd)
result_pid = self.get_response()
pid = result_pid["return"]["pid"]
def run(self, cmd: list[str], check: bool = False) -> tuple[int, str, str]:
pid = self.run_nonblocking(cmd)
# loop until exited=true
status_payload = json.dumps(
{
"execute": "guest-exec-status",
"arguments": {
"pid": pid,
},
}
).encode("utf-8")
while True:
self.sock.send(status_payload)
result = self.get_response()
result = self.client.cmd("guest-exec-status", {"pid": pid})
if result is None:
msg = "Could not get status from QGA"
raise ClanError(msg)
if "error" in result and result["error"]["desc"].startswith("PID"):
msg = "PID could not be found"
raise ClanError(msg)
if result["return"]["exited"]:
break
sleep(0.1)
time.sleep(0.1)
exitcode = result["return"]["exitcode"]
stdout = (