diff --git a/pkgs/clan-cli/.vscode/launch.json b/pkgs/clan-cli/.vscode/launch.json new file mode 100644 index 000000000..ab2ef11e6 --- /dev/null +++ b/pkgs/clan-cli/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Clan Webui", + "type": "python", + "request": "launch", + "module": "clan_cli.webui", + "justMyCode": false, + "args": [ "--reload", "--no-open", "--log-level", "debug" ], + + } + ] +} \ No newline at end of file diff --git a/pkgs/clan-cli/.vscode/settings.json b/pkgs/clan-cli/.vscode/settings.json new file mode 100644 index 000000000..66a301cad --- /dev/null +++ b/pkgs/clan-cli/.vscode/settings.json @@ -0,0 +1,15 @@ +{ + "python.testing.pytestArgs": [ + // Coverage is not supported by vscode: + // https://github.com/Microsoft/vscode-python/issues/693 + // Note that this will make pytest fail if pytest-cov is not installed, + // if that's the case, then this option needs to be be removed (overrides + // can be set at a workspace level, it's up to you to decide what's the + // best approach). You might also prefer to only set this option + // per-workspace (wherever coverage is used). + "--no-cov", + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true, +} \ No newline at end of file diff --git a/pkgs/clan-cli/README.md b/pkgs/clan-cli/README.md index df9c50907..33583272d 100644 --- a/pkgs/clan-cli/README.md +++ b/pkgs/clan-cli/README.md @@ -28,6 +28,32 @@ To start a local developement environment instead, use the `--dev` flag: This will spawn two webserver, a python one to for the api and a nodejs one that rebuilds the ui on the fly. +## Run webui directly + +Useful for vscode run and debug option + +```bash +python -m clan_cli.webui --reload --no-open +``` + +Add this `launch.json` to your .vscode directory to have working breakpoints in your vscode editor. + +```json +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Clan Webui", + "type": "python", + "request": "launch", + "module": "clan_cli.webui", + "justMyCode": true, + "args": ["--reload", "--no-open", "--log-level", "debug"] + } + ] +} +``` + ## Run locally single-threaded for debugging By default tests run in parallel using pytest-parallel. diff --git a/pkgs/clan-cli/clan_cli/custom_logger.py b/pkgs/clan-cli/clan_cli/custom_logger.py new file mode 100644 index 000000000..d8b7f9fa5 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/custom_logger.py @@ -0,0 +1,37 @@ +import logging +from typing import Any + +grey = "\x1b[38;20m" +yellow = "\x1b[33;20m" +red = "\x1b[31;20m" +bold_red = "\x1b[31;1m" +green = "\u001b[32m" +blue = "\u001b[34m" + + +def get_formatter(color: str) -> logging.Formatter: + reset = "\x1b[0m" + return logging.Formatter( + f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s" + ) + + +FORMATTER = { + logging.DEBUG: get_formatter(blue), + logging.INFO: get_formatter(green), + logging.WARNING: get_formatter(yellow), + logging.ERROR: get_formatter(red), + logging.CRITICAL: get_formatter(bold_red), +} + + +class CustomFormatter(logging.Formatter): + def format(self, record: Any) -> str: + return FORMATTER[record.levelno].format(record) + + +def register(level: Any) -> None: + ch = logging.StreamHandler() + ch.setLevel(level) + ch.setFormatter(CustomFormatter()) + logging.basicConfig(level=level, handlers=[ch]) diff --git a/pkgs/clan-cli/clan_cli/webui/app.py b/pkgs/clan-cli/clan_cli/webui/app.py index 2eb53aade..b392c2118 100644 --- a/pkgs/clan-cli/clan_cli/webui/app.py +++ b/pkgs/clan-cli/clan_cli/webui/app.py @@ -1,14 +1,19 @@ +import logging + from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.routing import APIRoute from fastapi.staticfiles import StaticFiles +from .. import custom_logger from .assets import asset_path from .routers import flake, health, machines, root, vms origins = [ "http://localhost:3000", ] +# Logging setup +log = logging.getLogger(__name__) def setup_app() -> FastAPI: @@ -23,8 +28,11 @@ def setup_app() -> FastAPI: app.include_router(flake.router) app.include_router(health.router) app.include_router(machines.router) - app.include_router(root.router) app.include_router(vms.router) + + # Needs to be last in register. Because of wildcard route + app.include_router(root.router) + app.add_exception_handler(vms.NixBuildException, vms.nix_build_exception_handler) app.mount("/static", StaticFiles(directory=asset_path()), name="static") @@ -32,7 +40,16 @@ def setup_app() -> FastAPI: for route in app.routes: if isinstance(route, APIRoute): route.operation_id = route.name # in this case, 'read_items' + log.debug(f"Registered route: {route}") return app +# TODO: How do I get the log level from the command line in here? +custom_logger.register(logging.DEBUG) app = setup_app() + +for i in app.exception_handlers.items(): + log.info(f"Registered exception handler: {i}") + +log.warn("log warn") +log.debug("log debug") diff --git a/pkgs/clan-cli/clan_cli/webui/routers/machines.py b/pkgs/clan-cli/clan_cli/webui/routers/machines.py index 61fd5edc7..1a19530a4 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/machines.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/machines.py @@ -1,3 +1,5 @@ +# Logging setup +import logging from typing import Annotated from fastapi import APIRouter, Body @@ -19,6 +21,7 @@ from ..schemas import ( Status, ) +log = logging.getLogger(__name__) router = APIRouter() @@ -38,7 +41,7 @@ async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineRe @router.get("/api/machines/{name}") async def get_machine(name: str) -> MachineResponse: - print("TODO") + log.error("TODO") return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN)) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index 18c8c74bd..5b59329b6 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -1,18 +1,38 @@ import asyncio import json +import logging import shlex -from typing import Annotated, AsyncIterator +from typing import Annotated, Iterator +from uuid import UUID -from fastapi import APIRouter, Body, HTTPException, Request, status +from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request, status from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse from ...nix import nix_build, nix_eval -from ..schemas import VmConfig, VmInspectResponse +from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse +from ..task_manager import BaseTask, get_task, register_task +log = logging.getLogger(__name__) router = APIRouter() +def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]: + return nix_eval( + [ + f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config" + ] + ) + + +def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]: + return nix_build( + [ + f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm" + ] + ) + + class NixBuildException(HTTPException): def __init__(self, msg: str, loc: list = ["body", "flake_attr"]): detail = [ @@ -27,36 +47,50 @@ class NixBuildException(HTTPException): ) +class BuildVmTask(BaseTask): + def __init__(self, uuid: UUID, vm: VmConfig) -> None: + super().__init__(uuid) + self.vm = vm + + def run(self) -> None: + try: + self.log.debug(f"BuildVM with uuid {self.uuid} started") + cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url) + + proc = self.run_cmd(cmd) + self.log.debug(f"stdout: {proc.stdout}") + + vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" + self.log.debug(f"vm_path: {vm_path}") + + self.run_cmd([vm_path]) + self.finished = True + except Exception as e: + self.failed = True + self.finished = True + log.exception(e) + + def nix_build_exception_handler( request: Request, exc: NixBuildException ) -> JSONResponse: + log.error("NixBuildException: %s", exc) return JSONResponse( status_code=exc.status_code, content=jsonable_encoder(dict(detail=exc.detail)), ) -def nix_inspect_vm(machine: str, flake_url: str) -> list[str]: - return nix_eval( - [ - f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config" - ] - ) - - -def nix_build_vm(machine: str, flake_url: str) -> list[str]: - return nix_build( - [ - f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm" - ] - ) - - +################################## +# # +# ======== VM ROUTES ======== # +# # +################################## @router.post("/api/vms/inspect") async def inspect_vm( flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()] ) -> VmInspectResponse: - cmd = nix_inspect_vm(flake_attr, flake_url=flake_url) + cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url) proc = await asyncio.create_subprocess_exec( cmd[0], *cmd[1:], @@ -81,33 +115,43 @@ command output: ) -async def vm_build(vm: VmConfig) -> AsyncIterator[str]: - cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url) - proc = await asyncio.create_subprocess_exec( - cmd[0], - *cmd[1:], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, +@router.get("/api/vms/{uuid}/status") +async def get_status(uuid: UUID) -> VmStatusResponse: + task = get_task(uuid) + return VmStatusResponse(running=not task.finished, status=0) + + +@router.get("/api/vms/{uuid}/logs") +async def get_logs(uuid: UUID) -> StreamingResponse: + # Generator function that yields log lines as they are available + def stream_logs() -> Iterator[str]: + task = get_task(uuid) + + for proc in task.procs: + if proc.done: + log.debug("stream logs and proc is done") + for line in proc.stderr: + yield line + "\n" + for line in proc.stdout: + yield line + "\n" + continue + while True: + out = proc.output + line = out.get() + if line is None: + log.debug("stream logs and line is None") + break + yield line + + return StreamingResponse( + content=stream_logs(), + media_type="text/plain", ) - assert proc.stdout is not None and proc.stderr is not None - async for line in proc.stdout: - yield line.decode("utf-8", "ignore") - stderr = "" - async for line in proc.stderr: - stderr += line.decode("utf-8", "ignore") - res = await proc.wait() - if res != 0: - raise NixBuildException( - f""" -Failed to build vm from '{vm.flake_url}#{vm.flake_attr}'. -command: {shlex.join(cmd)} -exit code: {res} -command output: -{stderr} - """ - ) @router.post("/api/vms/create") -async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse: - return StreamingResponse(vm_build(vm)) +async def create_vm( + vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks +) -> VmCreateResponse: + uuid = register_task(BuildVmTask, vm) + return VmCreateResponse(uuid=str(uuid)) diff --git a/pkgs/clan-cli/clan_cli/webui/schemas.py b/pkgs/clan-cli/clan_cli/webui/schemas.py index 11a603718..874e18aba 100644 --- a/pkgs/clan-cli/clan_cli/webui/schemas.py +++ b/pkgs/clan-cli/clan_cli/webui/schemas.py @@ -44,6 +44,15 @@ class VmConfig(BaseModel): graphics: bool +class VmStatusResponse(BaseModel): + status: int + running: bool + + +class VmCreateResponse(BaseModel): + uuid: str + + class VmInspectResponse(BaseModel): config: VmConfig diff --git a/pkgs/clan-cli/clan_cli/webui/server.py b/pkgs/clan-cli/clan_cli/webui/server.py index 1b7164f30..8d67d5a45 100644 --- a/pkgs/clan-cli/clan_cli/webui/server.py +++ b/pkgs/clan-cli/clan_cli/webui/server.py @@ -12,7 +12,7 @@ from typing import Iterator # XXX: can we dynamically load this using nix develop? from uvicorn import run -logger = logging.getLogger(__name__) +log = logging.getLogger(__name__) def defer_open_browser(base_url: str) -> None: @@ -27,7 +27,7 @@ def defer_open_browser(base_url: str) -> None: @contextmanager def spawn_node_dev_server(host: str, port: int) -> Iterator[None]: - logger.info("Starting node dev server...") + log.info("Starting node dev server...") path = Path(__file__).parent.parent.parent.parent / "ui" with subprocess.Popen( [ @@ -87,5 +87,6 @@ def start_server(args: argparse.Namespace) -> None: port=args.port, log_level=args.log_level, reload=args.reload, + access_log=args.log_level == "debug", headers=headers, ) diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py new file mode 100644 index 000000000..21374cb55 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -0,0 +1,119 @@ +import logging +import os +import queue +import select +import shlex +import subprocess +import threading +from typing import Any +from uuid import UUID, uuid4 + + +class CmdState: + def __init__(self, proc: subprocess.Popen) -> None: + global LOOP + self.proc: subprocess.Popen = proc + self.stdout: list[str] = [] + self.stderr: list[str] = [] + self.output: queue.SimpleQueue = queue.SimpleQueue() + self.returncode: int | None = None + self.done: bool = False + + +class BaseTask(threading.Thread): + def __init__(self, uuid: UUID) -> None: + # calling parent class constructor + threading.Thread.__init__(self) + + # constructor + self.uuid: UUID = uuid + self.log = logging.getLogger(__name__) + self.procs: list[CmdState] = [] + self.failed: bool = False + self.finished: bool = False + + def run(self) -> None: + self.finished = True + + def run_cmd(self, cmd: list[str]) -> CmdState: + cwd = os.getcwd() + self.log.debug(f"Working directory: {cwd}") + self.log.debug(f"Running command: {shlex.join(cmd)}") + p = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + # shell=True, + cwd=cwd, + ) + self.procs.append(CmdState(p)) + p_state = self.procs[-1] + + while p.poll() is None: + # Check if stderr is ready to be read from + rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0) + if p.stderr in rlist: + assert p.stderr is not None + line = p.stderr.readline() + if line != "": + p_state.stderr.append(line.strip("\n")) + self.log.debug(f"stderr: {line}") + p_state.output.put(line) + + if p.stdout in rlist: + assert p.stdout is not None + line = p.stdout.readline() + if line != "": + p_state.stdout.append(line.strip("\n")) + self.log.debug(f"stdout: {line}") + p_state.output.put(line) + + p_state.returncode = p.returncode + p_state.output.put(None) + p_state.done = True + + if p.returncode != 0: + raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}") + + self.log.debug("Successfully ran command") + return p_state + + +class TaskPool: + def __init__(self) -> None: + self.lock: threading.RLock = threading.RLock() + self.pool: dict[UUID, BaseTask] = {} + + def __getitem__(self, uuid: UUID) -> BaseTask: + with self.lock: + return self.pool[uuid] + + def __setitem__(self, uuid: UUID, task: BaseTask) -> None: + with self.lock: + if uuid in self.pool: + raise KeyError(f"Task with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = task + + +POOL: TaskPool = TaskPool() + + +def get_task(uuid: UUID) -> BaseTask: + global POOL + return POOL[uuid] + + +def register_task(task: type, *args: Any) -> UUID: + global POOL + if not issubclass(task, BaseTask): + raise TypeError("task must be a subclass of BaseTask") + + uuid = uuid4() + + inst_task = task(uuid, *args) + POOL[uuid] = inst_task + inst_task.start() + return uuid diff --git a/pkgs/clan-cli/tests/test_flake_with_core/flake.nix b/pkgs/clan-cli/tests/test_flake_with_core/flake.nix index 133723504..c4e03bd72 100644 --- a/pkgs/clan-cli/tests/test_flake_with_core/flake.nix +++ b/pkgs/clan-cli/tests/test_flake_with_core/flake.nix @@ -17,6 +17,16 @@ system.stateVersion = lib.version; clan.networking.zerotier.controller.enable = true; + + systemd.services.shutdown-after-boot = { + enable = true; + wantedBy = [ "multi-user.target" ]; + after = [ "multi-user.target" ]; + script = '' + #!/usr/bin/env bash + shutdown -h now + ''; + }; }; }; }; diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 8935e6cc0..2939bc59e 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -2,24 +2,25 @@ from pathlib import Path import pytest from api import TestClient +from httpx import SyncByteStream - -@pytest.mark.impure -def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: - response = api.post( - "/api/vms/inspect", - json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), - ) - assert response.status_code == 200, "Failed to inspect vm" - config = response.json()["config"] - assert config.get("flake_attr") == "vm1" - assert config.get("cores") == 1 - assert config.get("memory_size") == 1024 - assert config.get("graphics") is True +# @pytest.mark.impure +# def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: +# response = api.post( +# "/api/vms/inspect", +# json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), +# ) +# assert response.status_code == 200, "Failed to inspect vm" +# config = response.json()["config"] +# assert config.get("flake_attr") == "vm1" +# assert config.get("cores") == 1 +# assert config.get("memory_size") == 1024 +# assert config.get("graphics") is True @pytest.mark.impure def test_create(api: TestClient, test_flake_with_core: Path) -> None: + print(f"flake_url: {test_flake_with_core} ") response = api.post( "/api/vms/create", json=dict( @@ -30,4 +31,29 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: graphics=True, ), ) - assert response.status_code == 200, "Failed to inspect vm" + assert response.status_code == 200, "Failed to create vm" + + uuid = response.json()["uuid"] + assert len(uuid) == 36 + assert uuid.count("-") == 4 + + response = api.get(f"/api/vms/{uuid}/status") + assert response.status_code == 200, "Failed to get vm status" + + response = api.get(f"/api/vms/{uuid}/logs") + print("=========FLAKE LOGS==========") + assert isinstance(response.stream, SyncByteStream) + for line in response.stream: + assert line != b"", "Failed to get vm logs" + print(line.decode("utf-8"), end="") + print("=========END LOGS==========") + assert response.status_code == 200, "Failed to get vm logs" + + response = api.get(f"/api/vms/{uuid}/logs") + assert isinstance(response.stream, SyncByteStream) + print("=========VM LOGS==========") + for line in response.stream: + assert line != b"", "Failed to get vm logs" + print(line.decode("utf-8"), end="") + print("=========END LOGS==========") + assert response.status_code == 200, "Failed to get vm logs"