Added threaded create_vm endpoint

This commit is contained in:
Qubasa
2023-09-25 16:28:32 +02:00
committed by Mic92
parent 7479fca82b
commit d16bb5db26
8 changed files with 120 additions and 89 deletions

View File

@@ -1,3 +1,5 @@
# Logging setup
import logging
from typing import Annotated
from fastapi import APIRouter, Body
@@ -19,10 +21,7 @@ from ..schemas import (
Status,
)
# Logging setup
import logging
log = logging.getLogger(__name__)
router = APIRouter()
@@ -42,7 +41,7 @@ async def create_machine(machine: Annotated[MachineCreate, Body()]) -> MachineRe
@router.get("/api/machines/{name}")
async def get_machine(name: str) -> MachineResponse:
print("TODO")
log.error("TODO")
return MachineResponse(machine=Machine(name=name, status=Status.UNKNOWN))

View File

@@ -1,45 +1,27 @@
import asyncio
import json
import shlex
from typing import Annotated, AsyncIterator
import logging
import os
import shlex
import uuid
from typing import Annotated, AsyncIterator
from fastapi import APIRouter, Body, HTTPException, Request, status, logger
from fastapi import APIRouter, Body, FastAPI, HTTPException, Request, status, BackgroundTasks
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, StreamingResponse
from ...nix import nix_build, nix_eval
from ..schemas import VmConfig, VmInspectResponse
from ..schemas import VmConfig, VmInspectResponse, VmCreateResponse
# Logging setup
log = logging.getLogger(__name__)
router = APIRouter()
class NixBuildException(HTTPException):
def __init__(self, msg: str, loc: list = ["body", "flake_attr"]):
detail = [
{
"loc": loc,
"msg": msg,
"type": "value_error",
}
]
super().__init__(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail
)
app = FastAPI()
def nix_build_exception_handler(
request: Request, exc: NixBuildException
) -> JSONResponse:
return JSONResponse(
status_code=exc.status_code,
content=jsonable_encoder(dict(detail=exc.detail)),
)
def nix_inspect_vm(machine: str, flake_url: str) -> list[str]:
def nix_inspect_vm_cmd(machine: str, flake_url: str) -> list[str]:
return nix_eval(
[
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.clan.vm.config"
@@ -47,7 +29,7 @@ def nix_inspect_vm(machine: str, flake_url: str) -> list[str]:
)
def nix_build_vm(machine: str, flake_url: str) -> list[str]:
def nix_build_vm_cmd(machine: str, flake_url: str) -> list[str]:
return nix_build(
[
f"{flake_url}#nixosConfigurations.{json.dumps(machine)}.config.system.build.vm"
@@ -55,21 +37,13 @@ def nix_build_vm(machine: str, flake_url: str) -> list[str]:
)
async def start_vm(vm_path: str) -> None:
proc = await asyncio.create_subprocess_exec(
vm_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await proc.wait()
@router.post("/api/vms/inspect")
async def inspect_vm(
flake_url: Annotated[str, Body()], flake_attr: Annotated[str, Body()]
) -> VmInspectResponse:
cmd = nix_inspect_vm(flake_attr, flake_url=flake_url)
cmd = nix_inspect_vm_cmd(flake_attr, flake_url=flake_url)
proc = await asyncio.create_subprocess_exec(
cmd[0],
*cmd[1:],
@@ -94,40 +68,91 @@ command output:
)
async def vm_build(vm: VmConfig) -> AsyncIterator[str]:
cmd = nix_build_vm(vm.flake_attr, flake_url=vm.flake_url)
log.debug(f"Running command: {shlex.join(cmd)}")
proc = await asyncio.create_subprocess_exec(
cmd[0],
*cmd[1:],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
assert proc.stdout is not None and proc.stderr is not None
vm_path = ""
async for line in proc.stdout:
vm_path = f'{line.decode("utf-8", "ignore").strip()}/bin/run-nixos-vm'
await start_vm(vm_path)
stderr = ""
async for line in proc.stderr:
stderr += line.decode("utf-8", "ignore")
yield line.decode("utf-8", "ignore")
res = await proc.wait()
if res != 0:
raise NixBuildException(
f"""
Failed to build vm from '{vm.flake_url}#{vm.flake_attr}'.
command: {shlex.join(cmd)}
exit code: {res}
command output:
{stderr}
"""
class NixBuildException(HTTPException):
def __init__(self, uuid: uuid.UUID, msg: str,loc: list = ["body", "flake_attr"]):
self.uuid = uuid
detail = [
{
"loc": loc,
"uuid": str(uuid),
"msg": msg,
"type": "value_error",
}
]
super().__init__(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail
)
import logging
import threading
import subprocess
import uuid
class BuildVM(threading.Thread):
def __init__(self, vm: VmConfig, uuid: uuid.UUID):
# calling parent class constructor
threading.Thread.__init__(self)
# constructor
self.vm: VmConfig = vm
self.uuid: uuid.UUID = uuid
self.log = logging.getLogger(__name__)
self.process: subprocess.Popen = None
def run(self):
self.log.debug(f"BuildVM with uuid {self.uuid} started")
cmd = nix_build_vm_cmd(self.vm.flake_attr, flake_url=self.vm.flake_url)
(out, err) = self.run_cmd(cmd)
vm_path = f'{out.strip()}/bin/run-nixos-vm'
self.log.debug(f"vm_path: {vm_path}")
(out, err) = self.run_cmd(vm_path)
def run_cmd(self, cmd: list[str]):
cwd=os.getcwd()
log.debug(f"Working directory: {cwd}")
log.debug(f"Running command: {shlex.join(cmd)}")
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
cwd=cwd,
)
self.process.wait()
if self.process.returncode != 0:
raise NixBuildException(self.uuid, f"Failed to run command: {shlex.join(cmd)}")
log.info("Successfully ran command")
return (self.process.stdout, self.process.stderr)
POOL: dict[uuid.UUID, BuildVM] = {}
def nix_build_exception_handler(
request: Request, exc: NixBuildException
) -> JSONResponse:
log.error("NixBuildException: %s", exc)
del POOL[exc.uuid]
return JSONResponse(
status_code=exc.status_code,
content=jsonable_encoder(dict(detail=exc.detail)),
)
@router.post("/api/vms/create")
async def create_vm(vm: Annotated[VmConfig, Body()]) -> StreamingResponse:
return StreamingResponse(vm_build(vm))
async def create_vm(vm: Annotated[VmConfig, Body()], background_tasks: BackgroundTasks) -> StreamingResponse:
handle_id = uuid.uuid4()
handle = BuildVM(vm, handle_id)
handle.start()
POOL[handle_id] = handle
return VmCreateResponse(uuid=str(handle_id))