Merge pull request 'Rework vm process handling' (#2193) from fix-warning into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/2193
This commit is contained in:
Mic92
2024-10-08 17:51:55 +00:00
25 changed files with 371 additions and 337 deletions

View File

@@ -36,6 +36,7 @@
type = "filesystem";
format = "vfat";
mountpoint = "/boot";
mountOptions = [ "umask=0077" ];
};
};
root = {

View File

@@ -1 +1 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIBIbwIVnLy+uoDZ6uK/OCc1QK46SIGeC3mVc85dqLYQw lass@ignavia
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIEOdlvNTJPxTpMjuuNytGEUAO8NUuvL2nm9dpWZULCR6 nixbld@turingmachine

View File

@@ -1 +1 @@
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIBIbwIVnLy+uoDZ6uK/OCc1QK46SIGeC3mVc85dqLYQw lass@ignavia
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAILns3iEVA7MaN+K8qVRFywVOjBZsGyfRuBl26nGL/tXe nixbld@turingmachine

View File

@@ -47,8 +47,8 @@ let
];
boot.initrd.systemd.emergencyAccess = true;
# sysusers would be faster because it doesn't need perl, but it cannot create normal users
systemd.sysusers.enable = true;
# userborn would be faster because it doesn't need perl, but it cannot create normal users
services.userborn.enable = true;
users.mutableUsers = false;
users.allowNoPasswordLogin = true;

View File

