clan-app: Moved thread handling up to the ApiBridge
This commit is contained in:
@@ -23,9 +23,6 @@
|
||||
},
|
||||
{
|
||||
"path": "../clan-cli/clan_lib"
|
||||
},
|
||||
{
|
||||
"path": "ui-2d"
|
||||
}
|
||||
],
|
||||
"settings": {
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
import logging
|
||||
import threading
|
||||
from abc import ABC, abstractmethod
|
||||
from contextlib import ExitStack
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from clan_lib.api import ApiResponse
|
||||
from clan_lib.api.tasks import WebThread
|
||||
from clan_lib.async_run import set_should_cancel
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .middleware import Middleware
|
||||
@@ -32,6 +35,7 @@ class ApiBridge(ABC):
|
||||
"""Generic interface for API bridges that can handle method calls from different sources."""
|
||||
|
||||
middleware_chain: tuple["Middleware", ...]
|
||||
threads: dict[str, WebThread] = field(default_factory=dict)
|
||||
|
||||
@abstractmethod
|
||||
def send_api_response(self, response: BackendResponse) -> None:
|
||||
@@ -87,3 +91,50 @@ class ApiBridge(ABC):
|
||||
)
|
||||
|
||||
self.send_api_response(response)
|
||||
|
||||
def process_request_in_thread(
|
||||
self,
|
||||
request: BackendRequest,
|
||||
*,
|
||||
thread_name: str = "ApiBridgeThread",
|
||||
wait_for_completion: bool = False,
|
||||
timeout: float = 60.0,
|
||||
) -> None:
|
||||
"""Process an API request in a separate thread with cancellation support.
|
||||
|
||||
Args:
|
||||
request: The API request to process
|
||||
thread_name: Name for the thread (for debugging)
|
||||
wait_for_completion: Whether to wait for the thread to complete
|
||||
timeout: Timeout in seconds when waiting for completion
|
||||
"""
|
||||
op_key = request.op_key or "unknown"
|
||||
|
||||
def thread_task(stop_event: threading.Event) -> None:
|
||||
set_should_cancel(lambda: stop_event.is_set())
|
||||
try:
|
||||
log.debug(
|
||||
f"Processing {request.method_name} with args {request.args} "
|
||||
f"and header {request.header} in thread {thread_name}"
|
||||
)
|
||||
self.process_request(request)
|
||||
finally:
|
||||
self.threads.pop(op_key, None)
|
||||
|
||||
stop_event = threading.Event()
|
||||
thread = threading.Thread(
|
||||
target=thread_task, args=(stop_event,), name=thread_name
|
||||
)
|
||||
thread.start()
|
||||
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
||||
|
||||
if wait_for_completion:
|
||||
# Wait for the thread to complete (this blocks until response is sent)
|
||||
thread.join(timeout=timeout)
|
||||
|
||||
# Handle timeout
|
||||
if thread.is_alive():
|
||||
stop_event.set() # Cancel the thread
|
||||
self.send_api_error_response(
|
||||
op_key, "Request timeout", ["api_bridge", request.method_name]
|
||||
)
|
||||
|
||||
@@ -28,7 +28,7 @@ def open_file(
|
||||
GLib.idle_add(gtk_open_file, file_request, op_key)
|
||||
|
||||
while RESULT.get(op_key) is None:
|
||||
time.sleep(0.2)
|
||||
time.sleep(0.1)
|
||||
response = RESULT[op_key]
|
||||
del RESULT[op_key]
|
||||
return response
|
||||
|
||||
@@ -58,6 +58,10 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
||||
load_in_all_api_functions()
|
||||
API.overwrite_fn(open_file)
|
||||
|
||||
# Create a shared threads dictionary for both HTTP and Webview modes
|
||||
shared_threads = {}
|
||||
tasks.BAKEND_THREADS = shared_threads
|
||||
|
||||
# Start HTTP API server if requested
|
||||
http_server = None
|
||||
if app_opts.http_api:
|
||||
@@ -72,6 +76,7 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
||||
swagger_dist=Path(swagger_dist) if swagger_dist else None,
|
||||
host=app_opts.http_host,
|
||||
port=app_opts.http_port,
|
||||
shared_threads=shared_threads,
|
||||
)
|
||||
|
||||
# Add middleware to HTTP server
|
||||
@@ -103,7 +108,10 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
||||
# Create webview if not running in HTTP-only mode
|
||||
if not app_opts.http_api:
|
||||
webview = Webview(
|
||||
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
|
||||
debug=app_opts.debug,
|
||||
title="Clan App",
|
||||
size=Size(1280, 1024, SizeHint.NONE),
|
||||
shared_threads=shared_threads,
|
||||
)
|
||||
|
||||
# Add middleware to the webview
|
||||
@@ -111,12 +119,6 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
||||
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
|
||||
webview.add_middleware(MethodExecutionMiddleware(api=API))
|
||||
|
||||
# Create the bridge
|
||||
webview.create_bridge()
|
||||
|
||||
# Init BAKEND_THREADS global in tasks module
|
||||
tasks.BAKEND_THREADS = webview.threads
|
||||
|
||||
webview.bind_jsonschema_api(API, log_manager=log_manager)
|
||||
webview.navigate(content_uri)
|
||||
webview.run()
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import uuid
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from pathlib import Path
|
||||
@@ -9,7 +8,6 @@ from urllib.parse import urlparse
|
||||
|
||||
from clan_lib.api import MethodRegistry, SuccessDataClass, dataclass_to_dict
|
||||
from clan_lib.api.tasks import WebThread
|
||||
from clan_lib.async_run import set_should_cancel
|
||||
|
||||
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
||||
|
||||
@@ -35,11 +33,12 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
|
||||
*,
|
||||
openapi_file: Path | None = None,
|
||||
swagger_dist: Path | None = None,
|
||||
shared_threads: dict[str, WebThread] | None = None,
|
||||
) -> None:
|
||||
# Initialize API bridge fields
|
||||
self.api = api
|
||||
self.middleware_chain = middleware_chain
|
||||
self.threads: dict[str, WebThread] = {}
|
||||
self.threads = shared_threads if shared_threads is not None else {}
|
||||
|
||||
# Initialize OpenAPI/Swagger fields
|
||||
self.openapi_file = openapi_file
|
||||
@@ -329,31 +328,13 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
|
||||
self, api_request: BackendRequest, method_name: str
|
||||
) -> None:
|
||||
"""Process the API request in a separate thread."""
|
||||
op_key = api_request.op_key or "unknown"
|
||||
|
||||
def thread_task(stop_event: threading.Event) -> None:
|
||||
set_should_cancel(lambda: stop_event.is_set())
|
||||
try:
|
||||
self.process_request(api_request)
|
||||
finally:
|
||||
self.threads.pop(op_key, None)
|
||||
|
||||
stop_event = threading.Event()
|
||||
thread = threading.Thread(
|
||||
target=thread_task, args=(stop_event,), name="HttpThread"
|
||||
# Use the inherited thread processing method
|
||||
self.process_request_in_thread(
|
||||
api_request,
|
||||
thread_name="HttpThread",
|
||||
wait_for_completion=True,
|
||||
timeout=60.0,
|
||||
)
|
||||
thread.start()
|
||||
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
||||
|
||||
# Wait for the thread to complete (this blocks until response is sent)
|
||||
thread.join(timeout=60.0)
|
||||
|
||||
# Handle timeout
|
||||
if thread.is_alive():
|
||||
stop_event.set() # Cancel the thread
|
||||
self.send_api_error_response(
|
||||
op_key, "Request timeout", ["http_bridge", method_name]
|
||||
)
|
||||
|
||||
def log_message(self, format: str, *args: Any) -> None: # noqa: A002
|
||||
"""Override default logging to use our logger."""
|
||||
|
||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from clan_lib.api import MethodRegistry
|
||||
from clan_lib.api.tasks import WebThread
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from clan_app.api.middleware import Middleware
|
||||
@@ -24,6 +25,7 @@ class HttpApiServer:
|
||||
port: int = 8080,
|
||||
openapi_file: Path | None = None,
|
||||
swagger_dist: Path | None = None,
|
||||
shared_threads: dict[str, WebThread] | None = None,
|
||||
) -> None:
|
||||
self.api = api
|
||||
self.openapi = openapi_file
|
||||
@@ -34,6 +36,7 @@ class HttpApiServer:
|
||||
self._server_thread: threading.Thread | None = None
|
||||
# Bridge is now the request handler itself, no separate instance needed
|
||||
self._middleware: list[Middleware] = []
|
||||
self.shared_threads = shared_threads or {}
|
||||
|
||||
def add_middleware(self, middleware: "Middleware") -> None:
|
||||
"""Add middleware to the middleware chain."""
|
||||
@@ -58,6 +61,7 @@ class HttpApiServer:
|
||||
middleware_chain = tuple(self._middleware)
|
||||
openapi_file = self.openapi
|
||||
swagger_dist = self.swagger_dist
|
||||
shared_threads = self.shared_threads
|
||||
|
||||
class RequestHandler(HttpBridge):
|
||||
def __init__(self, request: Any, client_address: Any, server: Any) -> None:
|
||||
@@ -69,6 +73,7 @@ class HttpApiServer:
|
||||
server=server,
|
||||
openapi_file=openapi_file,
|
||||
swagger_dist=swagger_dist,
|
||||
shared_threads=shared_threads,
|
||||
)
|
||||
|
||||
return RequestHandler
|
||||
|
||||
@@ -45,6 +45,7 @@ class Webview:
|
||||
debug: bool = False
|
||||
size: Size | None = None
|
||||
window: int | None = None
|
||||
shared_threads: dict[str, WebThread] | None = None
|
||||
|
||||
# initialized later
|
||||
_bridge: "WebviewBridge | None" = None
|
||||
@@ -116,7 +117,17 @@ class Webview:
|
||||
"""Create and initialize the WebviewBridge with current middleware."""
|
||||
from .webview_bridge import WebviewBridge
|
||||
|
||||
bridge = WebviewBridge(webview=self, middleware_chain=tuple(self._middleware))
|
||||
# Use shared_threads if provided, otherwise let WebviewBridge use its default
|
||||
if self.shared_threads is not None:
|
||||
bridge = WebviewBridge(
|
||||
webview=self,
|
||||
middleware_chain=tuple(self._middleware),
|
||||
threads=self.shared_threads,
|
||||
)
|
||||
else:
|
||||
bridge = WebviewBridge(
|
||||
webview=self, middleware_chain=tuple(self._middleware)
|
||||
)
|
||||
self._bridge = bridge
|
||||
return bridge
|
||||
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from clan_lib.api import dataclass_to_dict
|
||||
from clan_lib.api.tasks import WebThread
|
||||
from clan_lib.async_run import set_should_cancel
|
||||
|
||||
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
||||
|
||||
@@ -23,7 +21,7 @@ class WebviewBridge(ApiBridge):
|
||||
"""Webview-specific implementation of the API bridge."""
|
||||
|
||||
webview: "Webview"
|
||||
threads: dict[str, WebThread] = field(default_factory=dict)
|
||||
threads: dict[str, WebThread] # Inherited from ApiBridge
|
||||
|
||||
def send_api_response(self, response: BackendResponse) -> None:
|
||||
"""Send response back to the webview client."""
|
||||
@@ -84,21 +82,9 @@ class WebviewBridge(ApiBridge):
|
||||
)
|
||||
return
|
||||
|
||||
# Process in a separate thread
|
||||
def thread_task(stop_event: threading.Event) -> None:
|
||||
set_should_cancel(lambda: stop_event.is_set())
|
||||
|
||||
try:
|
||||
log.debug(
|
||||
f"Calling {method_name}({json.dumps(api_request.args, indent=4)}) with header {json.dumps(api_request.header, indent=4)} and op_key {op_key}"
|
||||
)
|
||||
self.process_request(api_request)
|
||||
finally:
|
||||
self.threads.pop(op_key, None)
|
||||
|
||||
stop_event = threading.Event()
|
||||
thread = threading.Thread(
|
||||
target=thread_task, args=(stop_event,), name="WebviewThread"
|
||||
# Process in a separate thread using the inherited method
|
||||
self.process_request_in_thread(
|
||||
api_request,
|
||||
thread_name="WebviewThread",
|
||||
wait_for_completion=False,
|
||||
)
|
||||
thread.start()
|
||||
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
||||
|
||||
Reference in New Issue
Block a user