From cf3c67d83079db862e9f3a7e12412c40d4333cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 8 Oct 2024 18:51:58 +0200 Subject: [PATCH] expose the option to run commands in virtual machines --- pkgs/clan-cli/clan_cli/qemu/qga.py | 68 +++++----- pkgs/clan-cli/clan_cli/vms/run.py | 138 +++++++++++++------- pkgs/clan-cli/tests/test_vars_deployment.py | 8 +- pkgs/clan-cli/tests/test_vms_cli.py | 30 +++-- 4 files changed, 145 insertions(+), 99 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/qemu/qga.py b/pkgs/clan-cli/clan_cli/qemu/qga.py index 0cae576a1..7a9ccaa97 100644 --- a/pkgs/clan-cli/clan_cli/qemu/qga.py +++ b/pkgs/clan-cli/clan_cli/qemu/qga.py @@ -1,9 +1,9 @@ import base64 -import json -import socket -from time import sleep +import time +import types from clan_cli.errors import ClanError +from clan_cli.qemu.qmp import QEMUMonitorProtocol # qga is almost like qmp, but not quite, because: @@ -11,51 +11,47 @@ from clan_cli.errors import ClanError # - no need to initialize by asking for capabilities # - results need to be base64 decoded class QgaSession: - def __init__(self, sock: socket.socket) -> None: - self.sock = sock + def __init__(self, address: str) -> None: + self.client = QEMUMonitorProtocol(address) + self.client.connect(negotiate=False) - def get_response(self) -> dict: - result = self.sock.recv(9999999) - return json.loads(result) + def __enter__(self) -> "QgaSession": + # Implement context manager enter function. + return self - # only execute, don't wait for response - def exec_cmd(self, cmd: str) -> None: - self.sock.send( - json.dumps( - { - "execute": "guest-exec", - "arguments": { - "path": "/bin/sh", - "arg": ["-l", "-c", cmd], - "capture-output": True, - }, - } - ).encode("utf-8") + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: types.TracebackType | None, + ) -> None: + # Implement context manager exit function. + self.client.close() + + def run_nonblocking(self, cmd: list[str]) -> int: + result_pid = self.client.cmd( + "guest-exec", {"path": cmd[0], "arg": cmd[1:], "capture-output": True} ) + if result_pid is None: + msg = "Could not get PID from QGA" + raise ClanError(msg) + return result_pid["return"]["pid"] # run, wait for result, return exitcode and output - def run(self, cmd: str, check: bool = False) -> tuple[int, str, str]: - self.exec_cmd(cmd) - result_pid = self.get_response() - pid = result_pid["return"]["pid"] + def run(self, cmd: list[str], check: bool = False) -> tuple[int, str, str]: + pid = self.run_nonblocking(cmd) # loop until exited=true - status_payload = json.dumps( - { - "execute": "guest-exec-status", - "arguments": { - "pid": pid, - }, - } - ).encode("utf-8") while True: - self.sock.send(status_payload) - result = self.get_response() + result = self.client.cmd("guest-exec-status", {"pid": pid}) + if result is None: + msg = "Could not get status from QGA" + raise ClanError(msg) if "error" in result and result["error"]["desc"].startswith("PID"): msg = "PID could not be found" raise ClanError(msg) if result["return"]["exited"]: break - sleep(0.1) + time.sleep(0.1) exitcode = result["return"]["exitcode"] stdout = ( diff --git a/pkgs/clan-cli/clan_cli/vms/run.py b/pkgs/clan-cli/clan_cli/vms/run.py index dd44135f2..fce9cb920 100644 --- a/pkgs/clan-cli/clan_cli/vms/run.py +++ b/pkgs/clan-cli/clan_cli/vms/run.py @@ -3,10 +3,11 @@ import importlib import json import logging import os -import socket import subprocess import time +from dataclasses import dataclass from collections.abc import Iterator +from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack, contextmanager from pathlib import Path from tempfile import TemporaryDirectory @@ -121,11 +122,14 @@ def start_vm( extra_env: dict[str, str], stdout: int | None = None, stderr: int | None = None, + stdin: int | None = None, ) -> Iterator[subprocess.Popen]: env = os.environ.copy() env.update(extra_env) cmd = nix_shell(packages, args) - with subprocess.Popen(cmd, env=env, stdout=stdout, stderr=stderr) as process: + with subprocess.Popen( + cmd, env=env, stdout=stdout, stderr=stderr, stdin=stdin + ) as process: try: yield process finally: @@ -142,12 +146,12 @@ class QemuVm: self, machine: Machine, process: subprocess.Popen, + socketdir: Path, ) -> None: self.machine = machine self.process = process - self.state_dir = vm_state_dir(self.machine.flake, self.machine.name) - self.qmp_socket_file = self.state_dir / "qmp.sock" - self.qga_socket_file = self.state_dir / "qga.sock" + self.qmp_socket_file = socketdir / "qmp.sock" + self.qga_socket_file = socketdir / "qga.sock" # wait for vm to be up then connect and return qmp instance @contextmanager @@ -160,22 +164,23 @@ class QemuVm: @contextmanager def qga_connect(self, timeout_sec: float = 100) -> Iterator[QgaSession]: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - # try to reconnect a couple of times if connection refused - socket_file = os.path.realpath(self.qga_socket_file) - start_time = time.time() - while time.time() - start_time < timeout_sec: - try: - sock.connect(str(socket_file)) - except ConnectionRefusedError: - time.sleep(0.1) - else: - break - sock.connect(str(socket_file)) - yield QgaSession(sock) - finally: - sock.close() + start_time = time.time() + # try to reconnect a couple of times if connection refused + session = None + while time.time() - start_time < timeout_sec: + try: + session = QgaSession(str(self.qga_socket_file)) + break + except ConnectionRefusedError: + time.sleep(0.1) + continue + if session is None: + msg = ( + f"Timeout after {timeout_sec} seconds. Could not connect to QGA socket" + ) + raise ClanError(msg) + with session: + yield session def wait_up(self, timeout_sec: float = 60) -> None: start_time = time.time() @@ -201,6 +206,7 @@ def spawn_vm( portmap: list[tuple[int, int]] | None = None, stdout: int | None = None, stderr: int | None = None, + stdin: int | None = None, ) -> Iterator[QemuVm]: if portmap is None: portmap = [] @@ -268,6 +274,7 @@ def spawn_vm( qmp_socket_file=qmp_socket_file, qga_socket_file=qga_socket_file, portmap=portmap, + interactive=stdin is None, ) packages = ["nixpkgs#qemu"] @@ -287,7 +294,7 @@ def spawn_vm( qemu_cmd.args, packages, extra_env, stdout=stdout, stderr=stderr ) as process, ): - qemu_vm = QemuVm(machine, process) + qemu_vm = QemuVm(machine, process, socketdir) qemu_vm.wait_up() try: @@ -302,41 +309,62 @@ def spawn_vm( # TODO: add a timeout here instead of waiting indefinitely +@dataclass +class RuntimeConfig: + cachedir: Path | None = None + socketdir: Path | None = None + nix_options: list[str] | None = None + portmap: list[tuple[int, int]] | None = None + command: list[str] | None = None + no_block: bool = False + + def run_vm( vm_config: VmConfig, - *, - cachedir: Path | None = None, - socketdir: Path | None = None, - nix_options: list[str] | None = None, - portmap: list[tuple[int, int]] | None = None, -) -> None: - with spawn_vm( - vm_config, - cachedir=cachedir, - socketdir=socketdir, - nix_options=nix_options, - portmap=portmap, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) as vm: - stdout_buf, stderr_buf = handle_output(vm.process, Log.BOTH) + runtime_config: RuntimeConfig, +) -> CmdOut: + stdin = None + # if command is not None: + # stdin = subprocess.DEVNULL + stdin = subprocess.DEVNULL + with ( + spawn_vm( + vm_config, + cachedir=runtime_config.cachedir, + socketdir=runtime_config.socketdir, + nix_options=runtime_config.nix_options, + portmap=runtime_config.portmap, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=stdin, + ) as vm, + ThreadPoolExecutor(max_workers=1) as executor, + ): + future = executor.submit(handle_output, vm.process, Log.BOTH) args: list[str] = vm.process.args # type: ignore[assignment] + + if runtime_config.command is not None: + with vm.qga_connect() as qga: + if runtime_config.no_block: + qga.run_nonblocking(runtime_config.command) + else: + qga.run(runtime_config.command) + + stdout_buf, stderr_buf = future.result() + + rc = vm.wait_down() cmd_out = CmdOut( stdout=stdout_buf, stderr=stderr_buf, cwd=Path.cwd(), command_list=args, returncode=vm.process.returncode, - msg=f"Could not start vm {vm_config.machine_name}", + msg=f"VM {vm_config.machine_name} exited with code {rc}", env={}, ) - - if vm.process.returncode != 0: - raise ClanCmdError(cmd_out) - rc = vm.wait_down() if rc != 0: - msg = f"VM exited with code {rc}" - raise ClanError(msg) + raise ClanCmdError(cmd_out) + return cmd_out def run_command( @@ -348,7 +376,14 @@ def run_command( portmap = [p.split(":") for p in args.publish] - run_vm(vm, nix_options=args.option, portmap=portmap) + runtime_config = RuntimeConfig( + nix_options=args.option, + portmap=portmap, + command=args.command, + no_block=args.no_block, + ) + + run_vm(vm, runtime_config) def register_run_parser(parser: argparse.ArgumentParser) -> None: @@ -365,4 +400,15 @@ def register_run_parser(parser: argparse.ArgumentParser) -> None: default=[], help="Forward ports from host to guest", ) + parser.add_argument( + "--no-block", + action="store_true", + help="Do no block when running the command", + default=False, + ) + parser.add_argument( + "command", + nargs=argparse.REMAINDER, + help="command to run in the vm", + ) parser.set_defaults(func=lambda args: run_command(args)) diff --git a/pkgs/clan-cli/tests/test_vars_deployment.py b/pkgs/clan-cli/tests/test_vars_deployment.py index d9d127497..a347f7c89 100644 --- a/pkgs/clan-cli/tests/test_vars_deployment.py +++ b/pkgs/clan-cli/tests/test_vars_deployment.py @@ -108,22 +108,22 @@ def test_vm_deployment( qga_m2 = stack.enter_context(vm2.qga_connect()) # check my_secret is deployed _, out, _ = qga_m1.run( - "cat /run/secrets/vars/m1_generator/my_secret", check=True + ["cat", "/run/secrets/vars/m1_generator/my_secret"], check=True ) assert out == "hello\n" # check shared_secret is deployed on m1 _, out, _ = qga_m1.run( - "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True + ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True ) assert out == "hello\n" # check shared_secret is deployed on m2 _, out, _ = qga_m2.run( - "cat /run/secrets/vars/my_shared_generator/shared_secret", check=True + ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True ) assert out == "hello\n" # check no_deploy_secret is not deployed returncode, out, _ = qga_m1.run( - "test -e /run/secrets/vars/my_shared_generator/no_deploy_secret", + ["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"], check=False, ) assert returncode != 0 diff --git a/pkgs/clan-cli/tests/test_vms_cli.py b/pkgs/clan-cli/tests/test_vms_cli.py index 115b7bef6..eb3ca3469 100644 --- a/pkgs/clan-cli/tests/test_vms_cli.py +++ b/pkgs/clan-cli/tests/test_vms_cli.py @@ -53,7 +53,7 @@ def test_run( "user1", ] ) - cli.run(["vms", "run", "vm1"]) + cli.run(["vms", "run", "--no-block", "vm1", "shutdown", "-h", "now"]) @pytest.mark.skipif(no_kvm, reason="Requires KVM") @@ -90,34 +90,38 @@ def test_vm_persistence( with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: # create state via qmp command instead of systemd service - qga.run("echo 'dream2nix' > /var/my-state/root", check=True) - qga.run("echo 'dream2nix' > /var/my-state/test", check=True) - qga.run("chown test /var/my-state/test", check=True) - qga.run("chown test /var/user-state", check=True) - qga.run("touch /var/my-state/rebooting", check=True) + qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/root"], check=True) + qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/test"], check=True) + qga.run(["sh", "-c", "chown test /var/my-state/test"], check=True) + qga.run(["sh", "-c", "chown test /var/user-state"], check=True) + qga.run(["sh", "-c", "touch /var/my-state/rebooting"], check=True) ## start vm again with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: # check state exists - qga.run("cat /var/my-state/test", check=True) + qga.run(["cat", "/var/my-state/test"], check=True) # ensure root file is owned by root - qga.run("stat -c '%U' /var/my-state/root", check=True) + qga.run(["stat", "-c", "%U", "/var/my-state/root"], check=True) # ensure test file is owned by test - qga.run("stat -c '%U' /var/my-state/test", check=True) + qga.run(["stat", "-c", "%U", "/var/my-state/test"], check=True) # ensure /var/user-state is owned by test - qga.run("stat -c '%U' /var/user-state", check=True) + qga.run(["stat", "-c", "%U", "/var/user-state"], check=True) # ensure that the file created by the service is still there and has the expected content - exitcode, out, err = qga.run("cat /var/my-state/test") + exitcode, out, err = qga.run(["cat", "/var/my-state/test"]) assert exitcode == 0, err assert out == "dream2nix\n", out # check for errors - exitcode, out, err = qga.run("cat /var/my-state/error") + exitcode, out, err = qga.run(["cat", "/var/my-state/error"]) assert exitcode == 1, out # check all systemd services are OK, or print details exitcode, out, err = qga.run( - "systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )" + [ + "sh", + "-c", + "systemctl --failed | tee /tmp/log | grep -q '0 loaded units listed' || ( cat /tmp/log && false )", + ] ) assert exitcode == 0, out