Merge pull request 'Add middleware interface to clan-app' (#4265) from Qubasa/clan-core:generalize_webview into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4265
This commit is contained in:
Luis Hebendanz
2025-07-08 11:16:36 +00:00
12 changed files with 577 additions and 235 deletions

View File

@@ -0,0 +1,87 @@
import logging
from abc import ABC, abstractmethod
from contextlib import ExitStack
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from .middleware import Middleware
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class BackendRequest:
method_name: str
args: dict[str, Any]
header: dict[str, Any]
op_key: str
@dataclass(frozen=True)
class BackendResponse:
body: Any
header: dict[str, Any]
_op_key: str
@dataclass
class ApiBridge(ABC):
"""Generic interface for API bridges that can handle method calls from different sources."""
middleware_chain: tuple["Middleware", ...]
@abstractmethod
def send_response(self, response: BackendResponse) -> None:
"""Send response back to the client."""
def process_request(self, request: BackendRequest) -> None:
"""Process an API request through the middleware chain."""
from .middleware import MiddlewareContext
with ExitStack() as stack:
context = MiddlewareContext(
request=request,
bridge=self,
exit_stack=stack,
)
# Process through middleware chain
for middleware in self.middleware_chain:
try:
log.debug(
f"{middleware.__class__.__name__} => {request.method_name}"
)
middleware.process(context)
except Exception as e:
# If middleware fails, handle error
self.send_error_response(
request.op_key, str(e), ["middleware_error"]
)
return
def send_error_response(
self, op_key: str, error_message: str, location: list[str]
) -> None:
"""Send an error response."""
from clan_lib.api import ApiError, ErrorDataClass
error_data = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=error_message,
location=location,
)
],
)
response = BackendResponse(
body=error_data,
header={},
_op_key=op_key,
)
self.send_response(response)

View File

@@ -4,6 +4,9 @@ from pathlib import Path
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
from clan_lib.api import ApiError, ErrorDataClass, SuccessDataClass
from clan_lib.api.directory import FileRequest
from gi.repository import Gio, GLib, Gtk

View File

@@ -0,0 +1,14 @@
"""Middleware components for the webview API bridge."""
from .argument_parsing import ArgumentParsingMiddleware
from .base import Middleware, MiddlewareContext
from .logging import LoggingMiddleware
from .method_execution import MethodExecutionMiddleware
__all__ = [
"ArgumentParsingMiddleware",
"LoggingMiddleware",
"MethodExecutionMiddleware",
"Middleware",
"MiddlewareContext",
]

View File

