vms/run: better defaults for run api
This commit is contained in:
@@ -1,11 +1,19 @@
|
||||
import base64
|
||||
import time
|
||||
import types
|
||||
from dataclasses import dataclass
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.qemu.qmp import QEMUMonitorProtocol
|
||||
|
||||
|
||||
@dataclass
|
||||
class VmCommandResult:
|
||||
returncode: int
|
||||
stdout: str | None
|
||||
stderr: str | None
|
||||
|
||||
|
||||
# qga is almost like qmp, but not quite, because:
|
||||
# - server doesn't send initial message
|
||||
# - no need to initialize by asking for capabilities
|
||||
@@ -35,10 +43,17 @@ class QgaSession:
|
||||
if result_pid is None:
|
||||
msg = "Could not get PID from QGA"
|
||||
raise ClanError(msg)
|
||||
try:
|
||||
return result_pid["return"]["pid"]
|
||||
except KeyError as e:
|
||||
if "error" in result_pid:
|
||||
msg = f"Could not run command: {result_pid['error']['desc']}"
|
||||
raise ClanError(msg) from e
|
||||
msg = f"PID could not be found: {result_pid}"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
# run, wait for result, return exitcode and output
|
||||
def run(self, cmd: list[str], check: bool = False) -> tuple[int, str, str]:
|
||||
def run(self, cmd: list[str], check: bool = True) -> VmCommandResult:
|
||||
pid = self.run_nonblocking(cmd)
|
||||
# loop until exited=true
|
||||
while True:
|
||||
@@ -54,17 +69,14 @@ class QgaSession:
|
||||
time.sleep(0.1)
|
||||
|
||||
exitcode = result["return"]["exitcode"]
|
||||
stdout = (
|
||||
""
|
||||
if "out-data" not in result["return"]
|
||||
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
|
||||
)
|
||||
stderr = (
|
||||
""
|
||||
if "err-data" not in result["return"]
|
||||
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
|
||||
)
|
||||
err_data = result["return"].get("err-data")
|
||||
stdout = None
|
||||
stderr = None
|
||||
if out_data := result["return"].get("out-data"):
|
||||
stdout = base64.b64decode(out_data).decode("utf-8")
|
||||
if err_data is not None:
|
||||
stderr = base64.b64decode(err_data).decode("utf-8")
|
||||
if check and exitcode != 0:
|
||||
msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}"
|
||||
raise ClanError(msg)
|
||||
return exitcode, stdout, stderr
|
||||
return VmCommandResult(exitcode, stdout, stderr)
|
||||
|
||||
@@ -5,10 +5,10 @@ import logging
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from collections.abc import Iterator
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
@@ -324,8 +324,7 @@ def run_vm(
|
||||
runtime_config: RuntimeConfig,
|
||||
) -> CmdOut:
|
||||
stdin = None
|
||||
# if command is not None:
|
||||
# stdin = subprocess.DEVNULL
|
||||
if runtime_config.command is not None:
|
||||
stdin = subprocess.DEVNULL
|
||||
with (
|
||||
spawn_vm(
|
||||
|
||||
@@ -107,23 +107,21 @@ def test_vm_deployment(
|
||||
qga_m1 = stack.enter_context(vm1.qga_connect())
|
||||
qga_m2 = stack.enter_context(vm2.qga_connect())
|
||||
# check my_secret is deployed
|
||||
_, out, _ = qga_m1.run(
|
||||
["cat", "/run/secrets/vars/m1_generator/my_secret"], check=True
|
||||
)
|
||||
assert out == "hello\n"
|
||||
result = qga_m1.run(["cat", "/run/secrets/vars/m1_generator/my_secret"])
|
||||
assert result.stdout == "hello\n"
|
||||
# check shared_secret is deployed on m1
|
||||
_, out, _ = qga_m1.run(
|
||||
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True
|
||||
result = qga_m1.run(
|
||||
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
|
||||
)
|
||||
assert out == "hello\n"
|
||||
assert result.stdout == "hello\n"
|
||||
# check shared_secret is deployed on m2
|
||||
_, out, _ = qga_m2.run(
|
||||
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"], check=True
|
||||
result = qga_m2.run(
|
||||
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
|
||||
)
|
||||
assert out == "hello\n"
|
||||
assert result.stdout == "hello\n"
|
||||
# check no_deploy_secret is not deployed
|
||||
returncode, out, _ = qga_m1.run(
|
||||
result = qga_m1.run(
|
||||
["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"],
|
||||
check=False,
|
||||
)
|
||||
assert returncode != 0
|
||||
assert result.returncode != 0
|
||||
|
||||
@@ -90,38 +90,36 @@ def test_vm_persistence(
|
||||
|
||||
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
|
||||
# create state via qmp command instead of systemd service
|
||||
qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/root"], check=True)
|
||||
qga.run(["sh", "-c", "echo 'dream2nix' > /var/my-state/test"], check=True)
|
||||
qga.run(["sh", "-c", "chown test /var/my-state/test"], check=True)
|
||||
qga.run(["sh", "-c", "chown test /var/user-state"], check=True)
|
||||
qga.run(["sh", "-c", "touch /var/my-state/rebooting"], check=True)
|
||||
qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/root"])
|
||||
qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/test"])
|
||||
qga.run(["/bin/sh", "-c", "chown test /var/my-state/test"])
|
||||
qga.run(["/bin/sh", "-c", "chown test /var/user-state"])
|
||||
qga.run_nonblocking(["shutdown", "-h", "now"])
|
||||
|
||||
## start vm again
|
||||
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
|
||||
# check state exists
|
||||
qga.run(["cat", "/var/my-state/test"], check=True)
|
||||
qga.run(["cat", "/var/my-state/test"])
|
||||
# ensure root file is owned by root
|
||||
qga.run(["stat", "-c", "%U", "/var/my-state/root"], check=True)
|
||||
qga.run(["stat", "-c", "%U", "/var/my-state/root"])
|
||||
# ensure test file is owned by test
|
||||
qga.run(["stat", "-c", "%U", "/var/my-state/test"], check=True)
|
||||
qga.run(["stat", "-c", "%U", "/var/my-state/test"])
|
||||
# ensure /var/user-state is owned by test
|
||||
qga.run(["stat", "-c", "%U", "/var/user-state"], check=True)
|
||||
qga.run(["stat", "-c", "%U", "/var/user-state"])
|
||||
|
||||
# ensure that the file created by the service is still there and has the expected content
|
||||
exitcode, out, err = qga.run(["cat", "/var/my-state/test"])
|
||||
assert exitcode == 0, err
|
||||
assert out == "dream2nix\n", out
|
||||
result = qga.run(["cat", "/var/my-state/test"])
|
||||
assert result.stdout == "dream2nix\n", result.stdout
|
||||
|
||||
# check for errors
|
||||
exitcode, out, err = qga.run(["cat", "/var/my-state/error"])
|
||||
assert exitcode == 1, out
|
||||
result = qga.run(["cat", "/var/my-state/error"], check=False)
|
||||
assert result.returncode == 1, result.stdout
|
||||
|
||||
# check all systemd services are OK, or print details
|
||||
exitcode, out, err = qga.run(
|
||||
result = qga.run(
|
||||
[
|
||||
"sh",
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"systemctl --failed | tee /tmp/log | grep -q '0 loaded units listed' || ( cat /tmp/log && false )",
|
||||
]
|
||||
],
|
||||
)
|
||||
assert exitcode == 0, out
|
||||
|
||||
Reference in New Issue
Block a user