clan-app: renamed deps folder to 'backend'

This commit is contained in:
Qubasa
2025-09-30 14:23:27 +02:00
parent adb82a8414
commit 8ad9f99606
11 changed files with 3 additions and 4 deletions

View File

@@ -0,0 +1,373 @@
import json
import logging
import threading
import uuid
from http.server import BaseHTTPRequestHandler
from pathlib import Path
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from clan_lib.api import (
MethodRegistry,
SuccessDataClass,
dataclass_to_dict,
)
from clan_lib.api.tasks import WebThread
from clan_lib.async_run import (
set_current_thread_opkey,
set_should_cancel,
)
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
if TYPE_CHECKING:
from clan_app.middleware.base import Middleware
log = logging.getLogger(__name__)
class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
"""HTTP-specific implementation of the API bridge that handles HTTP requests directly.
This bridge combines the API bridge functionality with HTTP request handling.
"""
def __init__(
self,
api: MethodRegistry,
middleware_chain: tuple["Middleware", ...],
request: Any,
client_address: Any,
server: Any,
*,
openapi_file: Path | None = None,
swagger_dist: Path | None = None,
shared_threads: dict[str, WebThread] | None = None,
) -> None:
# Initialize API bridge fields
self.api = api
self.middleware_chain = middleware_chain
self.threads = shared_threads if shared_threads is not None else {}
# Initialize OpenAPI/Swagger fields
self.openapi_file = openapi_file
self.swagger_dist = swagger_dist
# Initialize HTTP handler
super(BaseHTTPRequestHandler, self).__init__(request, client_address, server)
def _send_cors_headers(self) -> None:
"""Send CORS headers for cross-origin requests."""
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
def _send_json_response_with_status(
self,
data: dict[str, Any],
status_code: int = 200,
) -> None:
"""Send a JSON response with the given status code."""
try:
self.send_response_only(status_code)
self.send_header("Content-Type", "application/json")
self._send_cors_headers()
self.end_headers()
response_data = json.dumps(data, indent=2, ensure_ascii=False)
self.wfile.write(response_data.encode("utf-8"))
except BrokenPipeError as e:
log.warning(f"Client disconnected before we could send a response: {e!s}")
def send_api_response(self, response: BackendResponse) -> None:
"""Send HTTP response directly to the client."""
response_dict = dataclass_to_dict(response)
self._send_json_response_with_status(response_dict, 200)
log.debug(
f"HTTP response for {response._op_key}: {json.dumps(response_dict, indent=2)}", # noqa: SLF001
)
def _create_success_response(
self,
op_key: str,
data: dict[str, Any],
) -> BackendResponse:
"""Create a successful API response."""
return BackendResponse(
body=SuccessDataClass(op_key=op_key, status="success", data=data),
header={},
_op_key=op_key,
)
def _send_info_response(self) -> None:
"""Send server information response."""
response = self._create_success_response(
"info",
{"message": "Clan API Server", "version": "1.0.0"},
)
self.send_api_response(response)
def _send_methods_response(self) -> None:
"""Send available API methods response."""
response = self._create_success_response(
"methods",
{"methods": list(self.api.functions.keys())},
)
self.send_api_response(response)
def _handle_swagger_request(self, parsed_url: Any) -> None:
"""Handle Swagger UI related requests."""
if not self.swagger_dist or not self.swagger_dist.exists():
self.send_error(404, "Swagger file not found")
return
rel_path = parsed_url.path[len("/api/swagger") :].lstrip("/")
# Redirect /api/swagger to /api/swagger/index.html
if rel_path == "":
self.send_response(302)
self.send_header("Location", "/api/swagger/index.html")
self.end_headers()
return
self._serve_swagger_file(rel_path)
def _serve_swagger_file(self, rel_path: str) -> None:
"""Serve a specific Swagger UI file."""
file_path = self._get_swagger_file_path(rel_path)
if not file_path.exists() or not file_path.is_file():
self.send_error(404, "Swagger file not found")
return
try:
content_type = self._get_content_type(file_path)
file_data = self._read_and_process_file(file_path, rel_path)
self.send_response(200)
self.send_header("Content-Type", content_type)
self.end_headers()
self.wfile.write(file_data)
except (OSError, json.JSONDecodeError, UnicodeDecodeError):
log.exception("Error reading Swagger file")
self.send_error(500, "Internal Server Error")
def _get_swagger_file_path(self, rel_path: str) -> Path:
"""Get the file path for a Swagger resource."""
if rel_path == "index.html":
return Path(__file__).parent / "swagger.html"
if rel_path == "openapi.json":
if not self.openapi_file:
return Path("/nonexistent") # Will fail exists() check
return self.openapi_file
return (
self.swagger_dist / rel_path if self.swagger_dist else Path("/nonexistent")
)
def _get_content_type(self, file_path: Path) -> str:
"""Get the content type for a file based on its extension."""
content_types = {
".html": "text/html",
".js": "application/javascript",
".css": "text/css",
".json": "application/json",
".png": "image/png",
".svg": "image/svg+xml",
}
return content_types.get(file_path.suffix, "application/octet-stream")
def _read_and_process_file(self, file_path: Path, rel_path: str) -> bytes:
"""Read and optionally process a file (e.g., inject server URL into openapi.json)."""
with file_path.open("rb") as f:
file_data = f.read()
if rel_path == "openapi.json":
json_data = json.loads(file_data.decode("utf-8"))
server_address = getattr(self.server, "server_address", ("localhost", 80))
json_data["servers"] = [
{"url": f"http://{server_address[0]}:{server_address[1]}/api/v1/"},
]
file_data = json.dumps(json_data, indent=2).encode("utf-8")
return file_data
def do_OPTIONS(self) -> None:
"""Handle CORS preflight requests."""
self.send_response_only(200)
self._send_cors_headers()
self.end_headers()
def do_GET(self) -> None:
"""Handle GET requests."""
parsed_url = urlparse(self.path)
path = parsed_url.path
if path == "/":
self._send_info_response()
elif path.startswith("/api/swagger"):
self._handle_swagger_request(parsed_url)
elif path == "/api/methods":
self._send_methods_response()
else:
self.send_api_error_response("info", "Not Found", ["http_bridge", "GET"])
def do_POST(self) -> None:
"""Handle POST requests."""
parsed_url = urlparse(self.path)
path = parsed_url.path
# Validate API path
if not path.startswith("/api/v1/"):
self.send_api_error_response(
"post",
f"Path not found: {path}",
["http_bridge", "POST"],
)
return
# Extract and validate method name
method_name = path[len("/api/v1/") :]
if not method_name:
self.send_api_error_response(
"post",
"Method name required",
["http_bridge", "POST"],
)
return
if method_name not in self.api.functions:
self.send_api_error_response(
"post",
f"Method '{method_name}' not found",
["http_bridge", "POST", method_name],
)
return
# Read and parse request body
request_data = self._read_request_body(method_name)
if request_data is None:
return # Error already sent
# Generate operation key and handle request
gen_op_key = str(uuid.uuid4())
try:
self._handle_api_request(method_name, request_data, gen_op_key)
except RuntimeError as e:
log.exception(f"Error processing API request {method_name}")
self.send_api_error_response(
gen_op_key,
f"Internal server error: {e!s}",
["http_bridge", "POST", method_name],
)
def _read_request_body(self, method_name: str) -> dict[str, Any] | None:
"""Read and parse the request body. Returns None if there was an error."""
try:
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
return {}
body = self.rfile.read(content_length)
return json.loads(body.decode("utf-8"))
except json.JSONDecodeError:
self.send_api_error_response(
"post",
"Invalid JSON in request body",
["http_bridge", "POST", method_name],
)
return None
except (OSError, ValueError, UnicodeDecodeError) as e:
self.send_api_error_response(
"post",
f"Error reading request: {e!s}",
["http_bridge", "POST", method_name],
)
return None
def _parse_request_data(
self,
request_data: dict[str, Any],
gen_op_key: str,
) -> tuple[dict[str, Any], dict[str, Any], str]:
"""Parse and validate request data components."""
header = request_data.get("header", {})
if not isinstance(header, dict):
msg = f"Expected header to be a dict, got {type(header)}"
raise TypeError(msg)
body = request_data.get("body", {})
if not isinstance(body, dict):
msg = f"Expected body to be a dict, got {type(body)}"
raise TypeError(msg)
op_key = header.get("op_key", gen_op_key)
if not isinstance(op_key, str):
msg = f"Expected op_key to be a string, got {type(op_key)}"
raise TypeError(msg)
return header, body, op_key
def _handle_api_request(
self,
method_name: str,
request_data: dict[str, Any],
gen_op_key: str,
) -> None:
"""Handle an API request by processing it through middleware."""
try:
# Validate and parse request data
header, body, op_key = self._parse_request_data(request_data, gen_op_key)
# Validate operation key
self._validate_operation_key(op_key)
# Create API request
api_request = BackendRequest(
method_name=method_name,
args=body,
header=header,
op_key=op_key,
)
except (KeyError, TypeError, ValueError) as e:
self.send_api_error_response(
gen_op_key,
str(e),
["http_bridge", method_name],
)
return
self._process_api_request_in_thread(api_request)
def process_request_in_thread(
self,
request: BackendRequest,
*,
thread_name: str = "ApiBridgeThread",
wait_for_completion: bool = False,
timeout: float = 60.0 * 60, # 1 hour default timeout
) -> None:
pass
def _process_api_request_in_thread(
self,
api_request: BackendRequest,
) -> None:
"""Process the API request in a separate thread."""
stop_event = threading.Event()
request = api_request
op_key = request.op_key or "unknown"
set_should_cancel(lambda: stop_event.is_set())
set_current_thread_opkey(op_key)
curr_thread = threading.current_thread()
self.threads[op_key] = WebThread(thread=curr_thread, stop_event=stop_event)
log.debug(
f"Processing {request.method_name} with args {request.args} "
f"and header {request.header}",
)
self.process_request(request)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002
"""Override default logging to use our logger."""
log.info(f"{self.address_string()} - {format % args}")