@@ -0,0 +1,55 @@
import logging
from dataclasses import dataclass
from clan_lib.api import MethodRegistry, from_dict
from clan_app.api.api_bridge import BackendRequest
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class ArgumentParsingMiddleware(Middleware):
"""Middleware that handles argument parsing and dataclass construction."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
try:
# Convert dictionary arguments to dataclass instances
reconciled_arguments = {}
for k, v in context.request.args.items():
if k == "op_key":
continue
# Get the expected argument type from the API
arg_class = self.api.get_method_argtype(context.request.method_name, k)
# Convert dictionary to dataclass instance
reconciled_arguments[k] = from_dict(arg_class, v)
# Add op_key to arguments
reconciled_arguments["op_key"] = context.request.op_key
# Create a new request with reconciled arguments
updated_request = BackendRequest(
method_name=context.request.method_name,
args=reconciled_arguments,
header=context.request.header,
op_key=context.request.op_key,
)
context.request = updated_request
except Exception as e:
log.exception(
f"Error while parsing arguments for {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["argument_parsing", context.request.method_name],
)
raise

View File

@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager, ExitStack
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from clan_app.api.api_bridge import ApiBridge, BackendRequest
@dataclass
class MiddlewareContext:
request: "BackendRequest"
bridge: "ApiBridge"
exit_stack: ExitStack
@dataclass(frozen=True)
class Middleware(ABC):
"""Abstract base class for middleware components."""
@abstractmethod
def process(self, context: MiddlewareContext) -> None:
"""Process the request through this middleware."""
def register_context_manager(
self, context: MiddlewareContext, cm: AbstractContextManager[Any]
) -> Any:
"""Register a context manager with the exit stack."""
return context.exit_stack.enter_context(cm)

View File

@@ -0,0 +1,99 @@
import io
import logging
import types
from dataclasses import dataclass
from typing import Any
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import RegisteredHandler, setup_logging
from clan_lib.log_manager import LogManager
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class LoggingMiddleware(Middleware):
"""Middleware that sets up logging context without executing methods."""
log_manager: LogManager
def process(self, context: MiddlewareContext) -> None:
method = context.request.method_name
try:
# Handle log group configuration
log_group: list[str] | None = context.request.header.get("logging", {}).get(
"group_path", None
)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg) # noqa: TRY301
log.warning(
f"Using log group {log_group} for {context.request.method_name} with op_key {context.request.op_key}"
)
# Create log file
log_file = self.log_manager.create_log_file(
method, op_key=context.request.op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(
f"Error while handling request header of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["header_middleware", context.request.method_name],
)
return
# Register logging context manager
class LoggingContextManager:
def __init__(self, log_file: Any) -> None:
self.log_file = log_file
self.log_f: Any = None
self.handler: RegisteredHandler | None = None
self.original_ctx: AsyncContext | None = None
def __enter__(self) -> "LoggingContextManager":
self.log_f = self.log_file.open("ab")
self.original_ctx = get_async_ctx()
# Set up async context for logging
ctx = AsyncContext(**self.original_ctx.__dict__)
ctx.stderr = self.log_f
ctx.stdout = self.log_f
set_async_ctx(ctx)
# Set up logging handler
handler_stream = io.TextIOWrapper(
self.log_f, # type: ignore[arg-type]
encoding="utf-8",
write_through=True,
line_buffering=True,
)
self.handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: types.TracebackType | None,
) -> None:
if self.handler:
self.handler.root_logger.removeHandler(self.handler.new_handler)
self.handler.new_handler.close()
if self.log_f:
self.log_f.close()
if self.original_ctx:
set_async_ctx(self.original_ctx)
# Register the logging context manager
self.register_context_manager(context, LoggingContextManager(log_file))

View File

@@ -0,0 +1,41 @@
import logging
from dataclasses import dataclass
from clan_lib.api import MethodRegistry
from clan_app.api.api_bridge import BackendResponse
from .base import Middleware, MiddlewareContext
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class MethodExecutionMiddleware(Middleware):
"""Middleware that handles actual method execution."""
api: MethodRegistry
def process(self, context: MiddlewareContext) -> None:
method = self.api.functions[context.request.method_name]
try:
# Execute the actual method
result = method(**context.request.args)
response = BackendResponse(
body=result,
header={},
_op_key=context.request.op_key,
)
context.bridge.send_response(response)
except Exception as e:
log.exception(
f"Error while handling result of {context.request.method_name}"
)
context.bridge.send_error_response(
context.request.op_key,
str(e),
["method_execution", context.request.method_name],
)

View File

@@ -1,13 +1,9 @@
import logging
from clan_cli.profiler import profile
log = logging.getLogger(__name__)
import os
from dataclasses import dataclass
from pathlib import Path
import clan_lib.machines.actions # noqa: F401
from clan_cli.profiler import profile
from clan_lib.api import API, load_in_all_api_functions, tasks
from clan_lib.custom_logger import setup_logging
from clan_lib.dirs import user_data_dir
@@ -15,8 +11,15 @@ from clan_lib.log_manager import LogGroupConfig, LogManager
from clan_lib.log_manager import api as log_manager_api
from clan_app.api.file_gtk import open_file
from clan_app.api.middleware import (
ArgumentParsingMiddleware,
LoggingMiddleware,
MethodExecutionMiddleware,
)
from clan_app.deps.webview.webview import Size, SizeHint, Webview
log = logging.getLogger(__name__)
@dataclass
class ClanAppOptions:
@@ -39,9 +42,6 @@ def app_run(app_opts: ClanAppOptions) -> int:
site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html"
content_uri = f"file://{site_index}"
webview = Webview(debug=app_opts.debug)
webview.title = "Clan App"
# Add a log group ["clans", <dynamic_name>, "machines", <dynamic_name>]
log_manager = LogManager(base_dir=user_data_dir() / "clan-app" / "logs")
clan_log_group = LogGroupConfig("clans", "Clans").add_child(
@@ -51,15 +51,23 @@ def app_run(app_opts: ClanAppOptions) -> int:
# Init LogManager global in log_manager_api module
log_manager_api.LOG_MANAGER_INSTANCE = log_manager
# Populate the API global with all functions
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview = Webview(
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
)
# Add middleware to the webview
webview.add_middleware(ArgumentParsingMiddleware(api=API))
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
webview.add_middleware(MethodExecutionMiddleware(api=API))
# Init BAKEND_THREADS global in tasks module
tasks.BAKEND_THREADS = webview.threads
# Populate the API global with all functions
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview.bind_jsonschema_api(API, log_manager=log_manager_api.LOG_MANAGER_INSTANCE)
webview.size = Size(1280, 1024, SizeHint.NONE)
webview.bind_jsonschema_api(API, log_manager=log_manager)
webview.navigate(content_uri)
webview.run()
return 0

View File

@@ -1,27 +1,22 @@
# ruff: noqa: TRY301
import functools
import io
import json
import logging
import threading
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any
from typing import TYPE_CHECKING, Any
from clan_lib.api import (
ApiError,
ErrorDataClass,
MethodRegistry,
dataclass_to_dict,
from_dict,
)
from clan_lib.api import MethodRegistry
from clan_lib.api.tasks import WebThread
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import setup_logging
from clan_lib.log_manager import LogManager
from ._webview_ffi import _encode_c_string, _webview_lib
if TYPE_CHECKING:
from clan_app.api.middleware import Middleware
from .webview_bridge import WebviewBridge
log = logging.getLogger(__name__)
@@ -37,226 +32,120 @@ class FuncStatus(IntEnum):
FAILURE = 1
@dataclass(frozen=True)
class Size:
def __init__(self, width: int, height: int, hint: SizeHint) -> None:
self.width = width
self.height = height
self.hint = hint
width: int
height: int
hint: SizeHint
@dataclass
class Webview:
def __init__(
self, debug: bool = False, size: Size | None = None, window: int | None = None
) -> None:
self._handle = _webview_lib.webview_create(int(debug), window)
self._callbacks: dict[str, Callable[..., Any]] = {}
self.threads: dict[str, WebThread] = {}
title: str
debug: bool = False
size: Size | None = None
window: int | None = None
if size:
self.size = size
# initialized later
_bridge: "WebviewBridge | None" = None
_handle: Any | None = None
_callbacks: dict[str, Callable[..., Any]] = field(default_factory=dict)
_middleware: list["Middleware"] = field(default_factory=list)
def _create_handle(self) -> None:
# Initialize the webview handle
handle = _webview_lib.webview_create(int(self.debug), self.window)
callbacks: dict[str, Callable[..., Any]] = {}
# Since we can't use object.__setattr__, we'll initialize differently
# by storing in __dict__ directly (this works for init=False fields)
self._handle = handle
self._callbacks = callbacks
if self.title:
self.set_title(self.title)
if self.size:
self.set_size(self.size)
@property
def handle(self) -> Any:
"""Get the webview handle, creating it if necessary."""
if self._handle is None:
self._create_handle()
return self._handle
@property
def bridge(self) -> "WebviewBridge":
"""Get the bridge, creating it if necessary."""
if self._bridge is None:
self.create_bridge()
assert self._bridge is not None, "Bridge should be created"
return self._bridge
def api_wrapper(
self,
log_manager: LogManager,
api: MethodRegistry,
method_name: str,
wrap_method: Callable[..., Any],
op_key_bytes: bytes,
request_data: bytes,
arg: int,
) -> None:
op_key = op_key_bytes.decode()
args = json.loads(request_data.decode())
log.debug(f"Calling {method_name}({json.dumps(args, indent=4)})")
header: dict[str, Any]
try:
# Initialize dataclasses from the payload
reconciled_arguments = {}
if len(args) == 1:
request = args[0]
header = request.get("header", {})
msg = f"Expected header to be a dict, got {type(header)}"
if not isinstance(header, dict):
raise TypeError(msg)
body = request.get("body", {})
msg = f"Expected body to be a dict, got {type(body)}"
if not isinstance(body, dict):
raise TypeError(msg)
for k, v in body.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = api.get_method_argtype(method_name, k)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
elif len(args) > 1:
msg = (
"Expected a single argument, got multiple arguments to api_wrapper"
)
raise ValueError(msg)
reconciled_arguments["op_key"] = op_key
except Exception as e:
log.exception(f"Error while parsing arguments for {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
return
def thread_task(stop_event: threading.Event) -> None:
ctx: AsyncContext = get_async_ctx()
ctx.should_cancel = lambda: stop_event.is_set()
try:
# If the API call has set log_group in metadata,
# create the log file under that group.
log_group: list[str] = header.get("logging", {}).get("group_path", None)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg)
log.warning(
f"Using log group {log_group} for {method_name} with op_key {op_key}"
)
log_file = log_manager.create_log_file(
wrap_method, op_key=op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(f"Error while handling request header of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["header_middleware", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
with log_file.open("ab") as log_f:
# Redirect all cmd.run logs to this file.
ctx.stderr = log_f
ctx.stdout = log_f
set_async_ctx(ctx)
# Add a new handler to the root logger that writes to log_f
handler_stream = io.TextIOWrapper(
log_f, encoding="utf-8", write_through=True, line_buffering=True
)
handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
try:
# Original logic: call the wrapped API method.
result = wrap_method(**reconciled_arguments)
wrapped_result = {"body": dataclass_to_dict(result), "header": {}}
# Serialize the result to JSON.
serialized = json.dumps(
dataclass_to_dict(wrapped_result), indent=4, ensure_ascii=False
)
# This log message will now also be written to log_f
# through the thread_log_handler.
log.debug(f"Result for {method_name}: {serialized}")
# Return the successful result.
self.return_(op_key, FuncStatus.SUCCESS, serialized)
except Exception as e:
log.exception(f"Error while handling result of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
finally:
# Crucial cleanup: remove the handler from the root logger.
# This stops redirecting logs for this thread to log_f and prevents
# the handler from being used after log_f is closed.
handler.root_logger.removeHandler(handler.new_handler)
# Close the handler. For a StreamHandler using a stream it doesn't
# own (log_f is managed by the 'with' statement), this typically
# flushes the stream.
handler.new_handler.close()
del self.threads[op_key]
stop_event = threading.Event()
thread = threading.Thread(
target=thread_task, args=(stop_event,), name="WebviewThread"
"""Legacy API wrapper - delegates to the bridge."""
self.bridge.handle_webview_call(
method_name=method_name,
op_key_bytes=op_key_bytes,
request_data=request_data,
arg=arg,
)
thread.start()
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
def __enter__(self) -> "Webview":
return self
@property
def size(self) -> Size:
return self._size
def threads(self) -> dict[str, WebThread]:
"""Access threads from the bridge for compatibility."""
return self.bridge.threads
@size.setter
def size(self, value: Size) -> None:
def add_middleware(self, middleware: "Middleware") -> None:
"""Add middleware to the middleware chain."""
if self._bridge is not None:
msg = "Cannot add middleware after bridge creation."
raise RuntimeError(msg)
self._middleware.append(middleware)
def create_bridge(self) -> "WebviewBridge":
"""Create and initialize the WebviewBridge with current middleware."""
from .webview_bridge import WebviewBridge
bridge = WebviewBridge(webview=self, middleware_chain=tuple(self._middleware))
self._bridge = bridge
return bridge
# Legacy methods for compatibility
def set_size(self, value: Size) -> None:
"""Set the webview size (legacy compatibility)."""
_webview_lib.webview_set_size(
self._handle, value.width, value.height, value.hint
self.handle, value.width, value.height, value.hint
)
self._size = value
@property
def title(self) -> str:
return self._title
@title.setter
def title(self, value: str) -> None:
_webview_lib.webview_set_title(self._handle, _encode_c_string(value))
self._title = value
def set_title(self, value: str) -> None:
"""Set the webview title (legacy compatibility)."""
_webview_lib.webview_set_title(self.handle, _encode_c_string(value))
def destroy(self) -> None:
"""Destroy the webview."""
for name in list(self._callbacks.keys()):
self.unbind(name)
_webview_lib.webview_terminate(self._handle)
_webview_lib.webview_destroy(self._handle)
self._handle = None
_webview_lib.webview_terminate(self.handle)
_webview_lib.webview_destroy(self.handle)
# Can't set _handle to None on frozen dataclass
def navigate(self, url: str) -> None:
_webview_lib.webview_navigate(self._handle, _encode_c_string(url))
"""Navigate to a URL."""
_webview_lib.webview_navigate(self.handle, _encode_c_string(url))
def run(self) -> None:
_webview_lib.webview_run(self._handle)
"""Run the webview."""
_webview_lib.webview_run(self.handle)
log.info("Shutting down webview...")
self.destroy()
@@ -264,8 +153,6 @@ class Webview:
for name, method in api.functions.items():
wrapper = functools.partial(
self.api_wrapper,
log_manager,
api,
name,
method,
)
@@ -277,7 +164,7 @@ class Webview:
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
self.handle, _encode_c_string(name), c_callback, None
)
def bind(self, name: str, callback: Callable[..., Any]) -> None:
@@ -293,29 +180,23 @@ class Webview:
c_callback = _webview_lib.binding_callback_t(wrapper)
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
)
_webview_lib.webview_bind(self.handle, _encode_c_string(name), c_callback, None)
def unbind(self, name: str) -> None:
if name in self._callbacks:
_webview_lib.webview_unbind(self._handle, _encode_c_string(name))
_webview_lib.webview_unbind(self.handle, _encode_c_string(name))
del self._callbacks[name]
def return_(self, seq: str, status: int, result: str) -> None:
_webview_lib.webview_return(
self._handle, _encode_c_string(seq), status, _encode_c_string(result)
self.handle, _encode_c_string(seq), status, _encode_c_string(result)
)
def eval(self, source: str) -> None:
_webview_lib.webview_eval(self._handle, _encode_c_string(source))
def init(self, source: str) -> None:
_webview_lib.webview_init(self._handle, _encode_c_string(source))
_webview_lib.webview_eval(self.handle, _encode_c_string(source))
if __name__ == "__main__":
wv = Webview()
wv.title = "Hello, World!"
wv = Webview(title="Hello, World!")
wv.navigate("https://www.google.com")
wv.run()

View File

@@ -0,0 +1,102 @@
import json
import logging
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from clan_lib.api import dataclass_to_dict
from clan_lib.api.tasks import WebThread
from clan_lib.async_run import set_should_cancel
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
from .webview import FuncStatus
if TYPE_CHECKING:
from .webview import Webview
log = logging.getLogger(__name__)
@dataclass
class WebviewBridge(ApiBridge):
"""Webview-specific implementation of the API bridge."""
webview: "Webview"
threads: dict[str, WebThread] = field(default_factory=dict)
def send_response(self, response: BackendResponse) -> None:
"""Send response back to the webview client."""
serialized = json.dumps(
dataclass_to_dict(response), indent=4, ensure_ascii=False
)
log.debug(f"Sending response: {serialized}")
self.webview.return_(response._op_key, FuncStatus.SUCCESS, serialized) # noqa: SLF001
def handle_webview_call(
self,
method_name: str,
op_key_bytes: bytes,
request_data: bytes,
arg: int,
) -> None:
"""Handle a call from webview's JavaScript bridge."""
try:
op_key = op_key_bytes.decode()
raw_args = json.loads(request_data.decode())
# Parse the webview-specific request format
header = {}
args = {}
if len(raw_args) == 1:
request = raw_args[0]
header = request.get("header", {})
if not isinstance(header, dict):
msg = f"Expected header to be a dict, got {type(header)}"
raise TypeError(msg) # noqa: TRY301
body = request.get("body", {})
if not isinstance(body, dict):
msg = f"Expected body to be a dict, got {type(body)}"
raise TypeError(msg) # noqa: TRY301
args = body
elif len(raw_args) > 1:
msg = "Expected a single argument, got multiple arguments"
raise ValueError(msg) # noqa: TRY301
# Create API request
api_request = BackendRequest(
method_name=method_name, args=args, header=header, op_key=op_key
)
except Exception as e:
msg = (
f"Error while handling webview call {method_name} with op_key {op_key}"
)
log.exception(msg)
self.send_error_response(op_key, str(e), ["webview_bridge", method_name])
return
# Process in a separate thread
def thread_task(stop_event: threading.Event) -> None:
set_should_cancel(lambda: stop_event.is_set())
try:
log.debug(
f"Calling {method_name}({json.dumps(api_request.args, indent=4)}) with header {json.dumps(api_request.header, indent=4)} and op_key {op_key}"
)
self.process_request(api_request)
finally:
self.threads.pop(op_key, None)
stop_event = threading.Event()
thread = threading.Thread(
target=thread_task, args=(stop_event,), name="WebviewThread"
)
thread.start()
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)

