Merge pull request 'impl_non_blocking_http' (#4296) from Qubasa/clan-core:impl_non_blocking_http into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4296
This commit is contained in:
Luis Hebendanz
2025-07-10 10:32:03 +00:00
7 changed files with 193 additions and 17 deletions

View File

@@ -126,6 +126,7 @@ class ApiBridge(ABC):
target=thread_task, args=(stop_event,), name=thread_name target=thread_task, args=(stop_event,), name=thread_name
) )
thread.start() thread.start()
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event) self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
if wait_for_completion: if wait_for_completion:

View File

@@ -1,6 +1,6 @@
import logging import logging
import threading import threading
from http.server import HTTPServer from http.server import HTTPServer, ThreadingHTTPServer
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
@@ -36,7 +36,7 @@ class HttpApiServer:
self._server_thread: threading.Thread | None = None self._server_thread: threading.Thread | None = None
# Bridge is now the request handler itself, no separate instance needed # Bridge is now the request handler itself, no separate instance needed
self._middleware: list[Middleware] = [] self._middleware: list[Middleware] = []
self.shared_threads = shared_threads or {} self.shared_threads = shared_threads if shared_threads is not None else {}
def add_middleware(self, middleware: "Middleware") -> None: def add_middleware(self, middleware: "Middleware") -> None:
"""Add middleware to the middleware chain.""" """Add middleware to the middleware chain."""
@@ -84,9 +84,9 @@ class HttpApiServer:
log.warning("HTTP server is already running") log.warning("HTTP server is already running")
return return
# Create the server # Create the server using ThreadingHTTPServer for concurrent request handling
handler_class = self._create_request_handler() handler_class = self._create_request_handler()
self._server = HTTPServer((self.host, self.port), handler_class) self._server = ThreadingHTTPServer((self.host, self.port), handler_class)
def run_server() -> None: def run_server() -> None:
if self._server: if self._server:

View File

@@ -2,7 +2,8 @@
<html lang="en"> <html lang="en">
<head> <head>
<meta charset="UTF-8" /> <meta charset="UTF-8" />
<title>Swagger UI</title> <title>Swagger UI with Interceptors</title>
<!-- Assuming these files are in the same directory -->
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" /> <link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" /> <link rel="stylesheet" type="text/css" href="index.css" />
<link <link
@@ -23,14 +24,100 @@
<div id="swagger-ui"></div> <div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"></script> <script src="./swagger-ui-bundle.js" charset="UTF-8"></script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"></script> <script src="./swagger-ui-standalone-preset.js" charset="UTF-8"></script>
<script src="./swagger-initializer.js" charset="UTF-8"></script> <!-- Your swagger-initializer.js is not needed if you configure directly in the HTML -->
<script> <script>
window.onload = () => { window.onload = () => {
SwaggerUIBundle({ SwaggerUIBundle({
url: "./openapi.json", // Path to your OpenAPI 3 spec (YAML or JSON) url: "./openapi.json", // Path to your OpenAPI 3 spec
dom_id: "#swagger-ui", dom_id: "#swagger-ui",
presets: [SwaggerUIBundle.presets.apis, SwaggerUIStandalonePreset], presets: [SwaggerUIBundle.presets.apis, SwaggerUIStandalonePreset],
layout: "StandaloneLayout", layout: "StandaloneLayout",
tryItOutEnabled: true,
deepLinking: true,
displayOperationId: true,
// --- INTERCEPTORS START HERE ---
/**
* requestInterceptor
* This function is called before a request is sent.
* It takes the request object and must return a modified request object.
* We will use it to wrap the user's input.
*/
requestInterceptor: (request) => {
console.log("Intercepting request:", request);
// Only modify requests that have a body (like POST, PUT)
if (request.body) {
try {
// The body from the UI is a string, so we parse it to an object.
const originalBody = JSON.parse(request.body);
// Create the new, nested structure.
const newBody = {
body: originalBody,
header: {}, // Add an empty header object as per your example
};
// Replace the original body with the new, stringified, nested structure.
request.body = JSON.stringify(newBody);
// Update the 'Content-Length' header to match the new body size.
request.headers["Content-Length"] = new Blob([
request.body,
]).size;
console.log("Modified request body:", request.body);
} catch (e) {
// If the user's input isn't valid JSON, don't modify the request.
console.error(
"Request Interceptor: Could not parse body as JSON.",
e,
);
}
}
return request; // Always return the request object
},
/**
* responseInterceptor
* This function is called after a response is received, but before it's displayed.
* It takes the response object and must return a modified response object.
* We will use it to un-nest the data for display.
*/
responseInterceptor: (response) => {
console.log("Intercepting response:", response);
// Check if the response was successful and has data to process.
if (response.ok && response.data) {
try {
// The response data is a string, so we parse it into an object.
const fullResponse = JSON.parse(response.data);
// Check if the expected 'body' property exists.
if (fullResponse && typeof fullResponse.body !== "undefined") {
console.log(
"Found nested 'body' property. Un-nesting for display.",
);
// Replace the response's data with JUST the nested 'body' object.
// We stringify it with pretty-printing (2-space indentation) for readability in the UI.
response.data = JSON.stringify(fullResponse.body, null, 2);
response.text = response.data; // Also update the 'text' property
}
} catch (e) {
// If the response isn't the expected JSON structure, do nothing.
// This prevents errors on other endpoints that have a normal response.
console.error(
"Response Interceptor: Could not parse response or un-nest data.",
e,
);
}
}
return response; // Always return the response object
},
// --- INTERCEPTORS END HERE ---
}); });
}; };
</script> </script>