View File

@@ -0,0 +1,114 @@
import logging
import threading
from http.server import HTTPServer, ThreadingHTTPServer
from pathlib import Path
from typing import TYPE_CHECKING, Any
from clan_lib.api import MethodRegistry
from clan_lib.api.tasks import WebThread
if TYPE_CHECKING:
from clan_app.middleware.base import Middleware
from .http_bridge import HttpBridge
log = logging.getLogger(__name__)
class HttpApiServer:
"""HTTP server for the Clan API using Python's built-in HTTP server."""
def __init__(
self,
api: MethodRegistry,
host: str = "127.0.0.1",
port: int = 8080,
openapi_file: Path | None = None,
swagger_dist: Path | None = None,
shared_threads: dict[str, WebThread] | None = None,
) -> None:
self.api = api
self.openapi = openapi_file
self.swagger_dist = swagger_dist
self.host = host
self.port = port
self._server: HTTPServer | None = None
self._server_thread: threading.Thread | None = None
# Bridge is now the request handler itself, no separate instance needed
self._middleware: list[Middleware] = []
self.shared_threads = shared_threads if shared_threads is not None else {}
def add_middleware(self, middleware: "Middleware") -> None:
"""Add middleware to the middleware chain."""
if self._server is not None:
msg = "Cannot add middleware after server is started"
raise RuntimeError(msg)
self._middleware.append(middleware)
@property
def server(self) -> HTTPServer | None:
"""Get the HTTP server instance."""
return self._server
@property
def server_thread(self) -> threading.Thread | None:
"""Get the server thread."""
return self._server_thread
def _create_request_handler(self) -> type[HttpBridge]:
"""Create a request handler class with injected dependencies."""
api = self.api
middleware_chain = tuple(self._middleware)
openapi_file = self.openapi
swagger_dist = self.swagger_dist
shared_threads = self.shared_threads
class RequestHandler(HttpBridge):
def __init__(self, request: Any, client_address: Any, server: Any) -> None:
super().__init__(
api=api,
middleware_chain=middleware_chain,
request=request,
client_address=client_address,
server=server,
openapi_file=openapi_file,
swagger_dist=swagger_dist,
shared_threads=shared_threads,
)
return RequestHandler
def start(self) -> None:
"""Start the HTTP server in a separate thread."""
if self._server_thread is not None:
log.warning("HTTP server is already running")
return
# Create the server using ThreadingHTTPServer for concurrent request handling
handler_class = self._create_request_handler()
self._server = ThreadingHTTPServer((self.host, self.port), handler_class)
def run_server() -> None:
if self._server:
log.info(f"HTTP API server started on http://{self.host}:{self.port}")
self._server.serve_forever()
self._server_thread = threading.Thread(target=run_server, daemon=True)
self._server_thread.start()
def stop(self) -> None:
"""Stop the HTTP server."""
if self._server:
self._server.shutdown()
self._server.server_close()
self._server = None
if self._server_thread:
self._server_thread.join(timeout=5)
self._server_thread = None
log.info("HTTP API server stopped")
def is_running(self) -> bool:
"""Check if the server is running."""
return self._server_thread is not None and self._server_thread.is_alive()

