clan-app: Generalize architecture for API requests

This commit is contained in:
Qubasa
2025-07-08 16:48:55 +07:00
parent 6d8fd42faa
commit eb6166796c
11 changed files with 261 additions and 231 deletions

View File

@@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
@dataclass(frozen=True) @dataclass(frozen=True)
class ApiRequest: class BackendRequest:
method_name: str method_name: str
args: dict[str, Any] args: dict[str, Any]
header: dict[str, Any] header: dict[str, Any]
@@ -19,11 +19,10 @@ class ApiRequest:
@dataclass(frozen=True) @dataclass(frozen=True)
class ApiResponse: class BackendResponse:
op_key: str body: Any
success: bool header: dict[str, Any]
data: Any _op_key: str
error: str | None = None
@dataclass @dataclass
@@ -33,10 +32,10 @@ class ApiBridge(ABC):
middleware_chain: tuple["Middleware", ...] middleware_chain: tuple["Middleware", ...]
@abstractmethod @abstractmethod
def send_response(self, response: ApiResponse) -> None: def send_response(self, response: BackendResponse) -> None:
"""Send response back to the client.""" """Send response back to the client."""
def process_request(self, request: ApiRequest) -> None: def process_request(self, request: BackendRequest) -> None:
"""Process an API request through the middleware chain.""" """Process an API request through the middleware chain."""
from .middleware import MiddlewareContext from .middleware import MiddlewareContext
@@ -79,8 +78,10 @@ class ApiBridge(ABC):
], ],
) )
response = ApiResponse( response = BackendResponse(
op_key=op_key, success=False, data=error_data, error=error_message body=error_data,
header={},
_op_key=op_key,
) )
self.send_response(response) self.send_response(response)

View File

@@ -0,0 +1,14 @@
"""Middleware components for the webview API bridge."""
from .argument_parsing import ArgumentParsingMiddleware
from .base import Middleware, MiddlewareContext
from .logging import LoggingMiddleware
from .method_execution import MethodExecutionMiddleware
__all__ = [
"ArgumentParsingMiddleware",
"LoggingMiddleware",
"MethodExecutionMiddleware",
"Middleware",
"MiddlewareContext",
]

View File

@@ -0,0 +1,55 @@
import logging
from dataclasses import dataclass
from clan_lib.api import MethodRegistry, from_dict
from clan_app.api.api_bridge import BackendRequest
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class ArgumentParsingMiddleware(Middleware):
"""Middleware that handles argument parsing and dataclass construction."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
try:
# Convert dictionary arguments to dataclass instances
reconciled_arguments = {}
for k, v in context.request.args.items():
if k == "op_key":
continue
# Get the expected argument type from the API
arg_class = self.api.get_method_argtype(context.request.method_name, k)
# Convert dictionary to dataclass instance
reconciled_arguments[k] = from_dict(arg_class, v)
# Add op_key to arguments
reconciled_arguments["op_key"] = context.request.op_key
# Create a new request with reconciled arguments
updated_request = BackendRequest(
method_name=context.request.method_name,
args=reconciled_arguments,
header=context.request.header,
op_key=context.request.op_key,
)
context.request = updated_request
except Exception as e:
log.exception(
f"Error while parsing arguments for {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["argument_parsing", context.request.method_name],
)
raise

View File

@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, ExitStack
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from clan_app.api.api_bridge import ApiBridge, BackendRequest
@dataclass
class MiddlewareContext:
request: "BackendRequest"
bridge: "ApiBridge"
exit_stack: ExitStack
@dataclass(frozen=True)
class Middleware(ABC):
"""Abstract base class for middleware components."""
@abstractmethod
def process(self, context: MiddlewareContext) -> None:
"""Process the request through this middleware."""
def register_context_manager(
self, context: MiddlewareContext, cm: AbstractContextManager[Any]
) -> Any:
"""Register a context manager with the exit stack."""
return context.exit_stack.enter_context(cm)

View File

@@ -0,0 +1,99 @@
import io
import logging
import types
from dataclasses import dataclass
from typing import Any
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import RegisteredHandler, setup_logging
from clan_lib.log_manager import LogManager
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class LoggingMiddleware(Middleware):
"""Middleware that sets up logging context without executing methods."""
log_manager: LogManager
def process(self, context: MiddlewareContext) -> None:
method = context.request.method_name
try:
# Handle log group configuration
log_group: list[str] | None = context.request.header.get("logging", {}).get(
"group_path", None
)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg)
log.warning(
f"Using log group {log_group} for {context.request.method_name} with op_key {context.request.op_key}"
)
# Create log file
log_file = self.log_manager.create_log_file(
method, op_key=context.request.op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(
f"Error while handling request header of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["header_middleware", context.request.method_name],
)
return
# Register logging context manager
class LoggingContextManager:
def __init__(self, log_file: Any) -> None:
self.log_file = log_file
self.log_f: Any = None
self.handler: RegisteredHandler | None = None
self.original_ctx: AsyncContext | None = None
def __enter__(self) -> "LoggingContextManager":
self.log_f = self.log_file.open("ab")
self.original_ctx = get_async_ctx()
# Set up async context for logging
ctx = AsyncContext(**self.original_ctx.__dict__)
ctx.stderr = self.log_f
ctx.stdout = self.log_f
set_async_ctx(ctx)
# Set up logging handler
handler_stream = io.TextIOWrapper(
self.log_f, # type: ignore[arg-type]
encoding="utf-8",
write_through=True,
line_buffering=True,
)
self.handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: types.TracebackType | None,
) -> None:
if self.handler:
self.handler.root_logger.removeHandler(self.handler.new_handler)
self.handler.new_handler.close()
if self.log_f:
self.log_f.close()
if self.original_ctx:
set_async_ctx(self.original_ctx)
# Register the logging context manager
self.register_context_manager(context, LoggingContextManager(log_file))

View File

@@ -0,0 +1,41 @@
import logging
from dataclasses import dataclass
from clan_lib.api import MethodRegistry
from clan_app.api.api_bridge import BackendResponse
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class MethodExecutionMiddleware(Middleware):
"""Middleware that handles actual method execution."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
method = self.api.functions[context.request.method_name]
try:
# Execute the actual method
result = method(**context.request.args)
response = BackendResponse(
body=result,
header={},
_op_key=context.request.op_key,
)
context.bridge.send_response(response)
except Exception as e:
log.exception(
f"Error while handling result of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["method_execution", context.request.method_name],
)

