expose the option to run commands in virtual machines

This commit is contained in:
Jörg Thalheim
2024-10-08 18:51:58 +02:00
committed by Mic92
parent 53bde4f9f7
commit 112d7bf2be
4 changed files with 145 additions and 99 deletions

View File

@@ -1,9 +1,9 @@
import base64 import base64
import json import time
import socket import types
from time import sleep
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.qemu.qmp import QEMUMonitorProtocol
# qga is almost like qmp, but not quite, because: # qga is almost like qmp, but not quite, because:
@@ -11,51 +11,47 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities # - no need to initialize by asking for capabilities
# - results need to be base64 decoded # - results need to be base64 decoded
class QgaSession: class QgaSession:
def __init__(self, sock: socket.socket) -> None: def __init__(self, address: str) -> None:
self.sock = sock self.client = QEMUMonitorProtocol(address)
self.client.connect(negotiate=False)
def get_response(self) -> dict: def __enter__(self) -> "QgaSession":
result = self.sock.recv(9999999) # Implement context manager enter function.
return json.loads(result) return self
# only execute, don't wait for response def __exit__(
def exec_cmd(self, cmd: str) -> None: self,
self.sock.send( exc_type: type[BaseException] | None,
json.dumps( exc_value: BaseException | None,
{ traceback: types.TracebackType | None,
"execute": "guest-exec", ) -> None:
"arguments": { # Implement context manager exit function.
"path": "/bin/sh", self.client.close()
"arg": ["-l", "-c", cmd],
"capture-output": True, def run_nonblocking(self, cmd: list[str]) -> int:
}, result_pid = self.client.cmd(
} "guest-exec", {"path": cmd[0], "arg": cmd[1:], "capture-output": True}
).encode("utf-8")
) )
if result_pid is None:
msg = "Could not get PID from QGA"
raise ClanError(msg)
return result_pid["return"]["pid"]
# run, wait for result, return exitcode and output # run, wait for result, return exitcode and output
def run(self, cmd: str, check: bool = False) -> tuple[int, str, str]: def run(self, cmd: list[str], check: bool = False) -> tuple[int, str, str]:
self.exec_cmd(cmd) pid = self.run_nonblocking(cmd)
result_pid = self.get_response()
pid = result_pid["return"]["pid"]
# loop until exited=true # loop until exited=true
status_payload = json.dumps(
{
"execute": "guest-exec-status",
"arguments": {
"pid": pid,
},
}
).encode("utf-8")
while True: while True:
self.sock.send(status_payload) result = self.client.cmd("guest-exec-status", {"pid": pid})
result = self.get_response() if result is None:
msg = "Could not get status from QGA"
raise ClanError(msg)
if "error" in result and result["error"]["desc"].startswith("PID"): if "error" in result and result["error"]["desc"].startswith("PID"):
msg = "PID could not be found" msg = "PID could not be found"
raise ClanError(msg) raise ClanError(msg)
if result["return"]["exited"]: if result["return"]["exited"]:
break break
sleep(0.1) time.sleep(0.1)
exitcode = result["return"]["exitcode"] exitcode = result["return"]["exitcode"]
stdout = ( stdout = (

View File

@@ -3,10 +3,11 @@ import importlib
import json import json
import logging import logging
import os import os
import socket
import subprocess import subprocess
import time import time
from dataclasses import dataclass
from collections.abc import Iterator from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager from contextlib import ExitStack, contextmanager
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
@@ -121,11 +122,14 @@ def start_vm(
extra_env: dict[str, str], extra_env: dict[str, str],
stdout: int | None = None, stdout: int | None = None,
stderr: int | None = None, stderr: int | None = None,
stdin: int | None = None,
) -> Iterator[subprocess.Popen]: ) -> Iterator[subprocess.Popen]:
env = os.environ.copy() env = os.environ.copy()
env.update(extra_env) env.update(extra_env)
cmd = nix_shell(packages, args) cmd = nix_shell(packages, args)
with subprocess.Popen(cmd, env=env, stdout=stdout, stderr=stderr) as process: with subprocess.Popen(
cmd, env=env, stdout=stdout, stderr=stderr, stdin=stdin
) as process:
try: try:
yield process yield process
finally: finally:
@@ -142,12 +146,12 @@ class QemuVm:
self, self,
machine: Machine, machine: Machine,
process: subprocess.Popen, process: subprocess.Popen,
socketdir: Path,
) -> None: ) -> None:
self.machine = machine self.machine = machine
self.process = process self.process = process
self.state_dir = vm_state_dir(self.machine.flake, self.machine.name) self.qmp_socket_file = socketdir / "qmp.sock"
self.qmp_socket_file = self.state_dir / "qmp.sock" self.qga_socket_file = socketdir / "qga.sock"
self.qga_socket_file = self.state_dir / "qga.sock"
# wait for vm to be up then connect and return qmp instance # wait for vm to be up then connect and return qmp instance
@contextmanager @contextmanager
@@ -160,22 +164,23 @@ class QemuVm:
@contextmanager @contextmanager
def qga_connect(self, timeout_sec: float = 100) -> Iterator[QgaSession]: def qga_connect(self, timeout_sec: float = 100) -> Iterator[QgaSession]:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) start_time = time.time()
try: # try to reconnect a couple of times if connection refused
# try to reconnect a couple of times if connection refused session = None
socket_file = os.path.realpath(self.qga_socket_file) while time.time() - start_time < timeout_sec:
start_time = time.time() try:
while time.time() - start_time < timeout_sec: session = QgaSession(str(self.qga_socket_file))
try: break
sock.connect(str(socket_file)) except ConnectionRefusedError:
except ConnectionRefusedError: time.sleep(0.1)
time.sleep(0.1) continue
else: if session is None:
break msg = (
sock.connect(str(socket_file)) f"Timeout after {timeout_sec} seconds. Could not connect to QGA socket"
yield QgaSession(sock) )
finally: raise ClanError(msg)
sock.close() with session:
yield session
def wait_up(self, timeout_sec: float = 60) -> None: def wait_up(self, timeout_sec: float = 60) -> None:
start_time = time.time() start_time = time.time()
@@ -201,6 +206,7 @@ def spawn_vm(
portmap: list[tuple[int, int]] | None = None, portmap: list[tuple[int, int]] | None = None,
stdout: int | None = None, stdout: int | None = None,
stderr: int | None = None, stderr: int | None = None,
stdin: int | None = None,
) -> Iterator[QemuVm]: ) -> Iterator[QemuVm]:
if portmap is None: if portmap is None:
portmap = [] portmap = []
@@ -268,6 +274,7 @@ def spawn_vm(
qmp_socket_file=qmp_socket_file, qmp_socket_file=qmp_socket_file,
qga_socket_file=qga_socket_file, qga_socket_file=qga_socket_file,
portmap=portmap, portmap=portmap,
interactive=stdin is None,
) )
packages = ["nixpkgs#qemu"] packages = ["nixpkgs#qemu"]
@@ -287,7 +294,7 @@ def spawn_vm(
qemu_cmd.args, packages, extra_env, stdout=stdout, stderr=stderr qemu_cmd.args, packages, extra_env, stdout=stdout, stderr=stderr
) as process, ) as process,
): ):
qemu_vm = QemuVm(machine, process) qemu_vm = QemuVm(machine, process, socketdir)
qemu_vm.wait_up() qemu_vm.wait_up()
try: try:
@@ -302,41 +309,62 @@ def spawn_vm(
# TODO: add a timeout here instead of waiting indefinitely # TODO: add a timeout here instead of waiting indefinitely
@dataclass
class RuntimeConfig:
cachedir: Path | None = None
socketdir: Path | None = None
nix_options: list[str] | None = None
portmap: list[tuple[int, int]] | None = None
command: list[str] | None = None
no_block: bool = False
def run_vm( def run_vm(
vm_config: VmConfig, vm_config: VmConfig,
*, runtime_config: RuntimeConfig,
cachedir: Path | None = None, ) -> CmdOut:
socketdir: Path | None = None, stdin = None
nix_options: list[str] | None = None, # if command is not None:
portmap: list[tuple[int, int]] | None = None, # stdin = subprocess.DEVNULL
) -> None: stdin = subprocess.DEVNULL
with spawn_vm( with (
vm_config, spawn_vm(
cachedir=cachedir, vm_config,
socketdir=socketdir, cachedir=runtime_config.cachedir,
nix_options=nix_options, socketdir=runtime_config.socketdir,
portmap=portmap, nix_options=runtime_config.nix_options,
stdout=subprocess.PIPE, portmap=runtime_config.portmap,
stderr=subprocess.PIPE, stdout=subprocess.PIPE,
) as vm: stderr=subprocess.PIPE,
stdout_buf, stderr_buf = handle_output(vm.process, Log.BOTH) stdin=stdin,
) as vm,
ThreadPoolExecutor(max_workers=1) as executor,
):
future = executor.submit(handle_output, vm.process, Log.BOTH)
args: list[str] = vm.process.args # type: ignore[assignment] args: list[str] = vm.process.args # type: ignore[assignment]
if runtime_config.command is not None:
with vm.qga_connect() as qga:
if runtime_config.no_block:
qga.run_nonblocking(runtime_config.command)
else:
qga.run(runtime_config.command)
stdout_buf, stderr_buf = future.result()
rc = vm.wait_down()
cmd_out = CmdOut( cmd_out = CmdOut(
stdout=stdout_buf, stdout=stdout_buf,
stderr=stderr_buf, stderr=stderr_buf,
cwd=Path.cwd(), cwd=Path.cwd(),
command_list=args, command_list=args,
returncode=vm.process.returncode, returncode=vm.process.returncode,
msg=f"Could not start vm {vm_config.machine_name}", msg=f"VM {vm_config.machine_name} exited with code {rc}",
env={}, env={},
) )
if vm.process.returncode != 0:
raise ClanCmdError(cmd_out)
rc = vm.wait_down()
if rc != 0: if rc != 0:
msg = f"VM exited with code {rc}" raise ClanCmdError(cmd_out)
raise ClanError(msg) return cmd_out
def run_command( def run_command(
@@ -348,7 +376,14 @@ def run_command(
portmap = [p.split(":") for p in args.publish] portmap = [p.split(":") for p in args.publish]
run_vm(vm, nix_options=args.option, portmap=portmap) runtime_config = RuntimeConfig(
nix_options=args.option,
portmap=portmap,
command=args.command,
no_block=args.no_block,
)
run_vm(vm, runtime_config)
def register_run_parser(parser: argparse.ArgumentParser) -> None: def register_run_parser(parser: argparse.ArgumentParser) -> None:
@@ -365,4 +400,15 @@ def register_run_parser(parser: argparse.ArgumentParser) -> None:
default=[], default=[],
help="Forward ports from host to guest", help="Forward ports from host to guest",
) )
parser.add_argument(
"--no-block",
action="store_true",
help="Do no block when running the command",
default=False,
)
parser.add_argument(
"command",
nargs=argparse.REMAINDER,
help="command to run in the vm",
)
parser.set_defaults(func=lambda args: run_command(args)) parser.set_defaults(func=lambda args: run_command(args))

View File

@@ -108,22 +108,22 @@ def test_vm_deployment(
qga_m2 = stack.enter_context(vm2.qga_connect()) qga_m2 = stack.enter_context(vm2.qga_connect())
# check my_secret is deployed # check my_secret is deployed
_, out, _ = qga_m1.run( _, out, _ = qga_m1.run(
"cat /run/secrets/vars/m1_generator/my_secret", check=True ["cat", "/run/secrets/vars/m1_generator/my_secret"], check=True
) )
assert out == "hello\n" assert out == "hello\n"
# check shared_secret is deployed on m1 # check shared_secret is deployed on m1
_, out, _ = qga_m1.run( _, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True
) )
assert out == "hello\n" assert out == "hello\n"
# check shared_secret is deployed on m2 # check shared_secret is deployed on m2
_, out, _ = qga_m2.run( _, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True
) )
assert out == "hello\n" assert out == "hello\n"
# check no_deploy_secret is not deployed # check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run( returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", ["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"],
check=False, check=False,
) )
assert returncode != 0 assert returncode != 0