View File

@@ -0,0 +1,125 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Swagger UI with Interceptors</title>
<!-- Assuming these files are in the same directory -->
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="stylesheet" type="text/css" href="index.css" />
<link
rel="icon"
type="image/png"
href="./favicon-32x32.png"
sizes="32x32"
/>
<link
rel="icon"
type="image/png"
href="./favicon-16x16.png"
sizes="16x16"
/>
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"></script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"></script>
<!-- Your swagger-initializer.js is not needed if you configure directly in the HTML -->
<script>
window.onload = () => {
SwaggerUIBundle({
url: "./openapi.json", // Path to your OpenAPI 3 spec
dom_id: "#swagger-ui",
presets: [SwaggerUIBundle.presets.apis, SwaggerUIStandalonePreset],
layout: "StandaloneLayout",
tryItOutEnabled: true,
deepLinking: true,
displayOperationId: true,
// --- INTERCEPTORS START HERE ---
/**
* requestInterceptor
* This function is called before a request is sent.
* It takes the request object and must return a modified request object.
* We will use it to wrap the user's input.
*/
requestInterceptor: (request) => {
console.log("Intercepting request:", request);
// Only modify requests that have a body (like POST, PUT)
if (request.body) {
try {
// The body from the UI is a string, so we parse it to an object.
const originalBody = JSON.parse(request.body);
// Create the new, nested structure.
const newBody = {
body: originalBody,
header: {}, // Add an empty header object as per your example
};
// Replace the original body with the new, stringified, nested structure.
request.body = JSON.stringify(newBody);
// Update the 'Content-Length' header to match the new body size.
request.headers["Content-Length"] = new Blob([
request.body,
]).size;
console.log("Modified request body:", request.body);
} catch (e) {
// If the user's input isn't valid JSON, don't modify the request.
console.error(
"Request Interceptor: Could not parse body as JSON.",
e,
);
}
}
return request; // Always return the request object
},
/**
* responseInterceptor
* This function is called after a response is received, but before it's displayed.
* It takes the response object and must return a modified response object.
* We will use it to un-nest the data for display.
*/
responseInterceptor: (response) => {
console.log("Intercepting response:", response);
// Check if the response was successful and has data to process.
if (response.ok && response.data) {
try {
// The response data is a string, so we parse it into an object.
const fullResponse = JSON.parse(response.data);
// Check if the expected 'body' property exists.
if (fullResponse && typeof fullResponse.body !== "undefined") {
console.log(
"Found nested 'body' property. Un-nesting for display.",
);
// Replace the response's data with JUST the nested 'body' object.
// We stringify it with pretty-printing (2-space indentation) for readability in the UI.
response.data = JSON.stringify(fullResponse.body, null, 2);
response.text = response.data; // Also update the 'text' property
}
} catch (e) {
// If the response isn't the expected JSON structure, do nothing.
// This prevents errors on other endpoints that have a normal response.
console.error(
"Response Interceptor: Could not parse response or un-nest data.",
e,
);
}
}
return response; // Always return the response object
},
// --- INTERCEPTORS END HERE ---
});
};
</script>
</body>
</html>

