clan-app: Add plug and play middleware interface

This commit is contained in:
Qubasa
2025-07-08 15:45:32 +07:00
parent 2b11cc64eb
commit 389e726521
5 changed files with 151 additions and 236 deletions

View File

@@ -4,6 +4,9 @@ from pathlib import Path
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
from clan_lib.api import ApiError, ErrorDataClass, SuccessDataClass
from clan_lib.api.directory import FileRequest
from gi.repository import Gio, GLib, Gtk

View File

@@ -1,13 +1,9 @@
import logging
from clan_cli.profiler import profile
log = logging.getLogger(__name__)
import os
from dataclasses import dataclass
from pathlib import Path
import clan_lib.machines.actions # noqa: F401
from clan_cli.profiler import profile
from clan_lib.api import API, load_in_all_api_functions, tasks
from clan_lib.custom_logger import setup_logging
from clan_lib.dirs import user_data_dir
@@ -15,8 +11,15 @@ from clan_lib.log_manager import LogGroupConfig, LogManager
from clan_lib.log_manager import api as log_manager_api
from clan_app.api.file_gtk import open_file
from clan_app.deps.webview.middleware import (
ArgumentParsingMiddleware,
LoggingMiddleware,
MethodExecutionMiddleware,
)
from clan_app.deps.webview.webview import Size, SizeHint, Webview
log = logging.getLogger(__name__)
@dataclass
class ClanAppOptions:
@@ -39,9 +42,6 @@ def app_run(app_opts: ClanAppOptions) -> int:
site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html"
content_uri = f"file://{site_index}"
webview = Webview(debug=app_opts.debug)
webview.title = "Clan App"
# Add a log group ["clans", <dynamic_name>, "machines", <dynamic_name>]
log_manager = LogManager(base_dir=user_data_dir() / "clan-app" / "logs")
clan_log_group = LogGroupConfig("clans", "Clans").add_child(
@@ -51,15 +51,23 @@ def app_run(app_opts: ClanAppOptions) -> int:
# Init LogManager global in log_manager_api module
log_manager_api.LOG_MANAGER_INSTANCE = log_manager
# Populate the API global with all functions
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview = Webview(
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
)
# Add middleware to the webview
webview.add_middleware(ArgumentParsingMiddleware(api=API))
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
webview.add_middleware(MethodExecutionMiddleware(api=API))
# Init BAKEND_THREADS global in tasks module
tasks.BAKEND_THREADS = webview.threads
# Populate the API global with all functions
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview.bind_jsonschema_api(API, log_manager=log_manager_api.LOG_MANAGER_INSTANCE)
webview.size = Size(1280, 1024, SizeHint.NONE)
webview.bind_jsonschema_api(API, log_manager=log_manager)
webview.navigate(content_uri)
webview.run()
return 0

View File

@@ -1,27 +1,21 @@
# ruff: noqa: TRY301
import functools
import io
import json
import logging
import threading
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any
from typing import TYPE_CHECKING, Any
from clan_lib.api import (
ApiError,
ErrorDataClass,
MethodRegistry,
dataclass_to_dict,
from_dict,
)
from clan_lib.api import MethodRegistry
from clan_lib.api.tasks import WebThread
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import setup_logging
from clan_lib.log_manager import LogManager
from ._webview_ffi import _encode_c_string, _webview_lib
if TYPE_CHECKING:
from .middleware import Middleware
from .webview_bridge import WebviewBridge
log = logging.getLogger(__name__)
@@ -37,226 +31,120 @@ class FuncStatus(IntEnum):
FAILURE = 1
@dataclass(frozen=True)
class Size:
def __init__(self, width: int, height: int, hint: SizeHint) -> None:
self.width = width
self.height = height
self.hint = hint
width: int
height: int
hint: SizeHint
@dataclass
class Webview:
def __init__(
self, debug: bool = False, size: Size | None = None, window: int | None = None
) -> None:
self._handle = _webview_lib.webview_create(int(debug), window)
self._callbacks: dict[str, Callable[..., Any]] = {}
self.threads: dict[str, WebThread] = {}
title: str
debug: bool = False
size: Size | None = None
window: int | None = None
if size:
self.size = size
# initialized later
_bridge: "WebviewBridge | None" = None
_handle: Any | None = None
_callbacks: dict[str, Callable[..., Any]] = field(default_factory=dict)
_middleware: list["Middleware"] = field(default_factory=list)
def _create_handle(self) -> None:
# Initialize the webview handle
handle = _webview_lib.webview_create(int(self.debug), self.window)
callbacks: dict[str, Callable[..., Any]] = {}
# Since we can't use object.__setattr__, we'll initialize differently
# by storing in __dict__ directly (this works for init=False fields)
self._handle = handle
self._callbacks = callbacks
if self.title:
self.set_title(self.title)
if self.size:
self.set_size(self.size)
@property
def handle(self) -> Any:
"""Get the webview handle, creating it if necessary."""
if self._handle is None:
self._create_handle()
return self._handle
@property
def bridge(self) -> "WebviewBridge":
"""Get the bridge, creating it if necessary."""
if self._bridge is None:
self.create_bridge()
assert self._bridge is not None, "Bridge should be created"
return self._bridge
def api_wrapper(
self,
log_manager: LogManager,
api: MethodRegistry,
method_name: str,
wrap_method: Callable[..., Any],
op_key_bytes: bytes,
request_data: bytes,
arg: int,
) -> None:
op_key = op_key_bytes.decode()
args = json.loads(request_data.decode())
log.debug(f"Calling {method_name}({json.dumps(args, indent=4)})")
header: dict[str, Any]
try:
# Initialize dataclasses from the payload
reconciled_arguments = {}
if len(args) == 1:
request = args[0]
header = request.get("header", {})
msg = f"Expected header to be a dict, got {type(header)}"
if not isinstance(header, dict):
raise TypeError(msg)
body = request.get("body", {})
msg = f"Expected body to be a dict, got {type(body)}"
if not isinstance(body, dict):
raise TypeError(msg)
for k, v in body.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = api.get_method_argtype(method_name, k)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
elif len(args) > 1:
msg = (
"Expected a single argument, got multiple arguments to api_wrapper"
)
raise ValueError(msg)
reconciled_arguments["op_key"] = op_key
except Exception as e:
log.exception(f"Error while parsing arguments for {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
return
def thread_task(stop_event: threading.Event) -> None:
ctx: AsyncContext = get_async_ctx()
ctx.should_cancel = lambda: stop_event.is_set()
try:
# If the API call has set log_group in metadata,
# create the log file under that group.
log_group: list[str] = header.get("logging", {}).get("group_path", None)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg)
log.warning(
f"Using log group {log_group} for {method_name} with op_key {op_key}"
)
log_file = log_manager.create_log_file(
wrap_method, op_key=op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(f"Error while handling request header of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["header_middleware", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
with log_file.open("ab") as log_f:
# Redirect all cmd.run logs to this file.
ctx.stderr = log_f
ctx.stdout = log_f
set_async_ctx(ctx)
# Add a new handler to the root logger that writes to log_f
handler_stream = io.TextIOWrapper(
log_f, encoding="utf-8", write_through=True, line_buffering=True
)
handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
try:
# Original logic: call the wrapped API method.
result = wrap_method(**reconciled_arguments)
wrapped_result = {"body": dataclass_to_dict(result), "header": {}}
# Serialize the result to JSON.
serialized = json.dumps(
dataclass_to_dict(wrapped_result), indent=4, ensure_ascii=False
)
# This log message will now also be written to log_f
# through the thread_log_handler.
log.debug(f"Result for {method_name}: {serialized}")
# Return the successful result.
self.return_(op_key, FuncStatus.SUCCESS, serialized)
except Exception as e:
log.exception(f"Error while handling result of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
finally:
# Crucial cleanup: remove the handler from the root logger.
# This stops redirecting logs for this thread to log_f and prevents
# the handler from being used after log_f is closed.
handler.root_logger.removeHandler(handler.new_handler)
# Close the handler. For a StreamHandler using a stream it doesn't
# own (log_f is managed by the 'with' statement), this typically
# flushes the stream.
handler.new_handler.close()
del self.threads[op_key]
stop_event = threading.Event()
thread = threading.Thread(
target=thread_task, args=(stop_event,), name="WebviewThread"
"""Legacy API wrapper - delegates to the bridge."""
self.bridge.handle_webview_call(
method_name=method_name,
op_key_bytes=op_key_bytes,
request_data=request_data,
arg=arg,
)
thread.start()
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
def __enter__(self) -> "Webview":
return self
@property
def size(self) -> Size:
return self._size
def threads(self) -> dict[str, WebThread]:
"""Access threads from the bridge for compatibility."""
return self.bridge.threads
@size.setter
def size(self, value: Size) -> None:
def add_middleware(self, middleware: "Middleware") -> None:
"""Add middleware to the middleware chain."""
if self._bridge is not None:
msg = "Cannot add middleware after bridge creation."
raise RuntimeError(msg)
self._middleware.append(middleware)
def create_bridge(self) -> "WebviewBridge":
"""Create and initialize the WebviewBridge with current middleware."""
from .webview_bridge import WebviewBridge
bridge = WebviewBridge(webview=self, middleware_chain=tuple(self._middleware))
self._bridge = bridge
return bridge
# Legacy methods for compatibility
def set_size(self, value: Size) -> None:
"""Set the webview size (legacy compatibility)."""
_webview_lib.webview_set_size(
self._handle, value.width, value.height, value.hint
self.handle, value.width, value.height, value.hint
)
self._size = value
@property
def title(self) -> str:
return self._title
@title.setter
def title(self, value: str) -> None:
_webview_lib.webview_set_title(self._handle, _encode_c_string(value))
self._title = value
def set_title(self, value: str) -> None:
"""Set the webview title (legacy compatibility)."""
_webview_lib.webview_set_title(self.handle, _encode_c_string(value))
def destroy(self) -> None:
"""Destroy the webview."""
for name in list(self._callbacks.keys()):
self.unbind(name)
_webview_lib.webview_terminate(self._handle)
_webview_lib.webview_destroy(self._handle)
self._handle = None
_webview_lib.webview_terminate(self.handle)
_webview_lib.webview_destroy(self.handle)
# Can't set _handle to None on frozen dataclass
def navigate(self, url: str) -> None:
_webview_lib.webview_navigate(self._handle, _encode_c_string(url))
"""Navigate to a URL."""
_webview_lib.webview_navigate(self.handle, _encode_c_string(url))
def run(self) -> None:
_webview_lib.webview_run(self._handle)
"""Run the webview."""
_webview_lib.webview_run(self.handle)
log.info("Shutting down webview...")
self.destroy()
@@ -264,8 +152,6 @@ class Webview:
for name, method in api.functions.items():
wrapper = functools.partial(
self.api_wrapper,
log_manager,
api,
name,
method,
)
@@ -277,7 +163,7 @@ class Webview:
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
self.handle, _encode_c_string(name), c_callback, None
)
def bind(self, name: str, callback: Callable[..., Any]) -> None:
@@ -293,29 +179,23 @@ class Webview:
c_callback = _webview_lib.binding_callback_t(wrapper)
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
)
_webview_lib.webview_bind(self.handle, _encode_c_string(name), c_callback, None)
def unbind(self, name: str) -> None:
if name in self._callbacks:
_webview_lib.webview_unbind(self._handle, _encode_c_string(name))
_webview_lib.webview_unbind(self.handle, _encode_c_string(name))
del self._callbacks[name]
def return_(self, seq: str, status: int, result: str) -> None:
_webview_lib.webview_return(
self._handle, _encode_c_string(seq), status, _encode_c_string(result)
self.handle, _encode_c_string(seq), status, _encode_c_string(result)
)
def eval(self, source: str) -> None:
_webview_lib.webview_eval(self._handle, _encode_c_string(source))
def init(self, source: str) -> None:
_webview_lib.webview_init(self._handle, _encode_c_string(source))
_webview_lib.webview_eval(self.handle, _encode_c_string(source))
if __name__ == "__main__":
wv = Webview()
wv.title = "Hello, World!"
wv = Webview(title="Hello, World!")
wv.navigate("https://www.google.com")
wv.run()