View File

@@ -53,7 +53,7 @@ def test_run(
"user1", "user1",
] ]
) )
cli.run(["vms", "run", "vm1"]) cli.run(["vms", "run", "--no-block", "vm1", "shutdown", "-h", "now"])
@pytest.mark.skipif(no_kvm, reason="Requires KVM") @pytest.mark.skipif(no_kvm, reason="Requires KVM")
@@ -90,34 +90,38 @@ def test_vm_persistence(
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
# create state via qmp command instead of systemd service # create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True) qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/root"], check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True) qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/test"], check=True)
qga.run("chown test /var/my-state/test", check=True) qga.run(["sh", "-c", "chown test /var/my-state/test"], check=True)
qga.run("chown test /var/user-state", check=True) qga.run(["sh", "-c", "chown test /var/user-state"], check=True)
qga.run("touch /var/my-state/rebooting", check=True) qga.run(["sh", "-c", "touch /var/my-state/rebooting"], check=True)
## start vm again ## start vm again
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
# check state exists # check state exists
qga.run("cat /var/my-state/test", check=True) qga.run(["cat", "/var/my-state/test"], check=True)
# ensure root file is owned by root # ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True) qga.run(["stat", "-c", "%U", "/var/my-state/root"], check=True)
# ensure test file is owned by test # ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True) qga.run(["stat", "-c", "%U", "/var/my-state/test"], check=True)
# ensure /var/user-state is owned by test # ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True) qga.run(["stat", "-c", "%U", "/var/user-state"], check=True)
# ensure that the file created by the service is still there and has the expected content # ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test") exitcode, out, err = qga.run(["cat", "/var/my-state/test"])
assert exitcode == 0, err assert exitcode == 0, err
assert out == "dream2nix\n", out assert out == "dream2nix\n", out
# check for errors # check for errors
exitcode, out, err = qga.run("cat /var/my-state/error") exitcode, out, err = qga.run(["cat", "/var/my-state/error"])
assert exitcode == 1, out assert exitcode == 1, out
# check all systemd services are OK, or print details # check all systemd services are OK, or print details
exitcode, out, err = qga.run( exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )" [
"sh",
"-c",
"systemctl --failed | tee /tmp/log | grep -q '0 loaded units listed' || ( cat /tmp/log && false )",
]
) )
assert exitcode == 0, out assert exitcode == 0, out