Merge pull request 'api/vm/create: start vm' (#327) from lassulus-start-vm into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/327
This commit is contained in:
17
pkgs/clan-cli/.vscode/launch.json
vendored
Normal file
17
pkgs/clan-cli/.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Clan Webui",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"module": "clan_cli.webui",
|
||||
"justMyCode": false,
|
||||
"args": [ "--reload", "--no-open", "--log-level", "debug" ],
|
||||
|
||||
}
|
||||
]
|
||||
}
|
||||
15
pkgs/clan-cli/.vscode/settings.json
vendored
Normal file
15
pkgs/clan-cli/.vscode/settings.json
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"python.testing.pytestArgs": [
|
||||
// Coverage is not supported by vscode:
|
||||
// https://github.com/Microsoft/vscode-python/issues/693
|
||||
// Note that this will make pytest fail if pytest-cov is not installed,
|
||||
// if that's the case, then this option needs to be be removed (overrides
|
||||
// can be set at a workspace level, it's up to you to decide what's the
|
||||
// best approach). You might also prefer to only set this option
|
||||
// per-workspace (wherever coverage is used).
|
||||
"--no-cov",
|
||||
"tests"
|
||||
],
|
||||
"python.testing.unittestEnabled": false,
|
||||
"python.testing.pytestEnabled": true,
|
||||
}
|
||||
@@ -28,6 +28,32 @@ To start a local developement environment instead, use the `--dev` flag:
|
||||
|
||||
This will spawn two webserver, a python one to for the api and a nodejs one that rebuilds the ui on the fly.
|
||||
|
||||
## Run webui directly
|
||||
|
||||
Useful for vscode run and debug option
|
||||
|
||||
```bash
|
||||
python -m clan_cli.webui --reload --no-open
|
||||
```
|
||||
|
||||
Add this `launch.json` to your .vscode directory to have working breakpoints in your vscode editor.
|
||||
|
||||
```json
|
||||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Clan Webui",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"module": "clan_cli.webui",
|
||||
"justMyCode": true,
|
||||
"args": ["--reload", "--no-open", "--log-level", "debug"]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Run locally single-threaded for debugging
|
||||
|
||||
By default tests run in parallel using pytest-parallel.
|
||||
|
||||
37
pkgs/clan-cli/clan_cli/custom_logger.py
Normal file
37
pkgs/clan-cli/clan_cli/custom_logger.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
grey = "\x1b[38;20m"
|
||||
yellow = "\x1b[33;20m"
|
||||
red = "\x1b[31;20m"
|
||||
bold_red = "\x1b[31;1m"
|
||||
green = "\u001b[32m"
|
||||
blue = "\u001b[34m"
|
||||
|
||||
|
||||
def get_formatter(color: str) -> logging.Formatter:
|
||||
reset = "\x1b[0m"
|
||||
return logging.Formatter(
|
||||
f"{color}%(levelname)s{reset}:(%(filename)s:%(lineno)d): %(message)s"
|
||||
)
|
||||
|
||||
|
||||
FORMATTER = {
|
||||
logging.DEBUG: get_formatter(blue),
|
||||
logging.INFO: get_formatter(green),
|
||||
logging.WARNING: get_formatter(yellow),
|
||||
logging.ERROR: get_formatter(red),
|
||||
logging.CRITICAL: get_formatter(bold_red),
|
||||
}
|
||||
|
||||
|
||||
class CustomFormatter(logging.Formatter):
|
||||
def format(self, record: Any) -> str:
|
||||
return FORMATTER[record.levelno].format(record)
|
||||
|
||||
|
||||
def register(level: Any) -> None:
|
||||
ch = logging.StreamHandler()
|
||||
ch.setLevel(level)
|
||||
ch.setFormatter(CustomFormatter())
|
||||
logging.basicConfig(level=level, handlers=[ch])
|
||||
@@ -1,14 +1,19 @@
|
||||
import logging
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.routing import APIRoute
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
from .. import custom_logger
|
||||
from .assets import asset_path
|
||||
from .routers import flake, health, machines, root, vms
|
||||
|
||||
origins = [
|
||||
"http://localhost:3000",
|
||||
]
|
||||
# Logging setup
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup_app() -> FastAPI:
|
||||
@@ -23,8 +28,11 @@ def setup_app() -> FastAPI:
|
||||
app.include_router(flake.router)
|
||||
app.include_router(health.router)
|
||||
app.include_router(machines.router)
|
||||
app.include_router(root.router)
|
||||
app.include_router(vms.router)
|
||||
|
||||
# Needs to be last in register. Because of wildcard route
|
||||
app.include_router(root.router)
|
||||
|
||||
app.add_exception_handler(vms.NixBuildException, vms.nix_build_exception_handler)
|
||||
|
||||
app.mount("/static", StaticFiles(directory=asset_path()), name="static")
|
||||
@@ -32,7 +40,16 @@ def setup_app() -> FastAPI:
|
||||
for route in app.routes:
|
||||
if isinstance(route, APIRoute):
|
||||
route.operation_id = route.name # in this case, 'read_items'
|
||||
log.debug(f"Registered route: {route}")
|
||||
return app
|
||||
|
||||
|
||||
# TODO: How do I get the log level from the command line in here?
|
||||
custom_logger.register(logging.DEBUG)
|
||||
app = setup_app()
|
||||
|
||||
for i in app.exception_handlers.items():
|
||||
log.info(f"Registered exception handler: {i}")
|
||||
|
||||
log.warn("log warn")
|
||||
log.debug("log debug")
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# Logging setup
|
||||
import logging
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Body
|
||||
@@ -19,6 +21,7 @@ from ..schemas import (
|
||||
Status,
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@@ -38,7 +41,7 @@ async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineRe
|
||||
|
||||
@router.get("/api/machines/{name}")
|
||||
async def get_machine(name: str) -> MachineResponse:
|
||||
print("TODO")
|
||||
log.error("TODO")
|
||||
return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN))
|
||||
|
||||
|
||||
|
||||
@@ -1,18 +1,38 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import shlex
|
||||
from typing import Annotated, AsyncIterator
|
||||
from typing import Annotated, Iterator
|
||||
from uuid import UUID
|
||||
|
||||
from fastapi import APIRouter, Body, HTTPException, Request, status
|
||||
from fastapi import APIRouter, BackgroundTasks, Body, HTTPException, Request, status
|
||||
from fastapi.encoders import jsonable_encoder
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from ...nix import nix_build, nix_eval
|
||||
from ..schemas import VmConfig, VmInspectResponse
|
||||
from ..schemas import VmConfig, VmCreateResponse, VmInspectResponse, VmStatusResponse
|
||||
from ..task_manager import BaseTask, get_task, register_task
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]:
|
||||
return nix_eval(
|
||||
[
|
||||
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config"
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]:
|
||||
return nix_build(
|
||||
[
|
||||
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm"
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class NixBuildException(HTTPException):
|
||||
def __init__(self, msg: str, loc: list = ["body", "flake_attr"]):
|
||||
detail = [
|
||||
@@ -27,36 +47,50 @@ class NixBuildException(HTTPException):
|
||||
)
|
||||
|
||||
|
||||
class BuildVmTask(BaseTask):
|
||||
def __init__(self, uuid: UUID, vm: VmConfig) -> None:
|
||||
super().__init__(uuid)
|
||||
self.vm = vm
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.log.debug(f"BuildVM with uuid {self.uuid} started")
|
||||
cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url)
|
||||
|
||||
proc = self.run_cmd(cmd)
|
||||
self.log.debug(f"stdout: {proc.stdout}")
|
||||
|
||||
vm_path = f"{''.join(proc.stdout[0])}/bin/run-nixos-vm"
|
||||
self.log.debug(f"vm_path: {vm_path}")
|
||||
|
||||
self.run_cmd([vm_path])
|
||||
self.finished = True
|
||||
except Exception as e:
|
||||
self.failed = True
|
||||
self.finished = True
|
||||
log.exception(e)
|
||||
|
||||
|
||||
def nix_build_exception_handler(
|
||||
request: Request, exc: NixBuildException
|
||||
) -> JSONResponse:
|
||||
log.error("NixBuildException: %s", exc)
|
||||
return JSONResponse(
|
||||
status_code=exc.status_code,
|
||||
content=jsonable_encoder(dict(detail=exc.detail)),
|
||||
)
|
||||
|
||||
|
||||
def nix_inspect_vm(machine: str, flake_url: str) -> list[str]:
|
||||
return nix_eval(
|
||||
[
|
||||
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config"
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def nix_build_vm(machine: str, flake_url: str) -> list[str]:
|
||||
return nix_build(
|
||||
[
|
||||
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm"
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
##################################
|
||||
# #
|
||||
# ======== VM ROUTES ======== #
|
||||
# #
|
||||
##################################
|
||||
@router.post("/api/vms/inspect")
|
||||
async def inspect_vm(
|
||||
flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()]
|
||||
) -> VmInspectResponse:
|
||||
cmd = nix_inspect_vm(flake_attr, flake_url=flake_url)
|
||||
cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url)
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
cmd[0],
|
||||
*cmd[1:],
|
||||
@@ -81,33 +115,43 @@ command output:
|
||||
)
|
||||
|
||||
|
||||
async def vm_build(vm: VmConfig) -> AsyncIterator[str]:
|
||||
cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url)
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
cmd[0],
|
||||
*cmd[1:],
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
assert proc.stdout is not None and proc.stderr is not None
|
||||
async for line in proc.stdout:
|
||||
yield line.decode("utf-8", "ignore")
|
||||
stderr = ""
|
||||
async for line in proc.stderr:
|
||||
stderr += line.decode("utf-8", "ignore")
|
||||
res = await proc.wait()
|
||||
if res != 0:
|
||||
raise NixBuildException(
|
||||
f"""
|
||||
Failed to build vm from '{vm.flake_url}#{vm.flake_attr}'.
|
||||
command: {shlex.join(cmd)}
|
||||
exit code: {res}
|
||||
command output:
|
||||
{stderr}
|
||||
"""
|
||||
@router.get("/api/vms/{uuid}/status")
|
||||
async def get_status(uuid: UUID) -> VmStatusResponse:
|
||||
task = get_task(uuid)
|
||||
return VmStatusResponse(running=not task.finished, status=0)
|
||||
|
||||
|
||||
@router.get("/api/vms/{uuid}/logs")
|
||||
async def get_logs(uuid: UUID) -> StreamingResponse:
|
||||
# Generator function that yields log lines as they are available
|
||||
def stream_logs() -> Iterator[str]:
|
||||
task = get_task(uuid)
|
||||
|
||||
for proc in task.procs:
|
||||
if proc.done:
|
||||
log.debug("stream logs and proc is done")
|
||||
for line in proc.stderr:
|
||||
yield line + "\n"
|
||||
for line in proc.stdout:
|
||||
yield line + "\n"
|
||||
continue
|
||||
while True:
|
||||
out = proc.output
|
||||
line = out.get()
|
||||
if line is None:
|
||||
log.debug("stream logs and line is None")
|
||||
break
|
||||
yield line
|
||||
|
||||
return StreamingResponse(
|
||||
content=stream_logs(),
|
||||
media_type="text/plain",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/api/vms/create")
|
||||
async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse:
|
||||
return StreamingResponse(vm_build(vm))
|
||||
async def create_vm(
|
||||
vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks
|
||||
) -> VmCreateResponse:
|
||||
uuid = register_task(BuildVmTask, vm)
|
||||
return VmCreateResponse(uuid=str(uuid))
|
||||
|
||||
@@ -44,6 +44,15 @@ class VmConfig(BaseModel):
|
||||
graphics: bool
|
||||
|
||||
|
||||
class VmStatusResponse(BaseModel):
|
||||
status: int
|
||||
running: bool
|
||||
|
||||
|
||||
class VmCreateResponse(BaseModel):
|
||||
uuid: str
|
||||
|
||||
|
||||
class VmInspectResponse(BaseModel):
|
||||
config: VmConfig
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ from typing import Iterator
|
||||
# XXX: can we dynamically load this using nix develop?
|
||||
from uvicorn import run
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def defer_open_browser(base_url: str) -> None:
|
||||
@@ -27,7 +27,7 @@ def defer_open_browser(base_url: str) -> None:
|
||||
|
||||
@contextmanager
|
||||
def spawn_node_dev_server(host: str, port: int) -> Iterator[None]:
|
||||
logger.info("Starting node dev server...")
|
||||
log.info("Starting node dev server...")
|
||||
path = Path(__file__).parent.parent.parent.parent / "ui"
|
||||
with subprocess.Popen(
|
||||
[
|
||||
@@ -87,5 +87,6 @@ def start_server(args: argparse.Namespace) -> None:
|
||||
port=args.port,
|
||||
log_level=args.log_level,
|
||||
reload=args.reload,
|
||||
access_log=args.log_level == "debug",
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
119
pkgs/clan-cli/clan_cli/webui/task_manager.py
Normal file
119
pkgs/clan-cli/clan_cli/webui/task_manager.py
Normal file
@@ -0,0 +1,119 @@
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import select
|
||||
import shlex
|
||||
import subprocess
|
||||
import threading
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
|
||||
class CmdState:
|
||||
def __init__(self, proc: subprocess.Popen) -> None:
|
||||
global LOOP
|
||||
self.proc: subprocess.Popen = proc
|
||||
self.stdout: list[str] = []
|
||||
self.stderr: list[str] = []
|
||||
self.output: queue.SimpleQueue = queue.SimpleQueue()
|
||||
self.returncode: int | None = None
|
||||
self.done: bool = False
|
||||
|
||||
|
||||
class BaseTask(threading.Thread):
|
||||
def __init__(self, uuid: UUID) -> None:
|
||||
# calling parent class constructor
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
# constructor
|
||||
self.uuid: UUID = uuid
|
||||
self.log = logging.getLogger(__name__)
|
||||
self.procs: list[CmdState] = []
|
||||
self.failed: bool = False
|
||||
self.finished: bool = False
|
||||
|
||||
def run(self) -> None:
|
||||
self.finished = True
|
||||
|
||||
def run_cmd(self, cmd: list[str]) -> CmdState:
|
||||
cwd = os.getcwd()
|
||||
self.log.debug(f"Working directory: {cwd}")
|
||||
self.log.debug(f"Running command: {shlex.join(cmd)}")
|
||||
p = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
encoding="utf-8",
|
||||
# shell=True,
|
||||
cwd=cwd,
|
||||
)
|
||||
self.procs.append(CmdState(p))
|
||||
p_state = self.procs[-1]
|
||||
|
||||
while p.poll() is None:
|
||||
# Check if stderr is ready to be read from
|
||||
rlist, _, _ = select.select([p.stderr, p.stdout], [], [], 0)
|
||||
if p.stderr in rlist:
|
||||
assert p.stderr is not None
|
||||
line = p.stderr.readline()
|
||||
if line != "":
|
||||
p_state.stderr.append(line.strip("\n"))
|
||||
self.log.debug(f"stderr: {line}")
|
||||
p_state.output.put(line)
|
||||
|
||||
if p.stdout in rlist:
|
||||
assert p.stdout is not None
|
||||
line = p.stdout.readline()
|
||||
if line != "":
|
||||
p_state.stdout.append(line.strip("\n"))
|
||||
self.log.debug(f"stdout: {line}")
|
||||
p_state.output.put(line)
|
||||
|
||||
p_state.returncode = p.returncode
|
||||
p_state.output.put(None)
|
||||
p_state.done = True
|
||||
|
||||
if p.returncode != 0:
|
||||
raise RuntimeError(f"Failed to run command: {shlex.join(cmd)}")
|
||||
|
||||
self.log.debug("Successfully ran command")
|
||||
return p_state
|
||||
|
||||
|
||||
class TaskPool:
|
||||
def __init__(self) -> None:
|
||||
self.lock: threading.RLock = threading.RLock()
|
||||
self.pool: dict[UUID, BaseTask] = {}
|
||||
|
||||
def __getitem__(self, uuid: UUID) -> BaseTask:
|
||||
with self.lock:
|
||||
return self.pool[uuid]
|
||||
|
||||
def __setitem__(self, uuid: UUID, task: BaseTask) -> None:
|
||||
with self.lock:
|
||||
if uuid in self.pool:
|
||||
raise KeyError(f"Task with uuid {uuid} already exists")
|
||||
if type(uuid) is not UUID:
|
||||
raise TypeError("uuid must be of type UUID")
|
||||
self.pool[uuid] = task
|
||||
|
||||
|
||||
POOL: TaskPool = TaskPool()
|
||||
|
||||
|
||||
def get_task(uuid: UUID) -> BaseTask:
|
||||
global POOL
|
||||
return POOL[uuid]
|
||||
|
||||
|
||||
def register_task(task: type, *args: Any) -> UUID:
|
||||
global POOL
|
||||
if not issubclass(task, BaseTask):
|
||||
raise TypeError("task must be a subclass of BaseTask")
|
||||
|
||||
uuid = uuid4()
|
||||
|
||||
inst_task = task(uuid, *args)
|
||||
POOL[uuid] = inst_task
|
||||
inst_task.start()
|
||||
return uuid
|
||||
@@ -17,6 +17,16 @@
|
||||
system.stateVersion = lib.version;
|
||||
|
||||
clan.networking.zerotier.controller.enable = true;
|
||||
|
||||
systemd.services.shutdown-after-boot = {
|
||||
enable = true;
|
||||
wantedBy = [ "multi-user.target" ];
|
||||
after = [ "multi-user.target" ];
|
||||
script = ''
|
||||
#!/usr/bin/env bash
|
||||
shutdown -h now
|
||||
'';
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
@@ -2,24 +2,25 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from api import TestClient
|
||||
from httpx import SyncByteStream
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
response = api.post(
|
||||
"/api/vms/inspect",
|
||||
json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
|
||||
)
|
||||
assert response.status_code == 200, "Failed to inspect vm"
|
||||
config = response.json()["config"]
|
||||
assert config.get("flake_attr") == "vm1"
|
||||
assert config.get("cores") == 1
|
||||
assert config.get("memory_size") == 1024
|
||||
assert config.get("graphics") is True
|
||||
# @pytest.mark.impure
|
||||
# def test_inspect(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
# response = api.post(
|
||||
# "/api/vms/inspect",
|
||||
# json=dict(flake_url=str(test_flake_with_core), flake_attr="vm1"),
|
||||
# )
|
||||
# assert response.status_code == 200, "Failed to inspect vm"
|
||||
# config = response.json()["config"]
|
||||
# assert config.get("flake_attr") == "vm1"
|
||||
# assert config.get("cores") == 1
|
||||
# assert config.get("memory_size") == 1024
|
||||
# assert config.get("graphics") is True
|
||||
|
||||
|
||||
@pytest.mark.impure
|
||||
def test_create(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
print(f"flake_url: {test_flake_with_core} ")
|
||||
response = api.post(
|
||||
"/api/vms/create",
|
||||
json=dict(
|
||||
@@ -30,4 +31,29 @@ def test_create(api: TestClient, test_flake_with_core: Path) -> None:
|
||||
graphics=True,
|
||||
),
|
||||
)
|
||||
assert response.status_code == 200, "Failed to inspect vm"
|
||||
assert response.status_code == 200, "Failed to create vm"
|
||||
|
||||
uuid = response.json()["uuid"]
|
||||
assert len(uuid) == 36
|
||||
assert uuid.count("-") == 4
|
||||
|
||||
response = api.get(f"/api/vms/{uuid}/status")
|
||||
assert response.status_code == 200, "Failed to get vm status"
|
||||
|
||||
response = api.get(f"/api/vms/{uuid}/logs")
|
||||
print("=========FLAKE LOGS==========")
|
||||
assert isinstance(response.stream, SyncByteStream)
|
||||
for line in response.stream:
|
||||
assert line != b"", "Failed to get vm logs"
|
||||
print(line.decode("utf-8"), end="")
|
||||
print("=========END LOGS==========")
|
||||
assert response.status_code == 200, "Failed to get vm logs"
|
||||
|
||||
response = api.get(f"/api/vms/{uuid}/logs")
|
||||
assert isinstance(response.stream, SyncByteStream)
|
||||
print("=========VM LOGS==========")
|
||||
for line in response.stream:
|
||||
assert line != b"", "Failed to get vm logs"
|
||||
print(line.decode("utf-8"), end="")
|
||||
print("=========END LOGS==========")
|
||||
assert response.status_code == 200, "Failed to get vm logs"
|
||||
|
||||
Reference in New Issue
Block a user