View File

@@ -0,0 +1,358 @@
"""Tests for HTTP API components."""
import json
import logging
import threading
import time
from urllib.request import Request, urlopen
import pytest
from clan_lib.api import MethodRegistry, tasks
from clan_lib.async_run import is_async_cancelled
from clan_app.backends.http.http_server import HttpApiServer
from clan_app.middleware import (
ArgumentParsingMiddleware,
MethodExecutionMiddleware,
)
log = logging.getLogger(__name__)
@pytest.fixture
def mock_api() -> MethodRegistry:
"""Create a mock API with test methods."""
api = MethodRegistry()
api.register(tasks.delete_task)
@api.register
def test_method(message: str) -> dict[str, str]:
return {"response": f"Hello {message}!"}
@api.register
def test_method_with_error() -> dict[str, str]:
msg = "Test error"
raise ValueError(msg)
@api.register
def run_task_blocking(wtime: int) -> str:
"""A long blocking task that simulates a long-running operation."""
time.sleep(1)
for i in range(wtime):
if is_async_cancelled():
log.debug("Task was cancelled")
return "Task was cancelled"
log.debug(f"Processing {i} for {wtime}")
time.sleep(1)
return f"Task completed with wtime: {wtime}"
return api
@pytest.fixture
def http_bridge(
mock_api: MethodRegistry,
) -> tuple[MethodRegistry, tuple]:
"""Create HTTP bridge dependencies for testing."""
middleware_chain = (
ArgumentParsingMiddleware(api=mock_api),
MethodExecutionMiddleware(api=mock_api),
)
return mock_api, middleware_chain
@pytest.fixture
def http_server(mock_api: MethodRegistry) -> HttpApiServer:
"""Create HTTP server with mock dependencies."""
server = HttpApiServer(
api=mock_api,
host="127.0.0.1",
port=8081, # Use different port for tests
)
# Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed
return server
class TestHttpBridge:
"""Tests for HttpBridge class."""
def test_http_bridge_initialization(self, http_bridge: tuple) -> None:
"""Test HTTP bridge initialization."""
# Since HttpBridge is now a request handler, we can't instantiate it directly
# We'll test initialization through the server
api, middleware_chain = http_bridge
assert api is not None
assert len(middleware_chain) == 2
def test_http_bridge_middleware_setup(self, http_bridge: tuple) -> None:
"""Test that middleware is properly set up."""
api, middleware_chain = http_bridge
# Test that we can create the bridge with middleware
# The actual HTTP handling will be tested through the server integration tests
assert len(middleware_chain) == 2
assert isinstance(middleware_chain[0], ArgumentParsingMiddleware)
assert isinstance(middleware_chain[1], MethodExecutionMiddleware)
class TestHttpApiServer:
"""Tests for HttpApiServer class."""
def test_server_initialization(self, http_server: HttpApiServer) -> None:
"""Test HTTP server initialization."""
assert http_server.host == "127.0.0.1"
assert http_server.port == 8081
assert http_server.server is None
assert http_server.server_thread is None
assert not http_server.is_running()
def test_server_start_stop(self, http_server: HttpApiServer) -> None:
"""Test starting and stopping the server."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
assert http_server.is_running()
# Stop server
http_server.stop()
time.sleep(0.1) # Give server time to stop
assert not http_server.is_running()
def test_server_endpoints(self, http_server: HttpApiServer) -> None:
"""Test server endpoints."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test root endpoint
response = urlopen("http://127.0.0.1:8081/")
data: dict = json.loads(response.read().decode())
assert data["body"]["status"] == "success"
assert data["body"]["data"]["message"] == "Clan API Server"
assert data["body"]["data"]["version"] == "1.0.0"
# Test methods endpoint
response = urlopen("http://127.0.0.1:8081/api/methods")
data = json.loads(response.read().decode())
assert data["body"]["status"] == "success"
assert "test_method" in data["body"]["data"]["methods"]
assert "test_method_with_error" in data["body"]["data"]["methods"]
# Test API call endpoint
request_data: dict = {"header": {}, "body": {"message": "World"}}
req: Request = Request(
"http://127.0.0.1:8081/api/v1/test_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req) # noqa: S310
data = json.loads(response.read().decode())
# Response should be BackendResponse format
assert "body" in data
assert "header" in data
assert data["body"]["status"] == "success"
assert data["body"]["data"] == {"response": "Hello World!"}
finally:
# Always stop server
http_server.stop()
def test_server_error_handling(self, http_server: HttpApiServer) -> None:
"""Test server error handling."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test 404 error
res = urlopen("http://127.0.0.1:8081/nonexistent")
assert res.status == 200
body = json.loads(res.read().decode())["body"]
assert body["status"] == "error"
# Test method not found
request_data: dict = {"header": {}, "body": {}}
req: Request = Request(
"http://127.0.0.1:8081/api/v1/nonexistent_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
res = urlopen(req) # noqa: S310
assert res.status == 200
body = json.loads(res.read().decode())["body"]
assert body["status"] == "error"
# Test invalid JSON
req = Request(
"http://127.0.0.1:8081/api/v1/test_method",
data=b"invalid json",
headers={"Content-Type": "application/json"},
)
res = urlopen(req) # noqa: S310
assert res.status == 200
body = json.loads(res.read().decode())["body"]
assert body["status"] == "error"
finally:
# Always stop server
http_server.stop()
def test_server_cors_headers(self, http_server: HttpApiServer) -> None:
"""Test CORS headers are properly set."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test OPTIONS request
class OptionsRequest(Request):
def get_method(self) -> str:
return "OPTIONS"
req: Request = OptionsRequest("http://127.0.0.1:8081/api/call/test_method")
response = urlopen(req) # noqa: S310
# Check CORS headers
headers = response.info()
assert headers.get("Access-Control-Allow-Origin") == "*"
assert "GET" in headers.get("Access-Control-Allow-Methods", "")
assert "POST" in headers.get("Access-Control-Allow-Methods", "")
finally:
# Always stop server
http_server.stop()
class TestIntegration:
"""Integration tests for HTTP API components."""
def test_full_request_flow(
self,
mock_api: MethodRegistry,
) -> None:
"""Test complete request flow from server to bridge to middleware."""
server: HttpApiServer = HttpApiServer(
api=mock_api,
host="127.0.0.1",
port=8082,
)
# Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed
# Start server
server.start()
time.sleep(0.1) # Give server time to start
try:
# Make API call
request_data: dict = {
"header": {"logging": {"group_path": ["test", "group"]}},
"body": {"message": "Integration"},
}
req: Request = Request(
"http://127.0.0.1:8082/api/v1/test_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode())
# Verify response in BackendResponse format
assert "body" in data
assert "header" in data
assert data["body"]["status"] == "success"
assert data["body"]["data"] == {"response": "Hello Integration!"}
finally:
# Always stop server
server.stop()
def test_blocking_task(
self,
mock_api: MethodRegistry,
) -> None:
shared_threads: dict[str, tasks.WebThread] = {}
tasks.BAKEND_THREADS = shared_threads
"""Test a long-running blocking task."""
server: HttpApiServer = HttpApiServer(
api=mock_api,
host="127.0.0.1",
port=8083,
shared_threads=shared_threads,
)
# Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Start server
server.start()
time.sleep(0.1) # Give server time to start
blocking_op_key = "b37f920f-ce8c-4c8d-b595-28ca983d265e" # str(uuid.uuid4())
def parallel_task() -> None:
# Make API call
request_data: dict = {
"body": {"wtime": 60},
"header": {"op_key": blocking_op_key},
}
req: Request = Request(
"http://127.0.0.1:8083/api/v1/run_task_blocking",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode())
# thread.join()
assert "body" in data
assert data["body"]["status"] == "success"
assert data["body"]["data"] == "Task was cancelled"
thread = threading.Thread(
target=parallel_task,
name="ParallelTaskThread",
daemon=True,
)
thread.start()
time.sleep(1)
request_data: dict = {
"body": {"task_id": blocking_op_key},
}
req: Request = Request(
"http://127.0.0.1:8083/api/v1/delete_task",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req) # noqa: S310
data: dict = json.loads(response.read().decode())
assert "body" in data
assert "header" in data
assert data["body"]["status"] == "success"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,126 @@
import ctypes
import ctypes.util
import os
import platform
from ctypes import CFUNCTYPE, c_char_p, c_int, c_void_p
from pathlib import Path
# Native handle kinds
WEBVIEW_NATIVE_HANDLE_KIND_UI_WINDOW = 0
WEBVIEW_NATIVE_HANDLE_KIND_UI_WIDGET = 1
WEBVIEW_NATIVE_HANDLE_KIND_BROWSER_CONTROLLER = 2
def _encode_c_string(s: str) -> bytes:
return s.encode("utf-8")
def _get_webview_version() -> str:
"""Get webview version from environment variable or use default"""
return os.getenv("WEBVIEW_VERSION", "0.8.1")
def _get_lib_names() -> list[str]:
"""Get platform-specific library names."""
system = platform.system().lower()
machine = platform.machine().lower()
if system == "windows":
if machine in {"amd64", "x86_64"}:
return ["webview.dll", "WebView2Loader.dll"]
if machine == "arm64":
msg = "arm64 is not supported on Windows"
raise RuntimeError(msg)
msg = f"Unsupported architecture: {machine}"
raise RuntimeError(msg)
if system == "darwin":
return ["libwebview.dylib"]
# linux
return ["libwebview.so"]
def _be_sure_libraries() -> list[Path] | None:
"""Ensure libraries exist and return paths."""
lib_dir = os.environ.get("WEBVIEW_LIB_DIR")
if not lib_dir:
msg = "WEBVIEW_LIB_DIR environment variable is not set"
raise RuntimeError(msg)
lib_dir_p = Path(lib_dir)
lib_names = _get_lib_names()
lib_paths = [lib_dir_p / lib_name for lib_name in lib_names]
# Check if any library is missing
missing_libs = [path for path in lib_paths if not path.exists()]
if not missing_libs:
return lib_paths
return None
class _WebviewLibrary:
def __init__(self) -> None:
lib_names = _get_lib_names()
library_path = ctypes.util.find_library(lib_names[0])
if not library_path:
library_paths = _be_sure_libraries()
if not library_paths:
msg = f"Failed to find required library: {lib_names}"
raise RuntimeError(msg)
try:
self.lib = ctypes.cdll.LoadLibrary(str(library_paths[0]))
except Exception as e:
print(f"Failed to load webview library: {e}")
raise
# Define FFI functions
self.webview_create = self.lib.webview_create
self.webview_create.argtypes = [c_int, c_void_p]
self.webview_create.restype = c_void_p
self.webview_create_with_app_id = self.lib.webview_create_with_app_id
self.webview_create_with_app_id.argtypes = [c_int, c_void_p, c_char_p]
self.webview_create_with_app_id.restype = c_void_p
self.webview_destroy = self.lib.webview_destroy
self.webview_destroy.argtypes = [c_void_p]
self.webview_run = self.lib.webview_run
self.webview_run.argtypes = [c_void_p]
self.webview_terminate = self.lib.webview_terminate
self.webview_terminate.argtypes = [c_void_p]
self.webview_set_title = self.lib.webview_set_title
self.webview_set_title.argtypes = [c_void_p, c_char_p]
self.webview_set_size = self.lib.webview_set_size
self.webview_set_size.argtypes = [c_void_p, c_int, c_int, c_int]
self.webview_navigate = self.lib.webview_navigate
self.webview_navigate.argtypes = [c_void_p, c_char_p]
self.webview_init = self.lib.webview_init
self.webview_init.argtypes = [c_void_p, c_char_p]
self.webview_eval = self.lib.webview_eval
self.webview_eval.argtypes = [c_void_p, c_char_p]
self.webview_bind = self.lib.webview_bind
self.webview_bind.argtypes = [c_void_p, c_char_p, c_void_p, c_void_p]
self.webview_unbind = self.lib.webview_unbind
self.webview_unbind.argtypes = [c_void_p, c_char_p]
self.webview_return = self.lib.webview_return
self.webview_return.argtypes = [c_void_p, c_char_p, c_int, c_char_p]
self.webview_get_native_handle = self.lib.webview_get_native_handle
self.webview_get_native_handle.argtypes = [c_void_p, c_int]
self.webview_get_native_handle.restype = c_void_p
self.binding_callback_t = CFUNCTYPE(None, c_char_p, c_char_p, c_void_p)
self.CFUNCTYPE = CFUNCTYPE
_webview_lib = _WebviewLibrary()

View File

@@ -0,0 +1,300 @@
import functools
import json
import logging
import platform
import threading
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import IntEnum
from time import sleep
from typing import TYPE_CHECKING, Any
from clan_lib.api import MethodRegistry, message_queue
from clan_lib.api.tasks import WebThread
from ._webview_ffi import (
_encode_c_string,
_webview_lib,
)
from .webview_bridge import WebviewBridge
if TYPE_CHECKING:
from clan_app.middleware.base import Middleware
log = logging.getLogger(__name__)
class SizeHint(IntEnum):
NONE = 0
MIN = 1
MAX = 2
FIXED = 3
class FuncStatus(IntEnum):
SUCCESS = 0
FAILURE = 1
class NativeHandleKind(IntEnum):
# Top-level window. @c GtkWindow pointer (GTK), @c NSWindow pointer (Cocoa)
# or @c HWND (Win32)
UI_WINDOW = 0
# Browser widget. @c GtkWidget pointer (GTK), @c NSView pointer (Cocoa) or
# @c HWND (Win32).
UI_WIDGET = 1
# Browser controller. @c WebKitWebView pointer (WebKitGTK), @c WKWebView
# pointer (Cocoa/WebKit) or @c ICoreWebView2Controller pointer
# (Win32/WebView2).
BROWSER_CONTROLLER = 2
@dataclass(frozen=True)
class Size:
width: int
height: int
hint: SizeHint
@dataclass
class Webview:
title: str
debug: bool = False
size: Size | None = None
window: int | None = None
shared_threads: dict[str, WebThread] | None = None
app_id: str | None = None
# initialized later
_bridge: WebviewBridge | None = None
_handle: Any | None = None
__callbacks: dict[str, Callable[..., Any]] = field(default_factory=dict)
_middleware: list["Middleware"] = field(default_factory=list)
@property
def callbacks(self) -> dict[str, Callable[..., Any]]:
return self.__callbacks
@callbacks.setter
def callbacks(self, value: dict[str, Callable[..., Any]]) -> None:
del value # Unused
msg = "Cannot set callbacks directly"
raise AttributeError(msg)
def delete_callback(self, name: str) -> None:
if name in self.callbacks:
del self.__callbacks[name]
else:
msg = f"Callback {name} does not exist. Cannot delete."
raise RuntimeError(msg)
def add_callback(self, name: str, callback: Callable[..., Any]) -> None:
if name in self.callbacks:
msg = f"Callback {name} already exists. Cannot add."
raise RuntimeError(msg)
self.__callbacks[name] = callback
def _create_handle(self) -> None:
# Initialize the webview handle
with_debugger = True
# Use webview_create_with_app_id only on Linux if app_id is provided
if self.app_id and platform.system() == "Linux":
handle = _webview_lib.webview_create_with_app_id(
int(with_debugger), self.window, _encode_c_string(self.app_id)
)
else:
handle = _webview_lib.webview_create(int(with_debugger), self.window)
# Since we can't use object.__setattr__, we'll initialize differently
# by storing in __dict__ directly (this works for init=False fields)
self._handle = handle
if self.title:
self.set_title(self.title)
if self.size:
self.set_size(self.size)
def __post_init__(self) -> None:
self.setup_notify() # Start the notification loop
def setup_notify(self) -> None:
def loop() -> None:
while True:
try:
msg = message_queue.get() # Blocks until available
js_code = f"window.notifyBus({json.dumps(msg)});"
self.eval(js_code)
except (json.JSONDecodeError, RuntimeError, AttributeError) as e:
print("Bridge notify error:", e)
sleep(0.01) # avoid busy loop
threading.Thread(target=loop, daemon=True).start()
@property
def handle(self) -> Any:
"""Get the webview handle, creating it if necessary."""
if self._handle is None:
self._create_handle()
return self._handle
@property
def bridge(self) -> "WebviewBridge":
"""Get the bridge, creating it if necessary."""
if self._bridge is None:
self.create_bridge()
if self._bridge is None:
msg = "Bridge should be created"
raise RuntimeError(msg)
return self._bridge
def api_wrapper(
self,
method_name: str,
op_key_bytes: bytes,
request_data: bytes,
arg: int,
) -> None:
"""Legacy API wrapper - delegates to the bridge."""
del arg # Unused but required for C callback signature
self.bridge.handle_webview_call(
method_name=method_name,
op_key_bytes=op_key_bytes,
request_data=request_data,
)
@property
def threads(self) -> dict[str, WebThread]:
"""Access threads from the bridge for compatibility."""
return self.bridge.threads
def add_middleware(self, middleware: "Middleware") -> None:
"""Add middleware to the middleware chain."""
if self._bridge is not None:
msg = "Cannot add middleware after bridge creation."
raise RuntimeError(msg)
self._middleware.append(middleware)
def create_bridge(self) -> WebviewBridge:
"""Create and initialize the WebviewBridge with current middleware."""
# Use shared_threads if provided, otherwise let WebviewBridge use its default
if self.shared_threads is not None:
bridge = WebviewBridge(
webview=self,
middleware_chain=tuple(self._middleware),
threads=self.shared_threads,
)
else:
bridge = WebviewBridge(
webview=self,
middleware_chain=tuple(self._middleware),
threads={},
)
self._bridge = bridge
return bridge
# Legacy methods for compatibility
def set_size(self, value: Size) -> None:
"""Set the webview size (legacy compatibility)."""
_webview_lib.webview_set_size(
self.handle,
value.width,
value.height,
value.hint,
)
def set_title(self, value: str) -> None:
"""Set the webview title (legacy compatibility)."""
_webview_lib.webview_set_title(self.handle, _encode_c_string(value))
def destroy(self) -> None:
"""Destroy the webview."""
for name in list(self.callbacks.keys()):
self.unbind(name)
_webview_lib.webview_terminate(self.handle)
_webview_lib.webview_destroy(self.handle)
# Can't set _handle to None on frozen dataclass
def navigate(self, url: str) -> None:
"""Navigate to a URL."""
_webview_lib.webview_navigate(self.handle, _encode_c_string(url))
def run(self) -> None:
"""Run the webview."""
_webview_lib.webview_run(self.handle)
log.info("Shutting down webview...")
self.destroy()
def bind_jsonschema_api(self, api: MethodRegistry) -> None:
for name in api.functions:
wrapper = functools.partial(
self.api_wrapper,
name,
)
c_callback = _webview_lib.binding_callback_t(wrapper)
self.add_callback(name, c_callback)
_webview_lib.webview_bind(
self.handle,
_encode_c_string(name),
c_callback,
None,
)
def bind(self, name: str, callback: Callable[..., Any]) -> None:
def wrapper(seq: bytes, req: bytes, _arg: int) -> None:
args = json.loads(req.decode())
try:
result = callback(*args)
success = True
except Exception as e: # noqa: BLE001
result = str(e)
success = False
self.return_(seq.decode(), 0 if success else 1, json.dumps(result))
c_callback = _webview_lib.binding_callback_t(wrapper)
self.add_callback(name, c_callback)
_webview_lib.webview_bind(self.handle, _encode_c_string(name), c_callback, None)
def get_native_handle(
self, kind: NativeHandleKind = NativeHandleKind.UI_WINDOW
) -> int | None:
"""Get the native handle (platform-dependent).
Args:
kind: Handle kind - NativeHandleKind enum value
Returns:
Native handle as integer, or None if failed
"""
handle = _webview_lib.webview_get_native_handle(self.handle, kind.value)
return handle if handle else None
def unbind(self, name: str) -> None:
if name in self.callbacks:
_webview_lib.webview_unbind(self.handle, _encode_c_string(name))
self.delete_callback(name)
def return_(self, seq: str, status: int, result: str) -> None:
_webview_lib.webview_return(
self.handle,
_encode_c_string(seq),
status,
_encode_c_string(result),
)
def eval(self, source: str) -> None:
_webview_lib.webview_eval(self.handle, _encode_c_string(source))
if __name__ == "__main__":
wv = Webview(title="Hello, World!")
wv.navigate("https://www.google.com")
wv.run()

View File

@@ -0,0 +1,94 @@
import json
import logging
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from clan_lib.api import dataclass_to_dict
from clan_lib.api.tasks import WebThread
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
if TYPE_CHECKING:
from clan_app.middleware.base import Middleware
from .webview import Webview
log = logging.getLogger(__name__)
@dataclass
class WebviewBridge(ApiBridge):
"""Webview-specific implementation of the API bridge."""
webview: "Webview"
middleware_chain: tuple["Middleware", ...]
threads: dict[str, WebThread] = field(default_factory=dict)
def send_api_response(self, response: BackendResponse) -> None:
"""Send response back to the webview client."""
serialized = json.dumps(
dataclass_to_dict(response),
indent=4,
ensure_ascii=False,
)
log.debug(f"Sending response: {serialized}")
# Import FuncStatus locally to avoid circular import
from .webview import FuncStatus # noqa: PLC0415
self.webview.return_(response._op_key, FuncStatus.SUCCESS, serialized) # noqa: SLF001
def handle_webview_call(
self,
method_name: str,
op_key_bytes: bytes,
request_data: bytes,
) -> None:
"""Handle a call from webview's JavaScript bridge."""
try:
webview_op_key = op_key_bytes.decode()
raw_args = json.loads(request_data.decode())
# Parse the webview-specific request format
header = {}
args = {}
if len(raw_args) == 1:
request = raw_args[0]
header = request.get("header", {})
if not isinstance(header, dict):
msg = f"Expected header to be a dict, got {type(header)}"
raise TypeError(msg) # noqa: TRY301
body = request.get("body", {})
if not isinstance(body, dict):
msg = f"Expected body to be a dict, got {type(body)}"
raise TypeError(msg) # noqa: TRY301
args = body
elif len(raw_args) > 1:
msg = "Expected a single argument, got multiple arguments"
raise ValueError(msg) # noqa: TRY301
# Create API request
api_request = BackendRequest(
method_name=method_name,
args=args,
header=header,
op_key=webview_op_key,
)
except Exception as e:
msg = f"Error while handling webview call {method_name} with op_key {webview_op_key}"
log.exception(msg)
self.send_api_error_response(
webview_op_key,
str(e),
["webview_bridge", method_name],
)
return
# Process in a separate thread using the inherited method
self.process_request_in_thread(
api_request,
thread_name=f"WebviewThread-{method_name}",
wait_for_completion=False,
)