From c7bf416af42b9177eec3b85d91c0270b7dc526af Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 2 Oct 2023 15:36:02 +0200 Subject: [PATCH 1/4] CLI: Use API functions --- pkgs/clan-cli/.vscode/launch.json | 9 ++ pkgs/clan-cli/clan_cli/__init__.py | 17 ++- pkgs/clan-cli/clan_cli/__main__.py | 4 + pkgs/clan-cli/clan_cli/machines/list.py | 4 + pkgs/clan-cli/clan_cli/vms/create.py | 102 +++-------------- pkgs/clan-cli/clan_cli/vms/inspect.py | 32 +----- pkgs/clan-cli/clan_cli/webui/__init__.py | 2 + pkgs/clan-cli/clan_cli/webui/__main__.py | 5 + pkgs/clan-cli/clan_cli/webui/app.py | 13 +-- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 109 +++++++++++++++---- pkgs/clan-cli/clan_cli/webui/server.py | 100 +++++++++++++++++ pkgs/clan-cli/clan_cli/webui/task_manager.py | 11 +- 12 files changed, 259 insertions(+), 149 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/__main__.py diff --git a/pkgs/clan-cli/.vscode/launch.json b/pkgs/clan-cli/.vscode/launch.json index ab2ef11e6..4e2c20a75 100644 --- a/pkgs/clan-cli/.vscode/launch.json +++ b/pkgs/clan-cli/.vscode/launch.json @@ -12,6 +12,15 @@ "justMyCode": false, "args": [ "--reload", "--no-open", "--log-level", "debug" ], + }, + { + "name": "Clan Cli VMs", + "type": "python", + "request": "launch", + "module": "clan_cli", + "justMyCode": false, + "args": [ "vms" ], + } ] } \ No newline at end of file diff --git a/pkgs/clan-cli/clan_cli/__init__.py b/pkgs/clan-cli/clan_cli/__init__.py index bfbe083e8..7cd2c3a28 100644 --- a/pkgs/clan-cli/clan_cli/__init__.py +++ b/pkgs/clan-cli/clan_cli/__init__.py @@ -1,12 +1,15 @@ import argparse +import logging import sys from types import ModuleType from typing import Optional -from . import config, create, machines, secrets, vms, webui +from . import config, create, custom_logger, machines, secrets, vms, webui from .errors import ClanError from .ssh import cli as ssh_cli +log = logging.getLogger(__name__) + argcomplete: Optional[ModuleType] = None try: import argcomplete # type: ignore[no-redef] @@ -62,14 +65,20 @@ def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser: def main() -> None: parser = create_parser() args = parser.parse_args() + + if args.debug: + custom_logger.register(logging.DEBUG) + log.debug("Debug logging enabled") + else: + custom_logger.register(logging.INFO) + if not hasattr(args, "func"): + log.error("No argparse function registered") return try: args.func(args) except ClanError as e: - if args.debug: - raise - print(f"{sys.argv[0]}: {e}") + log.exception(e) sys.exit(1) diff --git a/pkgs/clan-cli/clan_cli/__main__.py b/pkgs/clan-cli/clan_cli/__main__.py new file mode 100644 index 000000000..868d99efc --- /dev/null +++ b/pkgs/clan-cli/clan_cli/__main__.py @@ -0,0 +1,4 @@ +from . import main + +if __name__ == "__main__": + main() diff --git a/pkgs/clan-cli/clan_cli/machines/list.py b/pkgs/clan-cli/clan_cli/machines/list.py index dc4755f69..ae8b1d3b1 100644 --- a/pkgs/clan-cli/clan_cli/machines/list.py +++ b/pkgs/clan-cli/clan_cli/machines/list.py @@ -1,12 +1,16 @@ import argparse +import logging import os from .folders import machines_folder from .types import validate_hostname +log = logging.getLogger(__name__) + def list_machines() -> list[str]: path = machines_folder() + log.debug(f"Listing machines in {path}") if not path.exists(): return [] objs: list[str] = [] diff --git a/pkgs/clan-cli/clan_cli/vms/create.py b/pkgs/clan-cli/clan_cli/vms/create.py index 93ffa6b58..a01c31640 100644 --- a/pkgs/clan-cli/clan_cli/vms/create.py +++ b/pkgs/clan-cli/clan_cli/vms/create.py @@ -1,99 +1,23 @@ import argparse -import json -import subprocess -import tempfile -from pathlib import Path +import asyncio from ..dirs import get_clan_flake_toplevel -from ..nix import nix_build, nix_shell - - -def get_vm_create_info(machine: str) -> dict: - clan_dir = get_clan_flake_toplevel().as_posix() - - # config = nix_config() - # system = config["system"] - - vm_json = subprocess.run( - nix_build( - [ - # f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.clan.virtualisation.createJSON' # TODO use this - f'{clan_dir}#nixosConfigurations."{machine}".config.system.clan.vm.create' - ] - ), - stdout=subprocess.PIPE, - check=True, - text=True, - ).stdout.strip() - with open(vm_json) as f: - return json.load(f) +from ..webui.routers import vms +from ..webui.schemas import VmConfig def create(args: argparse.Namespace) -> None: - print(f"Creating VM for {args.machine}") - machine = args.machine - vm_config = get_vm_create_info(machine) - with tempfile.TemporaryDirectory() as tmpdir_: - xchg_dir = Path(tmpdir_) / "xchg" - xchg_dir.mkdir() - disk_img = f"{tmpdir_}/disk.img" - subprocess.run( - nix_shell( - ["qemu"], - [ - "qemu-img", - "create", - "-f", - "raw", - disk_img, - "1024M", - ], - ), - stdout=subprocess.PIPE, - check=True, - text=True, - ) - subprocess.run( - [ - "mkfs.ext4", - "-L", - "nixos", - disk_img, - ], - stdout=subprocess.PIPE, - check=True, - text=True, - ) + clan_dir = get_clan_flake_toplevel().as_posix() + vm = VmConfig( + flake_url=clan_dir, + flake_attr=args.machine, + cores=0, + graphics=False, + memory_size=0, + ) - subprocess.run( - nix_shell( - ["qemu"], - [ - # fmt: off - "qemu-kvm", - "-name", machine, - "-m", f'{vm_config["memorySize"]}M', - "-smp", str(vm_config["cores"]), - "-device", "virtio-rng-pci", - "-net", "nic,netdev=user.0,model=virtio", "-netdev", "user,id=user.0", - "-virtfs", "local,path=/nix/store,security_model=none,mount_tag=nix-store", - "-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=shared", - "-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=xchg", - "-drive", f'cache=writeback,file={disk_img},format=raw,id=drive1,if=none,index=1,werror=report', - "-device", "virtio-blk-pci,bootindex=1,drive=drive1,serial=root", - "-device", "virtio-keyboard", - "-usb", - "-device", "usb-tablet,bus=usb-bus.0", - "-kernel", f'{vm_config["toplevel"]}/kernel', - "-initrd", vm_config["initrd"], - "-append", f'{(Path(vm_config["toplevel"]) / "kernel-params").read_text()} init={vm_config["toplevel"]}/init regInfo={vm_config["regInfo"]}/registration console=ttyS0,115200n8 console=tty0', - # fmt: on - ], - ), - stdout=subprocess.PIPE, - check=True, - text=True, - ) + res = asyncio.run(vms.create_vm(vm)) + print(res.json()) def register_create_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/vms/inspect.py b/pkgs/clan-cli/clan_cli/vms/inspect.py index 67e5fedc8..f98009a9d 100644 --- a/pkgs/clan-cli/clan_cli/vms/inspect.py +++ b/pkgs/clan-cli/clan_cli/vms/inspect.py @@ -1,36 +1,14 @@ import argparse -import json -import subprocess +import asyncio from ..dirs import get_clan_flake_toplevel -from ..nix import nix_eval - - -def get_vm_inspect_info(machine: str) -> dict: - clan_dir = get_clan_flake_toplevel().as_posix() - - # config = nix_config() - # system = config["system"] - - return json.loads( - subprocess.run( - nix_eval( - [ - # f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.clan.virtualisation' # TODO use this - f'{clan_dir}#nixosConfigurations."{machine}".config.system.clan.vm.config' - ] - ), - stdout=subprocess.PIPE, - check=True, - text=True, - ).stdout - ) +from ..webui.routers import vms def inspect(args: argparse.Namespace) -> None: - print(f"Creating VM for {args.machine}") - machine = args.machine - print(get_vm_inspect_info(machine)) + clan_dir = get_clan_flake_toplevel().as_posix() + res = asyncio.run(vms.inspect_vm(flake_url=clan_dir, flake_attr=args.machine)) + print(res.json()) def register_inspect_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/webui/__init__.py b/pkgs/clan-cli/clan_cli/webui/__init__.py index fc1d8ca55..ca71979ed 100644 --- a/pkgs/clan-cli/clan_cli/webui/__init__.py +++ b/pkgs/clan-cli/clan_cli/webui/__init__.py @@ -45,6 +45,8 @@ def register_parser(parser: argparse.ArgumentParser) -> None: help="Log level", choices=["critical", "error", "warning", "info", "debug", "trace"], ) + + # Set the args.func variable in args if start_server is None: parser.set_defaults(func=fastapi_is_not_installed) else: diff --git a/pkgs/clan-cli/clan_cli/webui/__main__.py b/pkgs/clan-cli/clan_cli/webui/__main__.py index c551d7042..f6bd9ea79 100644 --- a/pkgs/clan-cli/clan_cli/webui/__main__.py +++ b/pkgs/clan-cli/clan_cli/webui/__main__.py @@ -5,6 +5,11 @@ from . import register_parser if __name__ == "__main__": # this is use in our integration test parser = argparse.ArgumentParser() + # call the register_parser function, which adds arguments to the parser register_parser(parser) args = parser.parse_args() + + # call the function that is stored + # in the func attribute of args, and pass args as the argument + # look into register_parser to see how this is done args.func(args) diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index b3efaa603..daf415861 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -5,7 +5,6 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles -from .. import custom_logger from .assets import asset_path from .routers import flake, health, machines, root, utils, vms @@ -43,15 +42,11 @@ def setup_app() -> FastAPI: if isinstance(route, APIRoute): route.operation_id = route.name # in this case, 'read_items' log.debug(f"Registered route: {route}") + + for i in app.exception_handlers.items(): + log.debug(f"Registered exception handler: {i}") + return app -# TODO: How do I get the log level from the command line in here? -custom_logger.register(logging.DEBUG) app = setup_app() - -for i in app.exception_handlers.items(): - log.info(f"Registered exception handler: {i}") - -log.warning("log warn") -log.debug("log debug") diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 3011c32c5..40637e046 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,12 +1,14 @@ import json import logging +import tempfile +from pathlib import Path from typing import Annotated, Iterator from uuid import UUID -from fastapi import APIRouter, BackgroundTasks, Body +from fastapi import APIRouter, Body from fastapi.responses import StreamingResponse -from ...nix import nix_build, nix_eval +from ...nix import nix_build, nix_eval, nix_shell from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse from ..task_manager import BaseTask, get_task, register_task from .utils import run_cmd @@ -36,23 +38,94 @@ class BuildVmTask(BaseTask): super().__init__(uuid) self.vm = vm - def run(self) -> None: - try: - self.log.debug(f"BuildVM with uuid {self.uuid} started") - cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) + def get_vm_create_info(self) -> dict: + clan_dir = self.vm.flake_url + machine = self.vm.flake_attr + cmd_state = self.run_cmd( + nix_build( + [ + # f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.clan.virtualisation.createJSON' # TODO use this + f'{clan_dir}#nixosConfigurations."{machine}".config.system.clan.vm.create' + ] + ) + ) + vm_json = "".join(cmd_state.stdout) + self.log.debug(f"VM JSON path: {vm_json}") + with open(vm_json) as f: + return json.load(f) - proc = self.run_cmd(cmd) - self.log.debug(f"stdout: {proc.stdout}") + def task_run(self) -> None: + machine = self.vm.flake_attr + self.log.debug(f"Creating VM for {machine}") + vm_config = self.get_vm_create_info() + with tempfile.TemporaryDirectory() as tmpdir_: + xchg_dir = Path(tmpdir_) / "xchg" + xchg_dir.mkdir() + disk_img = f"{tmpdir_}/disk.img" + cmd = nix_shell( + ["qemu"], + [ + "qemu" "qemu-img", + "create", + "-f", + "raw", + disk_img, + "1024M", + ], + ) + self.run_cmd(cmd) - vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" - self.log.debug(f"vm_path: {vm_path}") + cmd = [ + "mkfs.ext4", + "-L", + "nixos", + disk_img, + ] + self.run_cmd(cmd) - self.run_cmd([vm_path]) - self.finished = True - except Exception as e: - self.failed = True - self.finished = True - log.exception(e) + cmd = nix_shell( + ["qemu"], + [ + # fmt: off + "qemu-kvm", + "-name", machine, + "-m", f'{vm_config["memorySize"]}M', + "-smp", str(vm_config["cores"]), + "-device", "virtio-rng-pci", + "-net", "nic,netdev=user.0,model=virtio", "-netdev", "user,id=user.0", + "-virtfs", "local,path=/nix/store,security_model=none,mount_tag=nix-store", + "-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=shared", + "-virtfs", f"local,path={xchg_dir},security_model=none,mount_tag=xchg", + "-drive", f'cache=writeback,file={disk_img},format=raw,id=drive1,if=none,index=1,werror=report', + "-device", "virtio-blk-pci,bootindex=1,drive=drive1,serial=root", + "-device", "virtio-keyboard", + "-usb", + "-device", "usb-tablet,bus=usb-bus.0", + "-kernel", f'{vm_config["toplevel"]}/kernel', + "-initrd", vm_config["initrd"], + "-append", f'{(Path(vm_config["toplevel"]) / "kernel-params").read_text()} init={vm_config["toplevel"]}/init regInfo={vm_config["regInfo"]}/registration console=ttyS0,115200n8 console=tty0', + # fmt: on + ], + ) + self.run_cmd(cmd) + + # def run(self) -> None: + # try: + # self.log.debug(f"BuildVM with uuid {self.uuid} started") + # cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) + + # proc = self.run_cmd(cmd) + # self.log.debug(f"stdout: {proc.stdout}") + + # vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" + # self.log.debug(f"vm_path: {vm_path}") + + # self.run_cmd([vm_path]) + # self.finished = True + # except Exception as e: + # self.failed = True + # self.finished = True + # log.exception(e) @router.post("/api/vms/inspect") @@ -104,8 +177,6 @@ async def get_vm_logs(uuid: UUID) -> StreamingResponse: @router.post("/api/vms/create") -async def create_vm( - vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks -) -> VmCreateResponse: +async def create_vm(vm: Annotated[VmConfig, Body()]) -> VmCreateResponse: uuid = register_task(BuildVmTask, vm) return VmCreateResponse(uuid=str(uuid)) diff --git a/pkgs/clan-cli/clan_cli/webui/server.py b/pkgs/clan-cli/clan_cli/webui/server.py index 8d67d5a45..f780f9b62 100644 --- a/pkgs/clan-cli/clan_cli/webui/server.py +++ b/pkgs/clan-cli/clan_cli/webui/server.py @@ -1,6 +1,11 @@ import argparse import logging +import multiprocessing as mp +import os +import socket import subprocess +import sys +import syslog import time import urllib.request import webbrowser @@ -90,3 +95,98 @@ def start_server(args: argparse.Namespace) -> None: access_log=args.log_level == "debug", headers=headers, ) + + +# Define a function that takes the path of the file socket as input and returns True if it is served, False otherwise +def is_served(file_socket: Path) -> bool: + # Create a Unix stream socket + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # Try to connect to the file socket + try: + client.connect(str(file_socket)) + # Connection succeeded, return True + return True + except OSError: + # Connection failed, return False + return False + finally: + # Close the client socket + client.close() + + +def set_out_to_syslog() -> None: # type: ignore + # Define some constants for convenience + log_levels = { + "emerg": syslog.LOG_EMERG, + "alert": syslog.LOG_ALERT, + "crit": syslog.LOG_CRIT, + "err": syslog.LOG_ERR, + "warning": syslog.LOG_WARNING, + "notice": syslog.LOG_NOTICE, + "info": syslog.LOG_INFO, + "debug": syslog.LOG_DEBUG, + } + facility = syslog.LOG_USER # Use user facility for custom applications + + # Open a connection to the system logger + syslog.openlog("clan-cli", 0, facility) # Use "myapp" as the prefix for messages + + # Define a custom write function that sends messages to syslog + def write(message: str) -> int: + # Strip the newline character from the message + message = message.rstrip("\n") + # Check if the message is not empty + if message: + # Send the message to syslog with the appropriate level + if message.startswith("ERROR:"): + # Use error level for messages that start with "ERROR:" + syslog.syslog(log_levels["err"], message) + else: + # Use info level for other messages + syslog.syslog(log_levels["info"], message) + return 0 + + # Assign the custom write function to sys.stdout and sys.stderr + setattr(sys.stdout, "write", write) + setattr(sys.stderr, "write", write) + + # Define a dummy flush function to prevent errors + def flush() -> None: + pass + + # Assign the dummy flush function to sys.stdout and sys.stderr + setattr(sys.stdout, "flush", flush) + setattr(sys.stderr, "flush", flush) + + +def _run_socketfile(socket_file: Path, debug: bool) -> None: + set_out_to_syslog() + run( + "clan_cli.webui.app:app", + uds=str(socket_file), + access_log=debug, + reload=False, + log_level="debug" if debug else "info", + ) + + +@contextmanager +def api_server(debug: bool) -> Iterator[Path]: + runtime_dir = os.getenv("XDG_RUNTIME_DIR") + if runtime_dir is None: + raise RuntimeError("XDG_RUNTIME_DIR not set") + socket_path = Path(runtime_dir) / "clan.sock" + socket_path = socket_path.resolve() + + log.debug("Socketfile lies at %s", socket_path) + + if not is_served(socket_path): + log.debug("Starting api server...") + mp.set_start_method(method="spawn") + proc = mp.Process(target=_run_socketfile, args=(socket_path, debug)) + proc.start() + else: + log.info("Api server is already running on %s", socket_path) + + yield socket_path + proc.terminate() diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 21374cb55..58a5995a4 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -33,7 +33,16 @@ class BaseTask(threading.Thread): self.finished: bool = False def run(self) -> None: - self.finished = True + try: + self.task_run() + except Exception as e: + self.failed = True + self.log.exception(e) + finally: + self.finished = True + + def task_run(self) -> None: + raise NotImplementedError def run_cmd(self, cmd: list[str]) -> CmdState: cwd = os.getcwd() From ab6b96e5162f828011bc111b0d4654bce4724aab Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 2 Oct 2023 18:36:50 +0200 Subject: [PATCH 2/4] CLI: Restructured TaskManager and log collection --- pkgs/clan-cli/clan_cli/vms/create.py | 21 +++ pkgs/clan-cli/clan_cli/webui/routers/vms.py | 75 ++++------ pkgs/clan-cli/clan_cli/webui/task_manager.py | 137 ++++++++++++------- 3 files changed, 136 insertions(+), 97 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/vms/create.py b/pkgs/clan-cli/clan_cli/vms/create.py index a01c31640..d2481326d 100644 --- a/pkgs/clan-cli/clan_cli/vms/create.py +++ b/pkgs/clan-cli/clan_cli/vms/create.py @@ -1,10 +1,24 @@ import argparse import asyncio +from uuid import UUID +import threading +import queue from ..dirs import get_clan_flake_toplevel from ..webui.routers import vms from ..webui.schemas import VmConfig +from typing import Any, Iterator +from fastapi.responses import StreamingResponse +import pdb +def read_stream_response(stream: StreamingResponse) -> Iterator[Any]: + iterator = stream.body_iterator + while True: + try: + tem = asyncio.run(iterator.__anext__()) + except StopAsyncIteration: + break + yield tem def create(args: argparse.Namespace) -> None: clan_dir = get_clan_flake_toplevel().as_posix() @@ -18,6 +32,13 @@ def create(args: argparse.Namespace) -> None: res = asyncio.run(vms.create_vm(vm)) print(res.json()) + uuid = UUID(res.uuid) + + res = asyncio.run(vms.get_vm_logs(uuid)) + + for line in read_stream_response(res): + print(line) + def register_create_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 40637e046..6f4377544 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,8 +1,9 @@ import json import logging import tempfile +import time from pathlib import Path -from typing import Annotated, Iterator +from typing import Annotated, Iterator, Iterable from uuid import UUID from fastapi import APIRouter, Body @@ -10,7 +11,7 @@ from fastapi.responses import StreamingResponse from ...nix import nix_build, nix_eval, nix_shell from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse -from ..task_manager import BaseTask, get_task, register_task +from ..task_manager import BaseTask, get_task, register_task, CmdState from .utils import run_cmd log = logging.getLogger(__name__) @@ -38,10 +39,11 @@ class BuildVmTask(BaseTask): super().__init__(uuid) self.vm = vm - def get_vm_create_info(self) -> dict: + def get_vm_create_info(self, cmds: Iterable[CmdState]) -> dict: clan_dir = self.vm.flake_url machine = self.vm.flake_attr - cmd_state = self.run_cmd( + cmd = next(cmds) + cmd.run( nix_build( [ # f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.clan.virtualisation.createJSON' # TODO use this @@ -49,41 +51,48 @@ class BuildVmTask(BaseTask): ] ) ) - vm_json = "".join(cmd_state.stdout) + vm_json = "".join(cmd.stdout) self.log.debug(f"VM JSON path: {vm_json}") with open(vm_json) as f: return json.load(f) def task_run(self) -> None: + cmds = self.register_cmds(4) + machine = self.vm.flake_attr self.log.debug(f"Creating VM for {machine}") - vm_config = self.get_vm_create_info() + + # TODO: We should get this from the vm argument + vm_config = self.get_vm_create_info(cmds) + with tempfile.TemporaryDirectory() as tmpdir_: xchg_dir = Path(tmpdir_) / "xchg" xchg_dir.mkdir() disk_img = f"{tmpdir_}/disk.img" - cmd = nix_shell( + + cmd = next(cmds) + cmd.run(nix_shell( ["qemu"], [ - "qemu" "qemu-img", + "qemu-img", "create", "-f", "raw", disk_img, "1024M", ], - ) - self.run_cmd(cmd) + )) - cmd = [ + cmd = next(cmds) + cmd.run([ "mkfs.ext4", "-L", "nixos", disk_img, - ] - self.run_cmd(cmd) + ]) - cmd = nix_shell( + cmd = next(cmds) + cmd.run(nix_shell( ["qemu"], [ # fmt: off @@ -106,26 +115,7 @@ class BuildVmTask(BaseTask): "-append", f'{(Path(vm_config["toplevel"]) / "kernel-params").read_text()} init={vm_config["toplevel"]}/init regInfo={vm_config["regInfo"]}/registration console=ttyS0,115200n8 console=tty0', # fmt: on ], - ) - self.run_cmd(cmd) - - # def run(self) -> None: - # try: - # self.log.debug(f"BuildVM with uuid {self.uuid} started") - # cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) - - # proc = self.run_cmd(cmd) - # self.log.debug(f"stdout: {proc.stdout}") - - # vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" - # self.log.debug(f"vm_path: {vm_path}") - - # self.run_cmd([vm_path]) - # self.finished = True - # except Exception as e: - # self.failed = True - # self.finished = True - # log.exception(e) + )) @router.post("/api/vms/inspect") @@ -154,21 +144,8 @@ async def get_vm_logs(uuid: UUID) -> StreamingResponse: def stream_logs() -> Iterator[str]: task = get_task(uuid) - for proc in task.procs: - if proc.done: - log.debug("stream logs and proc is done") - for line in proc.stderr: - yield line + "\n" - for line in proc.stdout: - yield line + "\n" - continue - while True: - out = proc.output - line = out.get() - if line is None: - log.debug("stream logs and line is None") - break - yield line + for line in task.logs_iter(): + yield line return StreamingResponse( content=stream_logs(), diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 58a5995a4..7e15930d2 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -5,19 +5,72 @@ import select import shlex import subprocess import threading -from typing import Any +from typing import Any, Iterable, Iterator from uuid import UUID, uuid4 class CmdState: - def __init__(self, proc: subprocess.Popen) -> None: - global LOOP - self.proc: subprocess.Popen = proc + def __init__(self, log: logging.Logger) -> None: + self.log: logging.Logger = log + self.p: subprocess.Popen = None self.stdout: list[str] = [] self.stderr: list[str] = [] - self.output: queue.SimpleQueue = queue.SimpleQueue() + self._output: queue.SimpleQueue = queue.SimpleQueue() self.returncode: int | None = None self.done: bool = False + self.running: bool = False + self.cmd_str: str | None = None + self.workdir: str | None = None + + def close_queue(self) -> None: + if self.p is not None: + self.returncode = self.p.returncode + self._output.put(None) + self.running = False + self.done = True + + def run(self, cmd: list[str]) -> None: + self.running = True + try: + self.cmd_str = shlex.join(cmd) + self.workdir = os.getcwd() + self.log.debug(f"Working directory: {self.workdir}") + self.log.debug(f"Running command: {shlex.join(cmd)}") + self.p = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + cwd=self.workdir, + ) + + while self.p.poll() is None: + # Check if stderr is ready to be read from + rlist, _, _ = select.select([self.p.stderr, self.p.stdout], [], [], 0) + if self.p.stderr in rlist: + assert self.p.stderr is not None + line = self.p.stderr.readline() + if line != "": + line = line.strip('\n') + self.stderr.append(line) + self.log.debug("stderr: %s", line) + self._output.put(line) + + if self.p.stdout in rlist: + assert self.p.stdout is not None + line = self.p.stdout.readline() + if line != "": + line = line.strip('\n') + self.stdout.append(line) + self.log.debug("stdout: %s", line) + self._output.put(line) + + if self.p.returncode != 0: + raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}") + + self.log.debug("Successfully ran command") + finally: + self.close_queue() class BaseTask(threading.Thread): @@ -31,64 +84,52 @@ class BaseTask(threading.Thread): self.procs: list[CmdState] = [] self.failed: bool = False self.finished: bool = False + self.logs_lock = threading.Lock() def run(self) -> None: try: self.task_run() except Exception as e: + for proc in self.procs: + proc.close_queue() self.failed = True - self.log.exception(e) - finally: self.finished = True + self.log.exception(e) + def task_run(self) -> None: raise NotImplementedError - def run_cmd(self, cmd: list[str]) -> CmdState: - cwd = os.getcwd() - self.log.debug(f"Working directory: {cwd}") - self.log.debug(f"Running command: {shlex.join(cmd)}") - p = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding="utf-8", - # shell=True, - cwd=cwd, - ) - self.procs.append(CmdState(p)) - p_state = self.procs[-1] + ## TODO: If two clients are connected to the same task, + def logs_iter(self) -> Iterator[str]: + with self.logs_lock: + for proc in self.procs: + if self.finished: + self.log.debug("log iter: Task is finished") + break + if proc.done: + for line in proc.stderr: + yield line + for line in proc.stdout: + yield line + continue + while True: + out = proc._output + line = out.get() + if line is None: + break + yield line - while p.poll() is None: - # Check if stderr is ready to be read from - rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0) - if p.stderr in rlist: - assert p.stderr is not None - line = p.stderr.readline() - if line != "": - p_state.stderr.append(line.strip("\n")) - self.log.debug(f"stderr: {line}") - p_state.output.put(line) + def register_cmds(self, num_cmds: int) -> Iterable[CmdState]: + for i in range(num_cmds): + cmd = CmdState(self.log) + self.procs.append(cmd) - if p.stdout in rlist: - assert p.stdout is not None - line = p.stdout.readline() - if line != "": - p_state.stdout.append(line.strip("\n")) - self.log.debug(f"stdout: {line}") - p_state.output.put(line) - - p_state.returncode = p.returncode - p_state.output.put(None) - p_state.done = True - - if p.returncode != 0: - raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}") - - self.log.debug("Successfully ran command") - return p_state + for cmd in self.procs: + yield cmd +# TODO: We need to test concurrency class TaskPool: def __init__(self) -> None: self.lock: threading.RLock = threading.RLock() From 14831e871fc7842e1dbff44e58c0ecb075fb1026 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 3 Oct 2023 11:51:31 +0200 Subject: [PATCH 3/4] Fixed failing tests --- pkgs/clan-cli/clan_cli/vms/create.py | 17 +++--- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 64 +++++++++++--------- pkgs/clan-cli/clan_cli/webui/task_manager.py | 16 ++--- pkgs/clan-cli/tests/test_vms_api.py | 13 +--- 4 files changed, 52 insertions(+), 58 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/vms/create.py b/pkgs/clan-cli/clan_cli/vms/create.py index d2481326d..78f441d55 100644 --- a/pkgs/clan-cli/clan_cli/vms/create.py +++ b/pkgs/clan-cli/clan_cli/vms/create.py @@ -1,25 +1,25 @@ import argparse import asyncio +from typing import Any, Iterator from uuid import UUID -import threading -import queue + +from fastapi.responses import StreamingResponse from ..dirs import get_clan_flake_toplevel from ..webui.routers import vms from ..webui.schemas import VmConfig -from typing import Any, Iterator -from fastapi.responses import StreamingResponse -import pdb + def read_stream_response(stream: StreamingResponse) -> Iterator[Any]: iterator = stream.body_iterator while True: try: - tem = asyncio.run(iterator.__anext__()) + tem = asyncio.run(iterator.__anext__()) # type: ignore except StopAsyncIteration: break yield tem + def create(args: argparse.Namespace) -> None: clan_dir = get_clan_flake_toplevel().as_posix() vm = VmConfig( @@ -34,13 +34,12 @@ def create(args: argparse.Namespace) -> None: print(res.json()) uuid = UUID(res.uuid) - res = asyncio.run(vms.get_vm_logs(uuid)) + stream = asyncio.run(vms.get_vm_logs(uuid)) - for line in read_stream_response(res): + for line in read_stream_response(stream): print(line) - def register_create_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument("machine", type=str) parser.set_defaults(func=create) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 6f4377544..88cd375d4 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,9 +1,8 @@ import json import logging import tempfile -import time from pathlib import Path -from typing import Annotated, Iterator, Iterable +from typing import Annotated, Iterator from uuid import UUID from fastapi import APIRouter, Body @@ -11,7 +10,7 @@ from fastapi.responses import StreamingResponse from ...nix import nix_build, nix_eval, nix_shell from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse -from ..task_manager import BaseTask, get_task, register_task, CmdState +from ..task_manager import BaseTask, CmdState, get_task, register_task from .utils import run_cmd log = logging.getLogger(__name__) @@ -39,7 +38,7 @@ class BuildVmTask(BaseTask): super().__init__(uuid) self.vm = vm - def get_vm_create_info(self, cmds: Iterable[CmdState]) -> dict: + def get_vm_create_info(self, cmds: Iterator[CmdState]) -> dict: clan_dir = self.vm.flake_url machine = self.vm.flake_attr cmd = next(cmds) @@ -71,31 +70,36 @@ class BuildVmTask(BaseTask): disk_img = f"{tmpdir_}/disk.img" cmd = next(cmds) - cmd.run(nix_shell( - ["qemu"], + cmd.run( + nix_shell( + ["qemu"], + [ + "qemu-img", + "create", + "-f", + "raw", + disk_img, + "1024M", + ], + ) + ) + + cmd = next(cmds) + cmd.run( [ - "qemu-img", - "create", - "-f", - "raw", + "mkfs.ext4", + "-L", + "nixos", disk_img, - "1024M", - ], - )) + ] + ) cmd = next(cmds) - cmd.run([ - "mkfs.ext4", - "-L", - "nixos", - disk_img, - ]) - - cmd = next(cmds) - cmd.run(nix_shell( - ["qemu"], - [ - # fmt: off + cmd.run( + nix_shell( + ["qemu"], + [ + # fmt: off "qemu-kvm", "-name", machine, "-m", f'{vm_config["memorySize"]}M', @@ -113,9 +117,10 @@ class BuildVmTask(BaseTask): "-kernel", f'{vm_config["toplevel"]}/kernel', "-initrd", vm_config["initrd"], "-append", f'{(Path(vm_config["toplevel"]) / "kernel-params").read_text()} init={vm_config["toplevel"]}/init regInfo={vm_config["regInfo"]}/registration console=ttyS0,115200n8 console=tty0', - # fmt: on - ], - )) + # fmt: on + ], + ) + ) @router.post("/api/vms/inspect") @@ -144,8 +149,7 @@ async def get_vm_logs(uuid: UUID) -> StreamingResponse: def stream_logs() -> Iterator[str]: task = get_task(uuid) - for line in task.logs_iter(): - yield line + yield from task.logs_iter() return StreamingResponse( content=stream_logs(), diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 7e15930d2..c0913e60f 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -5,14 +5,14 @@ import select import shlex import subprocess import threading -from typing import Any, Iterable, Iterator +from typing import Any, Iterator from uuid import UUID, uuid4 class CmdState: def __init__(self, log: logging.Logger) -> None: self.log: logging.Logger = log - self.p: subprocess.Popen = None + self.p: subprocess.Popen | None = None self.stdout: list[str] = [] self.stderr: list[str] = [] self._output: queue.SimpleQueue = queue.SimpleQueue() @@ -51,7 +51,7 @@ class CmdState: assert self.p.stderr is not None line = self.p.stderr.readline() if line != "": - line = line.strip('\n') + line = line.strip("\n") self.stderr.append(line) self.log.debug("stderr: %s", line) self._output.put(line) @@ -60,7 +60,7 @@ class CmdState: assert self.p.stdout is not None line = self.p.stdout.readline() if line != "": - line = line.strip('\n') + line = line.strip("\n") self.stdout.append(line) self.log.debug("stdout: %s", line) self._output.put(line) @@ -93,14 +93,14 @@ class BaseTask(threading.Thread): for proc in self.procs: proc.close_queue() self.failed = True - self.finished = True self.log.exception(e) - + finally: + self.finished = True def task_run(self) -> None: raise NotImplementedError - ## TODO: If two clients are connected to the same task, + ## TODO: If two clients are connected to the same task, def logs_iter(self) -> Iterator[str]: with self.logs_lock: for proc in self.procs: @@ -120,7 +120,7 @@ class BaseTask(threading.Thread): break yield line - def register_cmds(self, num_cmds: int) -> Iterable[CmdState]: + def register_cmds(self, num_cmds: int) -> Iterator[CmdState]: for i in range(num_cmds): cmd = CmdState(self.log) self.procs.append(cmd) diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 5bbc3c6d8..7904af19e 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -74,20 +74,11 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: assert response.status_code == 200, "Failed to get vm status" response = api.get(f"/api/vms/{uuid}/logs") - print("=========FLAKE LOGS==========") - assert isinstance(response.stream, SyncByteStream) - for line in response.stream: - assert line != b"", "Failed to get vm logs" - print(line.decode("utf-8"), end="") - print("=========END LOGS==========") - assert response.status_code == 200, "Failed to get vm logs" - - response = api.get(f"/api/vms/{uuid}/logs") - assert isinstance(response.stream, SyncByteStream) print("=========VM LOGS==========") + assert isinstance(response.stream, SyncByteStream) for line in response.stream: assert line != b"", "Failed to get vm logs" - print(line.decode("utf-8"), end="") + print(line.decode("utf-8")) print("=========END LOGS==========") assert response.status_code == 200, "Failed to get vm logs" From e0ef03fa03ba25f07d8daf67bb2ec7440086d9f1 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 3 Oct 2023 12:50:47 +0200 Subject: [PATCH 4/4] API: Added test for inspect_flake --- pkgs/clan-cli/clan_cli/secrets/generate.py | 5 +++++ pkgs/clan-cli/clan_cli/secrets/upload.py | 5 +++++ pkgs/clan-cli/tests/test_flake_api.py | 25 ++++++++++++++++++++-- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/secrets/generate.py b/pkgs/clan-cli/clan_cli/secrets/generate.py index 9e47c93cf..77d13ca41 100644 --- a/pkgs/clan-cli/clan_cli/secrets/generate.py +++ b/pkgs/clan-cli/clan_cli/secrets/generate.py @@ -1,4 +1,5 @@ import argparse +import logging import os import shlex import subprocess @@ -9,6 +10,8 @@ from clan_cli.errors import ClanError from ..dirs import get_clan_flake_toplevel, module_root from ..nix import nix_build, nix_config +log = logging.getLogger(__name__) + def build_generate_script(machine: str, clan_dir: Path) -> str: config = nix_config() @@ -39,6 +42,8 @@ def run_generate_secrets(secret_generator_script: str, clan_dir: Path) -> None: ) if proc.returncode != 0: + log.error("stdout: %s", proc.stdout) + log.error("stderr: %s", proc.stderr) raise ClanError("failed to generate secrets") else: print("successfully generated secrets") diff --git a/pkgs/clan-cli/clan_cli/secrets/upload.py b/pkgs/clan-cli/clan_cli/secrets/upload.py index 44aac77b5..8491b12a9 100644 --- a/pkgs/clan-cli/clan_cli/secrets/upload.py +++ b/pkgs/clan-cli/clan_cli/secrets/upload.py @@ -1,5 +1,6 @@ import argparse import json +import logging import os import shlex import subprocess @@ -11,6 +12,8 @@ from ..errors import ClanError from ..nix import nix_build, nix_config, nix_shell from ..ssh import parse_deployment_address +log = logging.getLogger(__name__) + def build_upload_script(machine: str, clan_dir: Path) -> str: config = nix_config() @@ -67,6 +70,8 @@ def run_upload_secrets( ) if proc.returncode != 0: + log.error("Stdout: %s", proc.stdout) + log.error("Stderr: %s", proc.stderr) raise ClanError("failed to upload secrets") h = parse_deployment_address(flake_attr, target) diff --git a/pkgs/clan-cli/tests/test_flake_api.py b/pkgs/clan-cli/tests/test_flake_api.py index 767af4f7b..c760f7194 100644 --- a/pkgs/clan-cli/tests/test_flake_api.py +++ b/pkgs/clan-cli/tests/test_flake_api.py @@ -1,11 +1,11 @@ from pathlib import Path - +import json import pytest from api import TestClient @pytest.mark.impure -def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: +def test_inspect_attrs(api: TestClient, test_flake_with_core: Path) -> None: params = {"url": str(test_flake_with_core)} response = api.get( "/api/flake/attrs", @@ -15,3 +15,24 @@ def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: data = response.json() print("Data: ", data) assert data.get("flake_attrs") == ["vm1"] + + + +@pytest.mark.impure +def test_inspect_flake(api: TestClient, test_flake_with_core: Path) -> None: + params = {"url": str(test_flake_with_core)} + response = api.get( + "/api/flake", + params=params, + ) + assert response.status_code == 200, "Failed to inspect vm" + data = response.json() + print("Data: ", json.dumps(data, indent=2)) + assert data.get("content") is not None + actions = data.get("actions") + assert actions is not None + assert len(actions) == 2 + assert actions[0].get("id") == "vms/inspect" + assert actions[0].get("uri") == "api/vms/inspect" + assert actions[1].get("id") == "vms/create" + assert actions[1].get("uri") == "api/vms/create" \ No newline at end of file