View File

@@ -1,8 +1,10 @@
import logging
import threading
import time
from dataclasses import dataclass
from clan_lib.api import API
from clan_lib.async_run import get_async_ctx, is_async_cancelled
log = logging.getLogger(__name__)
@@ -16,11 +18,12 @@ class WebThread:
BAKEND_THREADS: dict[str, WebThread] | None = None
@API.register_abstract
@API.register
def delete_task(task_id: str) -> None:
"""Cancel a task by its op_key."""
assert BAKEND_THREADS is not None, "Backend threads not initialized"
future = BAKEND_THREADS.get(task_id)
log.debug(f"Thread ID: {threading.get_ident()}")
if future:
future.stop_event.set()
log.debug(f"Task with id {task_id} has been cancelled.")
@@ -29,6 +32,23 @@ def delete_task(task_id: str) -> None:
raise ValueError(msg)
@API.register
def run_blocking_task(somearg: str) -> str:
"""A long blocking task that simulates a long-running operation."""
time.sleep(1)
ctx = get_async_ctx()
log.debug(f"Thread ID: {threading.get_ident()}")
for i in range(30):
if is_async_cancelled():
log.debug("Task was cancelled")
return "Task was cancelled"
log.debug(
f"Processing {i} for {somearg}. ctx.should_cancel={ctx.should_cancel()}"
)
time.sleep(1)
return f"Task completed with argument: {somearg}"
@API.register
def list_tasks() -> list[str]:
"""List all tasks."""

View File

@@ -301,7 +301,7 @@ class LogManager:
return current_config
def create_log_file(
self, func: Callable, op_key: str, group_path: list[str] | None = None
self, func: Callable | str, op_key: str, group_path: list[str] | None = None
) -> LogFile:
"""Create a new log file for the given function and operation.
@@ -339,12 +339,15 @@ class LogManager:
# Convert encoded path to string for LogFile
group_str = "/".join(encoded_group_path)
# Use function name or string directly
func_name = func.__name__ if callable(func) else func
log_file = LogFile(
op_key=op_key,
date_day=now_utc.strftime("%Y-%m-%d"),
group=group_str,
date_second=now_utc.strftime("%H-%M-%S"), # Corrected original's %H-$M-%S
func_name=func.__name__,
func_name=func_name,
_base_dir=self.base_dir,
)