Files
clan-core/pkgs/clan-cli/clan_cli/vms/run.py
2024-01-19 13:56:17 +01:00

275 lines
8.4 KiB
Python

import argparse
import json
import logging
import os
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import IO
from ..cmd import Log, run
from ..dirs import module_root, specific_groot_dir, vm_state_dir
from ..errors import ClanError
from ..nix import nix_build, nix_config, nix_shell
from .inspect import VmConfig, inspect_vm
log = logging.getLogger(__name__)
def graphics_options(vm: VmConfig) -> list[str]:
common = ["-audio", "driver=pa,model=virtio"]
if vm.wayland:
# fmt: off
return [
*common,
"-nographic",
"-vga", "none",
"-device", "virtio-gpu-rutabaga,gfxstream-vulkan=on,cross-domain=on,hostmem=4G,wsi=headless",
]
# fmt: on
else:
# fmt: off
return [
*common,
"-vga", "none",
"-display", "gtk,gl=on",
"-device", "virtio-gpu-gl",
"-display", "spice-app,gl=on",
"-device", "virtio-serial-pci",
"-chardev", "spicevmc,id=vdagent0,name=vdagent",
"-device", "virtserialport,chardev=vdagent0,name=com.redhat.spice.0",
"-device", "qemu-xhci,id=spicepass",
"-chardev", "spicevmc,id=usbredirchardev1,name=usbredir",
"-device", "usb-redir,chardev=usbredirchardev1,id=usbredirdev1",
"-chardev", "spicevmc,id=usbredirchardev2,name=usbredir",
"-device", "usb-redir,chardev=usbredirchardev2,id=usbredirdev2",
"-chardev", "spicevmc,id=usbredirchardev3,name=usbredir",
"-device", "usb-redir,chardev=usbredirchardev3,id=usbredirdev3",
"-device", "pci-ohci,id=smartpass",
"-device", "usb-ccid",
"-chardev", "spicevmc,id=ccid,name=smartcard",
]
# fmt: on
def qemu_command(
vm: VmConfig,
nixos_config: dict[str, str],
xchg_dir: Path,
secrets_dir: Path,
state_dir: Path,
disk_img: Path,
) -> list[str]:
kernel_cmdline = [
(Path(nixos_config["toplevel"]) / "kernel-params").read_text(),
f'init={nixos_config["toplevel"]}/init',
f'regInfo={nixos_config["regInfo"]}/registration',
"console=ttyS0,115200n8",
]
if not vm.wayland:
kernel_cmdline.append("console=tty0")
# fmt: off
command = [
"qemu-kvm",
"-name", vm.flake_attr,
"-m", f'{nixos_config["memorySize"]}M',
"-smp", str(nixos_config["cores"]),
"-cpu", "max",
"-enable-kvm",
"-device", "virtio-rng-pci",
"-net", "nic,netdev=user.0,model=virtio",
"-netdev", "user,id=user.0",
"-virtfs", "local,path=/nix/store,security_model=none,mount_tag=nix-store",
"-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=shared",
"-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=xchg",
"-virtfs", f"local,path={secrets_dir},security_model=none,mount_tag=secrets",
"-virtfs", f"local,path={state_dir},security_model=none,mount_tag=state",
"-drive", f"cache=writeback,file={disk_img},format=raw,id=drive1,if=none,index=1,werror=report",
"-device", "virtio-blk-pci,bootindex=1,drive=drive1,serial=root",
"-device", "virtio-keyboard",
"-usb", "-device", "usb-tablet,bus=usb-bus.0",
"-kernel", f'{nixos_config["toplevel"]}/kernel',
"-initrd", nixos_config["initrd"],
"-append", " ".join(kernel_cmdline),
] # fmt: on
if vm.graphics:
command.extend(graphics_options(vm))
else:
command.append("-nographic")
return command
def get_vm_create_info(vm: VmConfig, nix_options: list[str]) -> dict[str, str]:
config = nix_config()
system = config["system"]
clan_dir = vm.flake_url
machine = vm.flake_attr
cmd = nix_build(
[
f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.system.clan.vm.create',
*nix_options,
],
specific_groot_dir(clan_name=vm.clan_name, flake_url=str(vm.flake_url))
/ f"vm-{machine}",
)
proc = run(cmd, log=Log.BOTH, error_msg=f"Could not build vm config for {machine}")
try:
return json.loads(Path(proc.stdout.strip()).read_text())
except json.JSONDecodeError as e:
raise ClanError(f"Failed to parse vm config: {e}")
def generate_secrets(
vm: VmConfig,
nixos_config: dict[str, str],
tmpdir: Path,
log_fd: IO[str] | None,
) -> Path:
secrets_dir = tmpdir / "secrets"
secrets_dir.mkdir(exist_ok=True)
env = os.environ.copy()
env["CLAN_DIR"] = str(vm.flake_url)
env["PYTHONPATH"] = str(":".join(sys.path)) # TODO do this in the clanCore module
env["SECRETS_DIR"] = str(secrets_dir)
# Only generate secrets for local clans
if isinstance(vm.flake_url, Path) and vm.flake_url.is_dir():
if Path(vm.flake_url).is_dir():
run([nixos_config["generateSecrets"], vm.clan_name], env=env)
else:
log.warning("won't generate secrets for non local clan")
cmd = [nixos_config["uploadSecrets"]]
run(
cmd,
env=env,
log=Log.BOTH,
error_msg=f"Could not upload secrets for {vm.flake_attr}",
)
return secrets_dir
def prepare_disk(tmpdir: Path, log_fd: IO[str] | None) -> Path:
disk_img = tmpdir / "disk.img"
cmd = nix_shell(
["nixpkgs#qemu"],
[
"qemu-img",
"create",
"-f",
"raw",
str(disk_img),
"1024M",
],
)
run(
cmd,
log=Log.BOTH,
error_msg=f"Could not create disk image at {disk_img}",
)
cmd = nix_shell(
["nixpkgs#e2fsprogs"],
[
"mkfs.ext4",
"-L",
"nixos",
str(disk_img),
],
)
run(
cmd,
log=Log.BOTH,
error_msg=f"Could not create ext4 filesystem at {disk_img}",
)
return disk_img
def run_vm(
vm: VmConfig, nix_options: list[str] = [], log_fd: IO[str] | None = None
) -> None:
"""
log_fd can be used to stream the output of all commands to a UI
"""
machine = vm.flake_attr
log.debug(f"Creating VM for {machine}")
# TODO: We should get this from the vm argument
nixos_config = get_vm_create_info(vm, nix_options)
with tempfile.TemporaryDirectory() as tmpdir_:
tmpdir = Path(tmpdir_)
xchg_dir = tmpdir / "xchg"
xchg_dir.mkdir(exist_ok=True)
secrets_dir = generate_secrets(vm, nixos_config, tmpdir, log_fd)
disk_img = prepare_disk(tmpdir, log_fd)
state_dir = vm_state_dir(vm.clan_name, str(vm.flake_url), machine)
state_dir.mkdir(parents=True, exist_ok=True)
qemu_cmd = qemu_command(
vm,
nixos_config,
xchg_dir=xchg_dir,
secrets_dir=secrets_dir,
state_dir=state_dir,
disk_img=disk_img,
)
packages = ["nixpkgs#qemu"]
env = os.environ.copy()
if vm.graphics and not vm.wayland:
packages.append("nixpkgs#virt-viewer")
remote_viewer_mimetypes = module_root() / "vms" / "mimetypes"
env[
"XDG_DATA_DIRS"
] = f"{remote_viewer_mimetypes}:{env.get('XDG_DATA_DIRS', '')}"
run(
nix_shell(packages, qemu_cmd),
env=env,
log=Log.BOTH,
error_msg=f"Could not start vm {machine}",
)
@dataclass
class RunOptions:
machine: str
flake_url: str | None
flake: Path
nix_options: list[str] = field(default_factory=list)
wayland: bool = False
def run_command(args: argparse.Namespace) -> None:
run_options = RunOptions(
machine=args.machine,
flake_url=args.flake_url,
flake=args.flake or Path.cwd(),
nix_options=args.option,
wayland=args.wayland,
)
flake_url = run_options.flake_url or run_options.flake
vm = inspect_vm(flake_url=flake_url, flake_attr=run_options.machine)
# TODO: allow to set this in the config
vm.wayland = run_options.wayland
run_vm(vm, run_options.nix_options)
def register_run_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument("machine", type=str, help="machine in the flake to run")
parser.add_argument("--flake-url", type=str, help="flake url")
parser.add_argument("--wayland", action="store_true", help="use wayland")
parser.set_defaults(func=run_command)