From 1ed04fb51ef784ed41caa9b1340c2fc9ac402fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Tue, 8 Oct 2024 19:11:35 +0200 Subject: [PATCH] vms/run: better defaults for run api --- pkgs/clan-cli/clan_cli/qemu/qga.py | 38 ++++++++++++++------- pkgs/clan-cli/clan_cli/vms/run.py | 7 ++-- pkgs/clan-cli/tests/test_vars_deployment.py | 22 ++++++------ pkgs/clan-cli/tests/test_vms_cli.py | 34 +++++++++--------- 4 files changed, 54 insertions(+), 47 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/qemu/qga.py b/pkgs/clan-cli/clan_cli/qemu/qga.py index 7a9ccaa97..76f3905c6 100644 --- a/pkgs/clan-cli/clan_cli/qemu/qga.py +++ b/pkgs/clan-cli/clan_cli/qemu/qga.py @@ -1,11 +1,19 @@ import base64 import time import types +from dataclasses import dataclass from clan_cli.errors import ClanError from clan_cli.qemu.qmp import QEMUMonitorProtocol +@dataclass +class VmCommandResult: + returncode: int + stdout: str | None + stderr: str | None + + # qga is almost like qmp, but not quite, because: # - server doesn't send initial message # - no need to initialize by asking for capabilities @@ -35,10 +43,17 @@ class QgaSession: if result_pid is None: msg = "Could not get PID from QGA" raise ClanError(msg) - return result_pid["return"]["pid"] + try: + return result_pid["return"]["pid"] + except KeyError as e: + if "error" in result_pid: + msg = f"Could not run command: {result_pid['error']['desc']}" + raise ClanError(msg) from e + msg = f"PID could not be found: {result_pid}" + raise ClanError(msg) from e # run, wait for result, return exitcode and output - def run(self, cmd: list[str], check: bool = False) -> tuple[int, str, str]: + def run(self, cmd: list[str], check: bool = True) -> VmCommandResult: pid = self.run_nonblocking(cmd) # loop until exited=true while True: @@ -54,17 +69,14 @@ class QgaSession: time.sleep(0.1) exitcode = result["return"]["exitcode"] - stdout = ( - "" - if "out-data" not in result["return"] - else base64.b64decode(result["return"]["out-data"]).decode("utf-8") - ) - stderr = ( - "" - if "err-data" not in result["return"] - else base64.b64decode(result["return"]["err-data"]).decode("utf-8") - ) + err_data = result["return"].get("err-data") + stdout = None + stderr = None + if out_data := result["return"].get("out-data"): + stdout = base64.b64decode(out_data).decode("utf-8") + if err_data is not None: + stderr = base64.b64decode(err_data).decode("utf-8") if check and exitcode != 0: msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}" raise ClanError(msg) - return exitcode, stdout, stderr + return VmCommandResult(exitcode, stdout, stderr) diff --git a/pkgs/clan-cli/clan_cli/vms/run.py b/pkgs/clan-cli/clan_cli/vms/run.py index fce9cb920..4937f7e3b 100644 --- a/pkgs/clan-cli/clan_cli/vms/run.py +++ b/pkgs/clan-cli/clan_cli/vms/run.py @@ -5,10 +5,10 @@ import logging import os import subprocess import time -from dataclasses import dataclass from collections.abc import Iterator from concurrent.futures import ThreadPoolExecutor from contextlib import ExitStack, contextmanager +from dataclasses import dataclass from pathlib import Path from tempfile import TemporaryDirectory @@ -324,9 +324,8 @@ def run_vm( runtime_config: RuntimeConfig, ) -> CmdOut: stdin = None - # if command is not None: - # stdin = subprocess.DEVNULL - stdin = subprocess.DEVNULL + if runtime_config.command is not None: + stdin = subprocess.DEVNULL with ( spawn_vm( vm_config, diff --git a/pkgs/clan-cli/tests/test_vars_deployment.py b/pkgs/clan-cli/tests/test_vars_deployment.py index a347f7c89..133a837d3 100644 --- a/pkgs/clan-cli/tests/test_vars_deployment.py +++ b/pkgs/clan-cli/tests/test_vars_deployment.py @@ -107,23 +107,21 @@ def test_vm_deployment( qga_m1 = stack.enter_context(vm1.qga_connect()) qga_m2 = stack.enter_context(vm2.qga_connect()) # check my_secret is deployed - _, out, _ = qga_m1.run( - ["cat", "/run/secrets/vars/m1_generator/my_secret"], check=True - ) - assert out == "hello\n" + result = qga_m1.run(["cat", "/run/secrets/vars/m1_generator/my_secret"]) + assert result.stdout == "hello\n" # check shared_secret is deployed on m1 - _, out, _ = qga_m1.run( - ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True + result = qga_m1.run( + ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"] ) - assert out == "hello\n" + assert result.stdout == "hello\n" # check shared_secret is deployed on m2 - _, out, _ = qga_m2.run( - ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True + result = qga_m2.run( + ["cat", "/run/secrets/vars/my_shared_generator/shared_secret"] ) - assert out == "hello\n" + assert result.stdout == "hello\n" # check no_deploy_secret is not deployed - returncode, out, _ = qga_m1.run( + result = qga_m1.run( ["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"], check=False, ) - assert returncode != 0 + assert result.returncode != 0 diff --git a/pkgs/clan-cli/tests/test_vms_cli.py b/pkgs/clan-cli/tests/test_vms_cli.py index eb3ca3469..7957add0a 100644 --- a/pkgs/clan-cli/tests/test_vms_cli.py +++ b/pkgs/clan-cli/tests/test_vms_cli.py @@ -90,38 +90,36 @@ def test_vm_persistence( with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: # create state via qmp command instead of systemd service - qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/root"], check=True) - qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/test"], check=True) - qga.run(["sh", "-c", "chown test /var/my-state/test"], check=True) - qga.run(["sh", "-c", "chown test /var/user-state"], check=True) - qga.run(["sh", "-c", "touch /var/my-state/rebooting"], check=True) + qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/root"]) + qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/test"]) + qga.run(["/bin/sh", "-c", "chown test /var/my-state/test"]) + qga.run(["/bin/sh", "-c", "chown test /var/user-state"]) + qga.run_nonblocking(["shutdown", "-h", "now"]) ## start vm again with spawn_vm(vm_config) as vm, vm.qga_connect() as qga: # check state exists - qga.run(["cat", "/var/my-state/test"], check=True) + qga.run(["cat", "/var/my-state/test"]) # ensure root file is owned by root - qga.run(["stat", "-c", "%U", "/var/my-state/root"], check=True) + qga.run(["stat", "-c", "%U", "/var/my-state/root"]) # ensure test file is owned by test - qga.run(["stat", "-c", "%U", "/var/my-state/test"], check=True) + qga.run(["stat", "-c", "%U", "/var/my-state/test"]) # ensure /var/user-state is owned by test - qga.run(["stat", "-c", "%U", "/var/user-state"], check=True) + qga.run(["stat", "-c", "%U", "/var/user-state"]) # ensure that the file created by the service is still there and has the expected content - exitcode, out, err = qga.run(["cat", "/var/my-state/test"]) - assert exitcode == 0, err - assert out == "dream2nix\n", out + result = qga.run(["cat", "/var/my-state/test"]) + assert result.stdout == "dream2nix\n", result.stdout # check for errors - exitcode, out, err = qga.run(["cat", "/var/my-state/error"]) - assert exitcode == 1, out + result = qga.run(["cat", "/var/my-state/error"], check=False) + assert result.returncode == 1, result.stdout # check all systemd services are OK, or print details - exitcode, out, err = qga.run( + result = qga.run( [ - "sh", + "/bin/sh", "-c", "systemctl --failed | tee /tmp/log | grep -q '0 loaded units listed' || ( cat /tmp/log && false )", - ] + ], ) - assert exitcode == 0, out