View File

@@ -11,7 +11,7 @@ from clan_lib.log_manager import LogGroupConfig, LogManager
from clan_lib.log_manager import api as log_manager_api from clan_lib.log_manager import api as log_manager_api
from clan_app.api.file_gtk import open_file from clan_app.api.file_gtk import open_file
from clan_app.deps.webview.middleware import ( from clan_app.api.middleware import (
ArgumentParsingMiddleware, ArgumentParsingMiddleware,
LoggingMiddleware, LoggingMiddleware,
MethodExecutionMiddleware, MethodExecutionMiddleware,

View File

@@ -1,201 +0,0 @@
import io
import json
import logging
from abc import ABC, abstractmethod
from contextlib import ExitStack
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, ContextManager
from clan_lib.api import MethodRegistry, dataclass_to_dict, from_dict
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import setup_logging
from clan_lib.log_manager import LogManager
from .api_bridge import ApiRequest, ApiResponse
if TYPE_CHECKING:
from .api_bridge import ApiBridge, ApiRequest
log = logging.getLogger(__name__)
@dataclass
class MiddlewareContext:
request: "ApiRequest"
bridge: "ApiBridge"
exit_stack: ExitStack
@dataclass(frozen=True)
class Middleware(ABC):
"""Abstract base class for middleware components."""
@abstractmethod
def process(self, context: MiddlewareContext) -> None:
"""Process the request through this middleware."""
def register_context_manager(
self, context: MiddlewareContext, cm: ContextManager[Any]
) -> Any:
"""Register a context manager with the exit stack."""
return context.exit_stack.enter_context(cm)
@dataclass(frozen=True)
class ArgumentParsingMiddleware(Middleware):
"""Middleware that handles argument parsing and dataclass construction."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
try:
# Convert dictionary arguments to dataclass instances
reconciled_arguments = {}
for k, v in context.request.args.items():
if k == "op_key":
continue
# Get the expected argument type from the API
arg_class = self.api.get_method_argtype(context.request.method_name, k)
# Convert dictionary to dataclass instance
reconciled_arguments[k] = from_dict(arg_class, v)
# Add op_key to arguments
reconciled_arguments["op_key"] = context.request.op_key
# Create a new request with reconciled arguments
updated_request = ApiRequest(
method_name=context.request.method_name,
args=reconciled_arguments,
header=context.request.header,
op_key=context.request.op_key,
)
context.request = updated_request
except Exception as e:
log.exception(
f"Error while parsing arguments for {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["argument_parsing", context.request.method_name],
)
raise
@dataclass(frozen=True)
class LoggingMiddleware(Middleware):
"""Middleware that sets up logging context without executing methods."""
log_manager: LogManager
def process(self, context: MiddlewareContext) -> None:
method = context.request.method_name
try:
# Handle log group configuration
log_group: list[str] | None = context.request.header.get("logging", {}).get(
"group_path", None
)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg)
log.warning(
f"Using log group {log_group} for {context.request.method_name} with op_key {context.request.op_key}"
)
# Create log file
log_file = self.log_manager.create_log_file(
method, op_key=context.request.op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(
f"Error while handling request header of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["header_middleware", context.request.method_name],
)
return
# Register logging context manager
class LoggingContextManager:
def __init__(self, log_file) -> None:
self.log_file = log_file
self.log_f = None
self.handler = None
self.original_ctx = None
def __enter__(self):
self.log_f = self.log_file.open("ab")
self.original_ctx = get_async_ctx()
# Set up async context for logging
ctx = AsyncContext(**self.original_ctx.__dict__)
ctx.stderr = self.log_f
ctx.stdout = self.log_f
set_async_ctx(ctx)
# Set up logging handler
handler_stream = io.TextIOWrapper(
self.log_f,
encoding="utf-8",
write_through=True,
line_buffering=True,
)
self.handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.handler:
self.handler.root_logger.removeHandler(self.handler.new_handler)
self.handler.new_handler.close()
if self.log_f:
self.log_f.close()
if self.original_ctx:
set_async_ctx(self.original_ctx)
# Register the logging context manager
self.register_context_manager(context, LoggingContextManager(log_file))
@dataclass(frozen=True)
class MethodExecutionMiddleware(Middleware):
"""Middleware that handles actual method execution."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
method = self.api.functions[context.request.method_name]
try:
# Execute the actual method
result = method(**context.request.args)
wrapped_result = {"body": dataclass_to_dict(result), "header": {}}
log.debug(
f"Result for {context.request.method_name}: {json.dumps(dataclass_to_dict(wrapped_result), indent=4)}"
)
response = ApiResponse(
op_key=context.request.op_key, success=True, data=wrapped_result
)
context.bridge.send_response(response)
except Exception as e:
log.exception(
f"Error while handling result of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["method_execution", context.request.method_name],
)

View File

@@ -13,7 +13,8 @@ from clan_lib.log_manager import LogManager
from ._webview_ffi import _encode_c_string, _webview_lib from ._webview_ffi import _encode_c_string, _webview_lib
if TYPE_CHECKING: if TYPE_CHECKING:
from .middleware import Middleware from clan_app.api.middleware import Middleware
from .webview_bridge import WebviewBridge from .webview_bridge import WebviewBridge
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View File

@@ -8,7 +8,8 @@ from clan_lib.api import dataclass_to_dict
from clan_lib.api.tasks import WebThread from clan_lib.api.tasks import WebThread
from clan_lib.async_run import set_should_cancel from clan_lib.async_run import set_should_cancel
from .api_bridge import ApiBridge, ApiRequest, ApiResponse from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
from .webview import FuncStatus from .webview import FuncStatus
if TYPE_CHECKING: if TYPE_CHECKING:
@@ -24,24 +25,15 @@ class WebviewBridge(ApiBridge):
webview: "Webview" webview: "Webview"
threads: dict[str, WebThread] = field(default_factory=dict) threads: dict[str, WebThread] = field(default_factory=dict)
def send_response(self, response: ApiResponse) -> None: def send_response(self, response: BackendResponse) -> None:
"""Send response back to the webview client.""" """Send response back to the webview client."""
if response.success:
serialized = json.dumps( serialized = json.dumps(
dataclass_to_dict(response.data), indent=4, ensure_ascii=False dataclass_to_dict(response), indent=4, ensure_ascii=False
) )
status = FuncStatus.SUCCESS
else:
serialized = json.dumps(
dataclass_to_dict(response.data), indent=4, ensure_ascii=False
)
status = FuncStatus.SUCCESS # Even errors are sent as SUCCESS to webview
log.debug( log.debug(f"Sending response: {serialized}")
f"Sending response for op_key {response.op_key} with status {status.name} and data: {serialized}" self.webview.return_(response._op_key, FuncStatus.SUCCESS, serialized) # noqa: SLF001
)
self.webview.return_(response.op_key, status, serialized)
def handle_webview_call( def handle_webview_call(
self, self,
@@ -77,7 +69,7 @@ class WebviewBridge(ApiBridge):
raise ValueError(msg) raise ValueError(msg)
# Create API request # Create API request
api_request = ApiRequest( api_request = BackendRequest(
method_name=method_name, args=args, header=header, op_key=op_key method_name=method_name, args=args, header=header, op_key=op_key
) )

View File

@@ -38,7 +38,6 @@ def long_blocking_task(somearg: str) -> str:
time.sleep(1) time.sleep(1)
ctx = get_async_ctx() ctx = get_async_ctx()
log.debug(f"Thread ID: {threading.get_ident()}") log.debug(f"Thread ID: {threading.get_ident()}")
for i in range(30): for i in range(30):
if is_async_cancelled(): if is_async_cancelled():
log.debug("Task was cancelled") log.debug("Task was cancelled")