View File

@@ -1,27 +1,33 @@
"""Tests for HTTP API components.""" """Tests for HTTP API components."""
import json import json
import logging
import threading
import time import time
from unittest.mock import Mock from unittest.mock import Mock
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
import pytest import pytest
from clan_lib.api import MethodRegistry from clan_lib.api import MethodRegistry, tasks
from clan_lib.async_run import is_async_cancelled
from clan_lib.log_manager import LogManager from clan_lib.log_manager import LogManager
from clan_app.api.middleware import ( from clan_app.api.middleware import (
ArgumentParsingMiddleware, ArgumentParsingMiddleware,
LoggingMiddleware,
MethodExecutionMiddleware, MethodExecutionMiddleware,
) )
from clan_app.deps.http.http_server import HttpApiServer from clan_app.deps.http.http_server import HttpApiServer
log = logging.getLogger(__name__)
@pytest.fixture @pytest.fixture
def mock_api() -> MethodRegistry: def mock_api() -> MethodRegistry:
"""Create a mock API with test methods.""" """Create a mock API with test methods."""
api = MethodRegistry() api = MethodRegistry()
api.register(tasks.delete_task)
@api.register @api.register
def test_method(message: str) -> dict[str, str]: def test_method(message: str) -> dict[str, str]:
return {"response": f"Hello {message}!"} return {"response": f"Hello {message}!"}
@@ -31,6 +37,19 @@ def mock_api() -> MethodRegistry:
msg = "Test error" msg = "Test error"
raise ValueError(msg) raise ValueError(msg)
@api.register
def run_task_blocking(wtime: int) -> str:
"""A long blocking task that simulates a long-running operation."""
time.sleep(1)
for i in range(wtime):
if is_async_cancelled():
log.debug("Task was cancelled")
return "Task was cancelled"
log.debug(f"Processing {i} for {wtime}")
time.sleep(1)
return f"Task completed with wtime: {wtime}"
return api return api
@@ -50,7 +69,7 @@ def http_bridge(
"""Create HTTP bridge dependencies for testing.""" """Create HTTP bridge dependencies for testing."""
middleware_chain = ( middleware_chain = (
ArgumentParsingMiddleware(api=mock_api), ArgumentParsingMiddleware(api=mock_api),
LoggingMiddleware(log_manager=mock_log_manager), # LoggingMiddleware(log_manager=mock_log_manager),
MethodExecutionMiddleware(api=mock_api), MethodExecutionMiddleware(api=mock_api),
) )
return mock_api, middleware_chain return mock_api, middleware_chain
@@ -67,7 +86,7 @@ def http_server(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpApiServ
# Add middleware # Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) # server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api)) server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed # Bridge will be created automatically when accessed
@@ -84,7 +103,7 @@ class TestHttpBridge:
# We'll test initialization through the server # We'll test initialization through the server
api, middleware_chain = http_bridge api, middleware_chain = http_bridge
assert api is not None assert api is not None
assert len(middleware_chain) == 3 assert len(middleware_chain) == 2
def test_http_bridge_middleware_setup(self, http_bridge: tuple) -> None: def test_http_bridge_middleware_setup(self, http_bridge: tuple) -> None:
"""Test that middleware is properly set up.""" """Test that middleware is properly set up."""
@@ -92,10 +111,10 @@ class TestHttpBridge:
# Test that we can create the bridge with middleware # Test that we can create the bridge with middleware
# The actual HTTP handling will be tested through the server integration tests # The actual HTTP handling will be tested through the server integration tests
assert len(middleware_chain) == 3 assert len(middleware_chain) == 2
assert isinstance(middleware_chain[0], ArgumentParsingMiddleware) assert isinstance(middleware_chain[0], ArgumentParsingMiddleware)
assert isinstance(middleware_chain[1], LoggingMiddleware) # assert isinstance(middleware_chain[1], LoggingMiddleware)
assert isinstance(middleware_chain[2], MethodExecutionMiddleware) assert isinstance(middleware_chain[1], MethodExecutionMiddleware)
class TestHttpApiServer: class TestHttpApiServer:
@@ -248,7 +267,7 @@ class TestIntegration:
# Add middleware # Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) # server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api)) server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed # Bridge will be created automatically when accessed
@@ -281,6 +300,73 @@ class TestIntegration:
# Always stop server # Always stop server
server.stop() server.stop()
def test_blocking_task(
self, mock_api: MethodRegistry, mock_log_manager: Mock
) -> None:
shared_threads: dict[str, tasks.WebThread] = {}
tasks.BAKEND_THREADS = shared_threads
"""Test a long-running blocking task."""
server: HttpApiServer = HttpApiServer(
api=mock_api,
host="127.0.0.1",
port=8083,
shared_threads=shared_threads,
)
# Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
# server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Start server
server.start()
time.sleep(0.1) # Give server time to start
blocking_op_key = "b37f920f-ce8c-4c8d-b595-28ca983d265e" # str(uuid.uuid4())
def parallel_task() -> None:
# Make API call
request_data: dict = {
"body": {"wtime": 60},
"header": {"op_key": blocking_op_key},
}
req: Request = Request(
"http://127.0.0.1:8083/api/v1/run_task_blocking",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req)
data: dict = json.loads(response.read().decode())
# thread.join()
assert "body" in data
assert data["body"]["status"] == "success"
assert data["body"]["data"] == "Task was cancelled"
thread = threading.Thread(
target=parallel_task,
name="ParallelTaskThread",
daemon=True,
)
thread.start()
time.sleep(1)
request_data: dict = {
"body": {"task_id": blocking_op_key},
}
req: Request = Request(
"http://127.0.0.1:8083/api/v1/delete_task",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req)
data: dict = json.loads(response.read().decode())
assert "body" in data
assert "header" in data
assert data["body"]["status"] == "success"
if __name__ == "__main__": if __name__ == "__main__":
pytest.main([__file__, "-v"]) pytest.main([__file__, "-v"])

View File

@@ -21,7 +21,7 @@ exclude = ["result", "**/__pycache__"]
clan_app = ["**/assets/*"] clan_app = ["**/assets/*"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = "tests" testpaths = [ "tests", "clan_app" ]
faulthandler_timeout = 60 faulthandler_timeout = 60
log_level = "DEBUG" log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s" log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"

View File

@@ -58,6 +58,7 @@ mkShell {
with ps; with ps;
[ [
mypy mypy
pytest-cov
] ]
++ (clan-app.devshellPyDeps ps) ++ (clan-app.devshellPyDeps ps)
)) ))

View File

@@ -23,6 +23,7 @@ def delete_task(task_id: str) -> None:
"""Cancel a task by its op_key.""" """Cancel a task by its op_key."""
assert BAKEND_THREADS is not None, "Backend threads not initialized" assert BAKEND_THREADS is not None, "Backend threads not initialized"
future = BAKEND_THREADS.get(task_id) future = BAKEND_THREADS.get(task_id)
log.debug(f"Thread ID: {threading.get_ident()}") log.debug(f"Thread ID: {threading.get_ident()}")
if future: if future:
future.stop_event.set() future.stop_event.set()