@@ -45,9 +45,7 @@
security.sudo.wheelNeedsPassword = false;
users.users.user = lib.mkIf (config.clan.services.waypipe.user == "user") {
# workaround sysusers
isSystemUser = true;
uid = 998;
uid = 1000;
group = "users";
initialPassword = "";
extraGroups = [

View File

@@ -24,7 +24,7 @@ from . import (
from .clan_uri import FlakeId
from .custom_logger import setup_logging
from .dirs import get_clan_flake_toplevel_or_env
from .errors import ClanCmdError, ClanError
from .errors import ClanError
from .facts import cli as facts
from .flash import cli as flash_cli
from .hyperlink import help_hyperlink
@@ -401,7 +401,7 @@ def main() -> None:
if len(sys.argv) == 1:
parser.print_help()
if getattr(args, "debug", False):
if debug := getattr(args, "debug", False):
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
log.debug("Debug log activated")
else:
@@ -413,21 +413,11 @@ def main() -> None:
try:
args.func(args)
except ClanError as e:
if isinstance(e, ClanCmdError):
msg = ""
if e.cmd.msg:
msg += f"{e.cmd.msg}: "
msg += f"command exited with code {e.cmd.returncode}: {e.cmd.command}"
log.error(msg) # noqa: TRY400
if debug:
log.exception("Exited with error")
else:
log.error("%s", e) # noqa: TRY400
sys.exit(1)
if not e.msg: # should not be empty, print stack trace
raise
msg = e.msg
if e.description:
msg += f": {e.description}"
log.error(msg) # noqa: TRY400
sys.exit(1)
except KeyboardInterrupt:
log.warning("Interrupted by user")
sys.exit(1)

View File

@@ -131,8 +131,6 @@ def run(
if input:
process.communicate(input)
else:
process.wait()
tend = datetime.datetime.now(tz=datetime.UTC)
global TIME_TABLE

View File

@@ -4,6 +4,8 @@ import sys
import urllib
from pathlib import Path
from clan_cli.clan_uri import FlakeId
from .errors import ClanError
log = logging.getLogger(__name__)
@@ -100,8 +102,8 @@ def user_history_file() -> Path:
return user_config_dir() / "clan" / "history"
def vm_state_dir(flake_url: str, vm_name: str) -> Path:
clan_key = clan_key_safe(flake_url)
def vm_state_dir(flake_url: FlakeId, vm_name: str) -> Path:
clan_key = clan_key_safe(str(flake_url))
return user_data_dir() / "clan" / "vmstate" / clan_key / vm_name

View File

@@ -218,6 +218,9 @@ def generate_facts(
def generate_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
if len(args.machines) == 0:
machines = get_all_machines(args.flake, args.option)
else:

View File

@@ -14,7 +14,7 @@ class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.dir = vm_state_dir(str(machine.flake), machine.name) / "facts"
self.dir = vm_state_dir(machine.flake, machine.name) / "facts"
log.debug(f"FactStore initialized with dir {self.dir}")
def exists(self, service: str, name: str) -> bool:

View File

@@ -10,7 +10,7 @@ from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
self.dir = vm_state_dir(machine.flake, machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
def set(

View File

@@ -1,9 +1,17 @@
import base64
import json
import socket
from time import sleep
import time
import types
from dataclasses import dataclass
from clan_cli.errors import ClanError
from clan_cli.qemu.qmp import QEMUMonitorProtocol
@dataclass
class VmCommandResult:
returncode: int
stdout: str | None
stderr: str | None
# qga is almost like qmp, but not quite, because:
@@ -11,64 +19,64 @@ from clan_cli.errors import ClanError
# - no need to initialize by asking for capabilities
# - results need to be base64 decoded
class QgaSession:
def __init__(self, sock: socket.socket) -> None:
self.sock = sock
def __init__(self, address: str) -> None:
self.client = QEMUMonitorProtocol(address)
self.client.connect(negotiate=False)
def get_response(self) -> dict:
result = self.sock.recv(9999999)
return json.loads(result)
def __enter__(self) -> "QgaSession":
# Implement context manager enter function.
return self
# only execute, don't wait for response
def exec_cmd(self, cmd: str) -> None:
self.sock.send(
json.dumps(
{
"execute": "guest-exec",
"arguments": {
"path": "/bin/sh",
"arg": ["-l", "-c", cmd],
"capture-output": True,
},
}
).encode("utf-8")
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
# Implement context manager exit function.
self.client.close()
def run_nonblocking(self, cmd: list[str]) -> int:
result_pid = self.client.cmd(
"guest-exec", {"path": cmd[0], "arg": cmd[1:], "capture-output": True}
)
if result_pid is None:
msg = "Could not get PID from QGA"
raise ClanError(msg)
try:
return result_pid["return"]["pid"]
except KeyError as e:
if "error" in result_pid:
msg = f"Could not run command: {result_pid['error']['desc']}"
raise ClanError(msg) from e
msg = f"PID could not be found: {result_pid}"
raise ClanError(msg) from e
# run, wait for result, return exitcode and output
def run(self, cmd: str, check: bool = False) -> tuple[int, str, str]:
self.exec_cmd(cmd)
result_pid = self.get_response()
pid = result_pid["return"]["pid"]
def run(self, cmd: list[str], check: bool = True) -> VmCommandResult:
pid = self.run_nonblocking(cmd)
# loop until exited=true
status_payload = json.dumps(
{
"execute": "guest-exec-status",
"arguments": {
"pid": pid,
},
}
).encode("utf-8")
while True:
self.sock.send(status_payload)
result = self.get_response()
result = self.client.cmd("guest-exec-status", {"pid": pid})
if result is None:
msg = "Could not get status from QGA"
raise ClanError(msg)
if "error" in result and result["error"]["desc"].startswith("PID"):
msg = "PID could not be found"
raise ClanError(msg)
if result["return"]["exited"]:
break
sleep(0.1)
time.sleep(0.1)
exitcode = result["return"]["exitcode"]
stdout = (
""
if "out-data" not in result["return"]
else base64.b64decode(result["return"]["out-data"]).decode("utf-8")
)
stderr = (
""
if "err-data" not in result["return"]
else base64.b64decode(result["return"]["err-data"]).decode("utf-8")
)
err_data = result["return"].get("err-data")
stdout = None
stderr = None
if out_data := result["return"].get("out-data"):
stdout = base64.b64decode(out_data).decode("utf-8")
if err_data is not None:
stderr = base64.b64decode(err_data).decode("utf-8")
if check and exitcode != 0:
msg = f"Command on guest failed\nCommand: {cmd}\nExitcode {exitcode}\nStdout: {stdout}\nStderr: {stderr}"
raise ClanError(msg)
return exitcode, stdout, stderr
return VmCommandResult(exitcode, stdout, stderr)

View File

@@ -367,6 +367,9 @@ def generate_vars(
def generate_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
if len(args.machines) == 0:
machines = get_all_machines(args.flake, args.option)
else:

View File

@@ -14,7 +14,7 @@ class FactStore(FactStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.works_remotely = False
self.dir = vm_state_dir(str(machine.flake), machine.name) / "facts"
self.dir = vm_state_dir(machine.flake, machine.name) / "facts"
log.debug(f"FactStore initialized with dir {self.dir}")
@property

View File

@@ -10,7 +10,7 @@ from . import SecretStoreBase
class SecretStore(SecretStoreBase):
def __init__(self, machine: Machine) -> None:
self.machine = machine
self.dir = vm_state_dir(str(machine.flake), machine.name) / "secrets"
self.dir = vm_state_dir(machine.flake, machine.name) / "secrets"
self.dir.mkdir(parents=True, exist_ok=True)
@property

View File

@@ -17,14 +17,17 @@ class WaypipeConfig:
@dataclass
class VmConfig:
machine_name: str
machine_icon: Path
machine_description: str
flake_url: FlakeId
clan_name: str
cores: int
memory_size: int
graphics: bool
# FIXME: I don't think this belongs here.
clan_name: str
machine_description: str | None
machine_icon: Path | None
waypipe: bool = False
def __post_init__(self) -> None:

View File

@@ -92,10 +92,11 @@ def qemu_command(
virtiofsd_socket: Path,
qmp_socket_file: Path,
qga_socket_file: Path,
portmap: list[tuple[int, int]] | None = None,
portmap: dict[int, int] | None = None,
interactive: bool = False,
) -> QemuCommand:
if portmap is None:
portmap = []
portmap = {}
kernel_cmdline = [
(Path(nixos_config["toplevel"]) / "kernel-params").read_text(),
f'init={nixos_config["toplevel"]}/init',
@@ -104,7 +105,7 @@ def qemu_command(
]
if not vm.waypipe:
kernel_cmdline.append("console=tty0")
hostfwd = ",".join(f"hostfwd=tcp::{h}-:{g}" for h, g in portmap)
hostfwd = ",".join(f"hostfwd=tcp::{h}-:{g}" for h, g in portmap.items())
# fmt: off
command = [
"qemu-kvm",
@@ -137,12 +138,20 @@ def qemu_command(
"-chardev", f"socket,path={qga_socket_file},server=on,wait=off,id=qga0",
"-device", "virtio-serial",
"-device", "virtserialport,chardev=qga0,name=org.qemu.guest_agent.0",
"-serial", "null",
"-chardev", "stdio,mux=on,id=char0,signal=off",
"-mon", "chardev=char0,mode=readline",
"-device", "virtconsole,chardev=char0,nr=0",
] # fmt: on
if interactive:
command.extend([
"-serial", "null",
"-chardev", "stdio,mux=on,id=char0,signal=off",
"-mon", "chardev=char0,mode=readline",
"-device", "virtconsole,chardev=char0,nr=0",
])
else:
command.extend([
"-serial", "null",
"-chardev", "file,id=char0,path=/dev/stdout",
"-device", "virtconsole,chardev=char0,nr=0",
])
vsock_cid = None
if vm.graphics:

View File

@@ -3,17 +3,24 @@ import importlib
import json
import logging
import os
from contextlib import ExitStack
import subprocess
import time
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from clan_cli.cmd import Log, run
from clan_cli.cmd import CmdOut, Log, handle_output, run
from clan_cli.completions import add_dynamic_completer, complete_machines
from clan_cli.dirs import module_root, user_cache_dir, vm_state_dir
from clan_cli.errors import ClanError
from clan_cli.errors import ClanCmdError, ClanError
from clan_cli.facts.generate import generate_facts
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from .inspect import VmConfig, inspect_vm
from .qemu import qemu_command
@@ -25,13 +32,14 @@ log = logging.getLogger(__name__)
def facts_to_nixos_config(facts: dict[str, dict[str, bytes]]) -> dict:
nixos_config: dict = {}
nixos_config["clanCore"] = {}
nixos_config["clanCore"]["secrets"] = {}
nixos_config["clan"] = {}
nixos_config["clan"]["core"] = {}
nixos_config["clan"]["core"]["secrets"] = {}
for service, service_facts in facts.items():
nixos_config["clanCore"]["secrets"][service] = {}
nixos_config["clanCore"]["secrets"][service]["facts"] = {}
nixos_config["clan"]["core"]["secrets"][service] = {}
nixos_config["clan"]["core"]["secrets"][service]["facts"] = {}
for fact, value in service_facts.items():
nixos_config["clanCore"]["secrets"][service]["facts"][fact] = {
nixos_config["clan"]["core"]["secrets"][service]["facts"][fact] = {
"value": value.decode()
}
return nixos_config
@@ -107,16 +115,101 @@ def prepare_disk(
return disk_img
def run_vm(
@contextmanager
def start_vm(
args: list[str],
packages: list[str],
extra_env: dict[str, str],
stdout: int | None = None,
stderr: int | None = None,
stdin: int | None = None,
) -> Iterator[subprocess.Popen]:
env = os.environ.copy()
env.update(extra_env)
cmd = nix_shell(packages, args)
with subprocess.Popen(
cmd, env=env, stdout=stdout, stderr=stderr, stdin=stdin
) as process:
try:
yield process
finally:
process.terminate()
try:
# Fix me: This should in future properly shutdown the VM using qmp
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
class QemuVm:
def __init__(
self,
machine: Machine,
process: subprocess.Popen,
socketdir: Path,
) -> None:
self.machine = machine
self.process = process
self.qmp_socket_file = socketdir / "qmp.sock"
self.qga_socket_file = socketdir / "qga.sock"
# wait for vm to be up then connect and return qmp instance
@contextmanager
def qmp_connect(self) -> Iterator[QEMUMonitorProtocol]:
with QEMUMonitorProtocol(
address=str(os.path.realpath(self.qmp_socket_file)),
) as qmp:
qmp.connect()
yield qmp
@contextmanager
def qga_connect(self, timeout_sec: float = 100) -> Iterator[QgaSession]:
start_time = time.time()
# try to reconnect a couple of times if connection refused
session = None
while time.time() - start_time < timeout_sec:
try:
session = QgaSession(str(self.qga_socket_file))
break
except ConnectionRefusedError:
time.sleep(0.1)
continue
if session is None:
msg = (
f"Timeout after {timeout_sec} seconds. Could not connect to QGA socket"
)
raise ClanError(msg)
with session:
yield session
def wait_up(self, timeout_sec: float = 60) -> None:
start_time = time.time()
while time.time() - start_time < timeout_sec:
if self.process.poll() is not None:
msg = "VM failed to start. Qemu process exited with code {self.process.returncode}"
raise ClanError(msg)
if self.qmp_socket_file.exists():
break
time.sleep(0.1)
def wait_down(self) -> int:
return self.process.wait()
@contextmanager
def spawn_vm(
vm: VmConfig,
*,
cachedir: Path | None = None,
socketdir: Path | None = None,
nix_options: list[str] | None = None,
portmap: list[tuple[int, int]] | None = None,
) -> None:
portmap: dict[int, int] | None = None,
stdout: int | None = None,
stderr: int | None = None,
stdin: int | None = None,
) -> Iterator[QemuVm]:
if portmap is None:
portmap = []
portmap = {}
if nix_options is None:
nix_options = []
with ExitStack() as stack:
@@ -141,7 +234,7 @@ def run_vm(
# TODO: We should get this from the vm argument
nixos_config = build_vm(machine, cachedir, nix_options)
state_dir = vm_state_dir(str(vm.flake_url), machine.name)
state_dir = vm_state_dir(vm.flake_url, machine.name)
state_dir.mkdir(parents=True, exist_ok=True)
# specify socket files for qmp and qga
@@ -181,28 +274,96 @@ def run_vm(
qmp_socket_file=qmp_socket_file,
qga_socket_file=qga_socket_file,
portmap=portmap,
interactive=stdin is None,
)
packages = ["nixpkgs#qemu"]
env = os.environ.copy()
extra_env = {}
if vm.graphics and not vm.waypipe:
packages.append("nixpkgs#virt-viewer")
remote_viewer_mimetypes = module_root() / "vms" / "mimetypes"
env["XDG_DATA_DIRS"] = (
f"{remote_viewer_mimetypes}:{env.get('XDG_DATA_DIRS', '')}"
extra_env["XDG_DATA_DIRS"] = (
f"{remote_viewer_mimetypes}:{os.environ.get('XDG_DATA_DIRS', '')}"
)
with (
start_waypipe(qemu_cmd.vsock_cid, f"[{vm.machine_name}] "),
start_virtiofsd(virtiofsd_socket),
start_vm(
qemu_cmd.args, packages, extra_env, stdout=stdout, stderr=stderr
) as process,
):
run(
nix_shell(packages, qemu_cmd.args),
env=env,
log=Log.BOTH,
error_msg=f"Could not start vm {machine}",
)
qemu_vm = QemuVm(machine, process, socketdir)
qemu_vm.wait_up()
try:
yield qemu_vm
finally:
try:
with qemu_vm.qmp_connect() as qmp:
qmp.command("system_powerdown")
qemu_vm.wait_down()
except OSError:
pass
# TODO: add a timeout here instead of waiting indefinitely
@dataclass
class RuntimeConfig:
cachedir: Path | None = None
socketdir: Path | None = None
nix_options: list[str] | None = None
portmap: dict[int, int] | None = None
command: list[str] | None = None
no_block: bool = False
def run_vm(
vm_config: VmConfig,
runtime_config: RuntimeConfig,
) -> CmdOut:
stdin = None
if runtime_config.command is not None:
stdin = subprocess.DEVNULL
with (
spawn_vm(
vm_config,
cachedir=runtime_config.cachedir,
socketdir=runtime_config.socketdir,
nix_options=runtime_config.nix_options,
portmap=runtime_config.portmap,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=stdin,
) as vm,
ThreadPoolExecutor(max_workers=1) as executor,
):
future = executor.submit(handle_output, vm.process, Log.BOTH)
args: list[str] = vm.process.args # type: ignore[assignment]
if runtime_config.command is not None:
with vm.qga_connect() as qga:
if runtime_config.no_block:
qga.run_nonblocking(runtime_config.command)
else:
qga.run(runtime_config.command)
stdout_buf, stderr_buf = future.result()
rc = vm.wait_down()
cmd_out = CmdOut(
stdout=stdout_buf,
stderr=stderr_buf,
cwd=Path.cwd(),
command_list=args,
returncode=vm.process.returncode,
msg=f"VM {vm_config.machine_name} exited with code {rc}",
env={},
)
if rc != 0:
raise ClanCmdError(cmd_out)
return cmd_out
def run_command(
@@ -212,9 +373,16 @@ def run_command(
vm: VmConfig = inspect_vm(machine=machine_obj)
portmap = [p.split(":") for p in args.publish]
portmap = dict(p.split(":") for p in args.publish)
run_vm(vm, nix_options=args.option, portmap=portmap)
runtime_config = RuntimeConfig(
nix_options=args.option,
portmap=portmap,
command=args.command,
no_block=args.no_block,
)
run_vm(vm, runtime_config)
def register_run_parser(parser: argparse.ArgumentParser) -> None:
@@ -231,4 +399,15 @@ def register_run_parser(parser: argparse.ArgumentParser) -> None:
default=[],
help="Forward ports from host to guest",
)
parser.add_argument(
"--no-block",
action="store_true",
help="Do no block when running the command",
default=False,
)
parser.add_argument(
"command",
nargs=argparse.REMAINDER,
help="command to run in the vm",
)
parser.set_defaults(func=lambda args: run_command(args))

View File

@@ -33,7 +33,7 @@ testpaths = "tests"
faulthandler_timeout = 60
log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first -n auto" # Add --pdb for debugging
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --new-first -W error -n auto" # Add --pdb for debugging
norecursedirs = "tests/helpers"
markers = ["impure", "with_core"]
filterwarnings = "default::ResourceWarning"

View File

@@ -1,132 +0,0 @@
import contextlib
import os
import socket
import sys
import threading
import traceback
from collections.abc import Iterator
from pathlib import Path
from time import sleep
from clan_cli.dirs import vm_state_dir
from clan_cli.errors import ClanError
from clan_cli.qemu.qga import QgaSession
from clan_cli.qemu.qmp import QEMUMonitorProtocol
from . import cli
def find_free_port() -> int:
"""Find an unused localhost port from 1024-65535 and return it."""
with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0))
return sock.getsockname()[1]
class VmThread(threading.Thread):
def __init__(self, machine_name: str, ssh_port: int | None = None) -> None:
super().__init__()
self.machine_name = machine_name
self.ssh_port = ssh_port
self.exception: Exception | None = None
self.daemon = True
def run(self) -> None:
try:
cli.run(
["vms", "run", self.machine_name, "--publish", f"{self.ssh_port}:22"]
)
except Exception as ex:
# print exception details
print(traceback.format_exc(), file=sys.stderr)
print(sys.exc_info()[2], file=sys.stderr)
self.exception = ex
def run_vm_in_thread(machine_name: str, ssh_port: int | None = None) -> VmThread:
# runs machine and prints exceptions
if ssh_port is None:
ssh_port = find_free_port()
vm_thread = VmThread(machine_name, ssh_port)
vm_thread.start()
return vm_thread
# wait for qmp socket to exist
def wait_vm_up(machine_name: str, vm: VmThread, flake_url: str | None = None) -> None:
if flake_url is None:
flake_url = str(Path.cwd())
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
timeout: float = 600
while True:
if vm.exception:
msg = "VM failed to start"
raise ClanError(msg) from vm.exception
if timeout <= 0:
msg = f"qmp socket {socket_file} not found. Is the VM running?"
raise TimeoutError(msg)
if socket_file.exists():
break
sleep(0.1)
timeout -= 0.1
# wait for vm to be down by checking if qmp socket is down
def wait_vm_down(machine_name: str, vm: VmThread, flake_url: str | None = None) -> None:
if flake_url is None:
flake_url = str(Path.cwd())
socket_file = vm_state_dir(flake_url, machine_name) / "qmp.sock"
timeout: float = 300
while socket_file.exists():
if vm.exception:
msg = "VM failed to start"
raise ClanError(msg) from vm.exception
if timeout <= 0:
msg = f"qmp socket {socket_file} still exists. Is the VM down?"
raise TimeoutError(msg)
sleep(0.1)
timeout -= 0.1
# wait for vm to be up then connect and return qmp instance
@contextlib.contextmanager
def qmp_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> Iterator[QEMUMonitorProtocol]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
with QEMUMonitorProtocol(
address=str(os.path.realpath(state_dir / "qmp.sock")),
) as qmp:
qmp.connect()
yield qmp
# wait for vm to be up then connect and return qga instance
@contextlib.contextmanager
def qga_connect(
machine_name: str, vm: VmThread, flake_url: str | None = None
) -> Iterator[QgaSession]:
if flake_url is None:
flake_url = str(Path.cwd())
state_dir = vm_state_dir(flake_url, machine_name)
wait_vm_up(machine_name, vm, flake_url)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
# try to reconnect a couple of times if connection refused
socket_file = os.path.realpath(state_dir / "qga.sock")
for _ in range(100):
try:
sock.connect(str(socket_file))
except ConnectionRefusedError:
sleep(0.1)
else:
break
sock.connect(str(socket_file))
yield QgaSession(sock)
finally:
sock.close()

View File

@@ -16,6 +16,7 @@
# (subdir / ".clan-flake").touch()
# assert _get_clan_flake_toplevel() == subdir
from clan_cli.clan_uri import FlakeId
from clan_cli.dirs import clan_key_safe, vm_state_dir
@@ -24,12 +25,12 @@ def test_clan_key_safe() -> None:
def test_vm_state_dir_identity() -> None:
dir1 = vm_state_dir("https://some.clan", "vm1")
dir2 = vm_state_dir("https://some.clan", "vm1")
dir1 = vm_state_dir(FlakeId("https://some.clan"), "vm1")
dir2 = vm_state_dir(FlakeId("https://some.clan"), "vm1")
assert str(dir1) == str(dir2)
def test_vm_state_dir_no_collision() -> None:
dir1 = vm_state_dir("/foo/bar", "vm1")
dir2 = vm_state_dir("https://some.clan", "vm1")
dir1 = vm_state_dir(FlakeId("/foo/bar"), "vm1")
dir2 = vm_state_dir(FlakeId("https://some.clan"), "vm1")
assert str(dir1) != str(dir2)

View File

@@ -24,16 +24,6 @@
clan.core.networking.zerotier.controller.enable = true;
networking.useDHCP = false;
systemd.services.shutdown-after-boot = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "multi-user.target" ];
script = ''
#!/usr/bin/env bash
shutdown -h now
'';
};
};
vm2 =
{ lib, ... }:

View File

@@ -1,14 +1,17 @@
import json
from contextlib import ExitStack
from pathlib import Path
import pytest
from age_keys import SopsSetup
from clan_cli import cmd
from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_eval, run
from clan_cli.vms.run import inspect_vm, spawn_vm
from fixtures_flakes import generate_flake
from helpers import cli
from helpers.nixos_config import nested_dict
from helpers.vms import qga_connect, run_vm_in_thread, wait_vm_down
from root import CLAN_CORE
@@ -61,9 +64,10 @@ def test_vm_deployment(
flake_template=CLAN_CORE / "templates" / "minimal",
machine_configs={"m1_machine": machine1_config, "m2_machine": machine2_config},
)
monkeypatch.chdir(flake.path)
sops_setup.init()
cli.run(["vars", "generate"])
sops_setup.init(flake.path)
cli.run(["vars", "generate", "--flake", str(flake.path)])
# check sops secrets not empty
for machine in ["m1_machine", "m2_machine"]:
sops_secrets = json.loads(
@@ -94,37 +98,31 @@ def test_vm_deployment(
).stdout.strip()
assert "no-such-path" not in shared_secret_path
# run nix flake lock
cmd.run(["nix", "flake", "lock"])
vm_m1 = run_vm_in_thread("m1_machine")
vm_m2 = run_vm_in_thread("m2_machine")
with (
qga_connect("m1_machine", vm_m1) as qga_m1,
qga_connect("m2_machine", vm_m2) as qga_m2,
):
cmd.run(["nix", "flake", "lock"], cwd=flake.path)
vm1_config = inspect_vm(machine=Machine("m1_machine", FlakeId(str(flake.path))))
vm2_config = inspect_vm(machine=Machine("m2_machine", FlakeId(str(flake.path))))
with ExitStack() as stack:
vm1 = stack.enter_context(spawn_vm(vm1_config))
vm2 = stack.enter_context(spawn_vm(vm2_config))
qga_m1 = stack.enter_context(vm1.qga_connect())
qga_m2 = stack.enter_context(vm2.qga_connect())
# check my_secret is deployed
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/m1_generator/my_secret", check=True
)
assert out == "hello\n"
result = qga_m1.run(["cat", "/run/secrets/vars/m1_generator/my_secret"])
assert result.stdout == "hello\n"
# check shared_secret is deployed on m1
_, out, _ = qga_m1.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
result = qga_m1.run(
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
)
assert out == "hello\n"
assert result.stdout == "hello\n"
# check shared_secret is deployed on m2
_, out, _ = qga_m2.run(
"cat /run/secrets/vars/my_shared_generator/shared_secret", check=True
result = qga_m2.run(
["cat", "/run/secrets/vars/my_shared_generator/shared_secret"]
)
assert out == "hello\n"
assert result.stdout == "hello\n"
# check no_deploy_secret is not deployed
returncode, out, _ = qga_m1.run(
"test -e /run/secrets/vars/my_shared_generator/no_deploy_secret",
result = qga_m1.run(
["test", "-e", "/run/secrets/vars/my_shared_generator/no_deploy_secret"],
check=False,
)
assert returncode != 0
qga_m1.exec_cmd("poweroff")
qga_m2.exec_cmd("poweroff")
wait_vm_down("m1_machine", vm_m1)
wait_vm_down("m2_machine", vm_m2)
vm_m1.join()
vm_m2.join()
assert result.returncode != 0

View File

@@ -2,10 +2,12 @@ from pathlib import Path
from typing import TYPE_CHECKING
import pytest
from clan_cli.clan_uri import FlakeId
from clan_cli.machines.machines import Machine
from clan_cli.vms.run import inspect_vm, spawn_vm
from fixtures_flakes import FlakeForTest, generate_flake
from helpers import cli
from helpers.nixos_config import nested_dict
from helpers.vms import qga_connect, qmp_connect, run_vm_in_thread, wait_vm_down
from root import CLAN_CORE
from stdout import CaptureOutput
@@ -51,7 +53,7 @@ def test_run(
"user1",
]
)
cli.run(["vms", "run", "vm1"])
cli.run(["vms", "run", "--no-block", "vm1", "shutdown", "-h", "now"])
@pytest.mark.skipif(no_kvm, reason="Requires KVM")
@@ -66,7 +68,7 @@ def test_vm_persistence(
config["my_machine"]["systemd"]["services"]["logrotate-checkconf"]["enable"] = False
config["my_machine"]["services"]["getty"]["autologinUser"] = "root"
config["my_machine"]["clan"]["virtualisation"] = {"graphics": False}
config["my_machine"]["clan"]["networking"] = {"targetHost": "client"}
config["my_machine"]["clan"]["core"]["networking"] = {"targetHost": "client"}
config["my_machine"]["clan"]["core"]["state"]["my_state"]["folders"] = [
# to be owned by root
"/var/my-state",
@@ -84,54 +86,40 @@ def test_vm_persistence(
machine_configs=config,
)
monkeypatch.chdir(flake.path)
vm_config = inspect_vm(machine=Machine("my_machine", FlakeId(str(flake.path))))
vm = run_vm_in_thread("my_machine")
# wait for the VM to start and connect qga
with qga_connect("my_machine", vm) as qga:
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
# create state via qmp command instead of systemd service
qga.run("echo 'dream2nix' > /var/my-state/root", check=True)
qga.run("echo 'dream2nix' > /var/my-state/test", check=True)
qga.run("chown test /var/my-state/test", check=True)
qga.run("chown test /var/user-state", check=True)
qga.run("touch /var/my-state/rebooting", check=True)
qga.exec_cmd("poweroff")
# wait for socket to be down (systemd service 'poweroff' rebooting machine)
wait_vm_down("my_machine", vm)
vm.join()
qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/root"])
qga.run(["/bin/sh", "-c", "echo 'dream2nix' > /var/my-state/test"])
qga.run(["/bin/sh", "-c", "chown test /var/my-state/test"])
qga.run(["/bin/sh", "-c", "chown test /var/user-state"])
qga.run_nonblocking(["shutdown", "-h", "now"])
## start vm again
vm = run_vm_in_thread("my_machine")
## connect second time
with qga_connect("my_machine", vm) as qga:
with spawn_vm(vm_config) as vm, vm.qga_connect() as qga:
# check state exists
qga.run("cat /var/my-state/test", check=True)
qga.run(["cat", "/var/my-state/test"])
# ensure root file is owned by root
qga.run("stat -c '%U' /var/my-state/root", check=True)
qga.run(["stat", "-c", "%U", "/var/my-state/root"])
# ensure test file is owned by test
qga.run("stat -c '%U' /var/my-state/test", check=True)
qga.run(["stat", "-c", "%U", "/var/my-state/test"])
# ensure /var/user-state is owned by test
qga.run("stat -c '%U' /var/user-state", check=True)
qga.run(["stat", "-c", "%U", "/var/user-state"])
# ensure that the file created by the service is still there and has the expected content
exitcode, out, err = qga.run("cat /var/my-state/test")
assert exitcode == 0, err
assert out == "dream2nix\n", out
result = qga.run(["cat", "/var/my-state/test"])
assert result.stdout == "dream2nix\n", result.stdout
# check for errors
exitcode, out, err = qga.run("cat /var/my-state/error")
assert exitcode == 1, out
result = qga.run(["cat", "/var/my-state/error"], check=False)
assert result.returncode == 1, result.stdout
# check all systemd services are OK, or print details
exitcode, out, err = qga.run(
"systemctl --failed | tee /tmp/yolo | grep -q '0 loaded units listed' || ( cat /tmp/yolo && false )"
result = qga.run(
[
"/bin/sh",
"-c",
"systemctl --failed | tee /tmp/log | grep -q '0 loaded units listed' || ( cat /tmp/log && false )",
],
)
assert exitcode == 0, out
with qmp_connect("my_machine", vm) as qmp:
qmp.command("system_powerdown")
vm.join()

View File

@@ -158,14 +158,9 @@ class VMObject(GObject.Object):
)
assert self.machine is not None
if self.machine.flake.is_local():
state_dir = vm_state_dir(
flake_url=str(self.machine.flake.path), vm_name=self.machine.name
)
else:
state_dir = vm_state_dir(
flake_url=self.machine.flake.url, vm_name=self.machine.name
)
state_dir = vm_state_dir(
flake_url=self.machine.flake, vm_name=self.machine.name
)
self.qmp_wrap = QMPWrapper(state_dir)
assert self.machine is not None
yield self.machine