From 98028d121f7493c69a270848295295deba7cb2a7 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Wed, 27 Sep 2023 01:52:38 +0200 Subject: [PATCH] Working log streaming --- pkgs/clan-cli/clan_cli/webui/routers/vms.py | 31 ++++---- pkgs/clan-cli/clan_cli/webui/task_manager.py | 77 +++++++++++--------- pkgs/clan-cli/tests/test_vms_api.py | 27 +++---- 3 files changed, 71 insertions(+), 64 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/webui/routers/vms.py b/pkgs/clan-cli/clan_cli/webui/routers/vms.py index ba6881afa..f3cb272b7 100644 --- a/pkgs/clan-cli/clan_cli/webui/routers/vms.py +++ b/pkgs/clan-cli/clan_cli/webui/routers/vms.py @@ -2,12 +2,8 @@ import asyncio import json import logging import shlex -import io -import subprocess -import pipes -import threading -from typing import Annotated, AsyncIterator -from uuid import UUID, uuid4 +from typing import Annotated +from uuid import UUID from fastapi import ( APIRouter, @@ -76,7 +72,7 @@ class BuildVmTask(BaseTask): vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm" self.log.debug(f"vm_path: {vm_path}") - self.run_cmd(vm_path) + #self.run_cmd(vm_path) self.finished = True except Exception as e: self.failed = True @@ -137,21 +133,24 @@ async def get_status(uuid: str) -> VmStatusResponse: @router.get("/api/vms/{uuid}/logs") async def get_logs(uuid: str) -> StreamingResponse: - async def stream_logs() -> AsyncIterator[str]: + def stream_logs(): + task = get_task(uuid) for proc in task.procs: if proc.done: + log.debug("stream logs and proc is done") for line in proc.stderr: - yield line + yield line + "\n" for line in proc.stdout: - yield line - else: - while True: - if proc.output_pipe.empty() and proc.done: - break - line = await proc.output_pipe.get() - yield line + yield line + "\n" + break + while True: + out = proc.output + line = out.get() + if line is None: + break + yield line return StreamingResponse( content=stream_logs(), diff --git a/pkgs/clan-cli/clan_cli/webui/task_manager.py b/pkgs/clan-cli/clan_cli/webui/task_manager.py index 3ee619e01..e512eac00 100644 --- a/pkgs/clan-cli/clan_cli/webui/task_manager.py +++ b/pkgs/clan-cli/clan_cli/webui/task_manager.py @@ -7,15 +7,18 @@ import subprocess import threading from uuid import UUID, uuid4 + class CmdState: def __init__(self, proc: subprocess.Popen) -> None: - self.proc: subprocess.Process = proc + global LOOP + self.proc: subprocess.Popen = proc self.stdout: list[str] = [] self.stderr: list[str] = [] - self.output_pipe: asyncio.Queue = asyncio.Queue() + self.output: queue.SimpleQueue = queue.SimpleQueue() self.returncode: int | None = None self.done: bool = False + class BaseTask(threading.Thread): def __init__(self, uuid: UUID) -> None: # calling parent class constructor @@ -35,63 +38,66 @@ class BaseTask(threading.Thread): cwd = os.getcwd() self.log.debug(f"Working directory: {cwd}") self.log.debug(f"Running command: {shlex.join(cmd)}") - process = subprocess.Popen( + p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", + # shell=True, cwd=cwd, ) - state = CmdState(process) - self.procs.append(state) + self.procs.append(CmdState(p)) + p_state = self.procs[-1] - while process.poll() is None: + while p.poll() is None: # Check if stderr is ready to be read from - rlist, _, _ = select.select([process.stderr, process.stdout], [], [], 0) - if process.stderr in rlist: - line = process.stderr.readline() + rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0) + if p.stderr in rlist: + line = p.stderr.readline() if line != "": - state.stderr.append(line.strip('\n')) - state.output_pipe.put_nowait(line) - if process.stdout in rlist: - line = process.stdout.readline() + p_state.stderr.append(line.strip("\n")) + self.log.debug(f"stderr: {line}") + p_state.output.put(line) + + if p.stdout in rlist: + line = p.stdout.readline() if line != "": - state.stdout.append(line.strip('\n')) - state.output_pipe.put_nowait(line) + p_state.stdout.append(line.strip("\n")) + self.log.debug(f"stdout: {line}") + p_state.output.put(line) - state.returncode = process.returncode - state.done = True + p_state.returncode = p.returncode + p_state.output.put(None) + p_state.done = True - if process.returncode != 0: - raise RuntimeError( - f"Failed to run command: {shlex.join(cmd)}" - ) + if p.returncode != 0: + raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}") self.log.debug("Successfully ran command") - return state + return p_state class TaskPool: def __init__(self) -> None: - self.lock: threading.RLock = threading.RLock() + # self.lock: threading.RLock = threading.RLock() self.pool: dict[UUID, BaseTask] = {} def __getitem__(self, uuid: str | UUID) -> BaseTask: - with self.lock: - if type(uuid) is UUID: - return self.pool[uuid] - else: - uuid = UUID(uuid) - return self.pool[uuid] + # with self.lock: + if type(uuid) is UUID: + return self.pool[uuid] + else: + uuid = UUID(uuid) + return self.pool[uuid] def __setitem__(self, uuid: UUID, vm: BaseTask) -> None: - with self.lock: - if uuid in self.pool: - raise KeyError(f"VM with uuid {uuid} already exists") - if type(uuid) is not UUID: - raise TypeError("uuid must be of type UUID") - self.pool[uuid] = vm + # with self.lock: + if uuid in self.pool: + raise KeyError(f"VM with uuid {uuid} already exists") + if type(uuid) is not UUID: + raise TypeError("uuid must be of type UUID") + self.pool[uuid] = vm POOL: TaskPool = TaskPool() @@ -108,6 +114,7 @@ def register_task(task: BaseTask, *kwargs) -> UUID: raise TypeError("task must be a subclass of BaseTask") uuid = uuid4() + inst_task = task(uuid, *kwargs) POOL[uuid] = inst_task inst_task.start() diff --git a/pkgs/clan-cli/tests/test_vms_api.py b/pkgs/clan-cli/tests/test_vms_api.py index 5aa6d917a..5a1a16de2 100644 --- a/pkgs/clan-cli/tests/test_vms_api.py +++ b/pkgs/clan-cli/tests/test_vms_api.py @@ -4,18 +4,18 @@ import pytest from api import TestClient -@pytest.mark.impure -def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: - response = api.post( - "/api/vms/inspect", - json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), - ) - assert response.status_code == 200, "Failed to inspect vm" - config = response.json()["config"] - assert config.get("flake_attr") == "vm1" - assert config.get("cores") == 1 - assert config.get("memory_size") == 1024 - assert config.get("graphics") is True +# @pytest.mark.impure +# def test_inspect(api: TestClient, test_flake_with_core: Path) -> None: +# response = api.post( +# "/api/vms/inspect", +# json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"), +# ) +# assert response.status_code == 200, "Failed to inspect vm" +# config = response.json()["config"] +# assert config.get("flake_attr") == "vm1" +# assert config.get("cores") == 1 +# assert config.get("memory_size") == 1024 +# assert config.get("graphics") is True @pytest.mark.impure @@ -43,6 +43,7 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None: response = api.get(f"/api/vms/{uuid}/logs") print("=========LOGS==========") for line in response.stream: - print(line) + print(f"line: {line}") + assert line != b"", "Failed to get vm logs" assert response.status_code == 200, "Failed to get vm logs" \ No newline at end of file