diff --git a/pkgs/clan-app/clan_app/__init__.py b/pkgs/clan-app/clan_app/__init__.py index 35976eb00..47125c580 100644 --- a/pkgs/clan-app/clan_app/__init__.py +++ b/pkgs/clan-app/clan_app/__init__.py @@ -19,6 +19,10 @@ def main(argv: list[str] = sys.argv) -> int: args = parser.parse_args(argv[1:]) app_opts = ClanAppOptions(content_uri=args.content_uri, debug=args.debug) - app_run(app_opts) + try: + app_run(app_opts) + except KeyboardInterrupt: + log.info("Keyboard interrupt received, exiting...") + return 0 return 0 diff --git a/pkgs/clan-app/clan_app/app.py b/pkgs/clan-app/clan_app/app.py index 4446f14d7..999d77c81 100644 --- a/pkgs/clan-app/clan_app/app.py +++ b/pkgs/clan-app/clan_app/app.py @@ -43,14 +43,13 @@ def app_run(app_opts: ClanAppOptions) -> int: task_id: str, *, op_key: str ) -> SuccessDataClass[None] | ErrorDataClass: """Cancel a task by its op_key.""" - log.info(f"Cancelling task with op_key: {task_id}") - with webview.lock: - if task_id in webview.threads: - future = webview.threads[task_id] - future.stop_event.set() - log.info(f"Task {task_id} cancelled.") - else: - log.warning(f"Task {task_id} not found.") + log.debug(f"Cancelling task with op_key: {task_id}") + future = webview.threads.get(task_id) + if future: + future.stop_event.set() + log.debug(f"Task {task_id} cancelled.") + else: + log.warning(f"Task {task_id} not found.") return SuccessDataClass( op_key=op_key, data=None, @@ -62,9 +61,8 @@ def app_run(app_opts: ClanAppOptions) -> int: op_key: str, ) -> SuccessDataClass[list[str]] | ErrorDataClass: """List all tasks.""" - log.info("Listing all tasks.") - with webview.lock: - tasks = list(webview.threads.keys()) + log.debug("Listing all tasks.") + tasks = list(webview.threads.keys()) return SuccessDataClass( op_key=op_key, data=tasks, diff --git a/pkgs/clan-app/clan_app/deps/webview/webview.py b/pkgs/clan-app/clan_app/deps/webview/webview.py index f47207c13..126a30fda 100644 --- a/pkgs/clan-app/clan_app/deps/webview/webview.py +++ b/pkgs/clan-app/clan_app/deps/webview/webview.py @@ -1,8 +1,8 @@ import ctypes +import functools import json import logging import threading -import time from collections.abc import Callable from dataclasses import dataclass from enum import IntEnum @@ -54,13 +54,78 @@ class Webview: self._handle = _webview_lib.webview_create(int(debug), window) self._callbacks: dict[str, Callable[..., Any]] = {} self.threads: dict[str, WebThread] = {} - self.stopped_threads: set[str] = set() - self.lock = threading.Lock() - self.stop_garbage_collection: threading.Event = threading.Event() if size: self.size = size + def api_wrapper( + self, + api: MethodRegistry, + method_name: str, + wrap_method: Callable[..., Any], + op_key_bytes: bytes, + request_data: bytes, + arg: int, + ) -> None: + op_key = op_key_bytes.decode() + args = json.loads(request_data.decode()) + log.debug(f"Calling {method_name}({args[0]})") + + # Initialize dataclasses from the payload + reconciled_arguments = {} + for k, v in args[0].items(): + # Some functions expect to be called with dataclass instances + # But the js api returns dictionaries. + # Introspect the function and create the expected dataclass from dict dynamically + # Depending on the introspected argument_type + arg_class = api.get_method_argtype(method_name, k) + + # TODO: rename from_dict into something like construct_checked_value + # from_dict really takes Anything and returns an instance of the type/class + reconciled_arguments[k] = from_dict(arg_class, v) + + reconciled_arguments["op_key"] = op_key + # TODO: We could remove the wrapper in the MethodRegistry + # and just call the method directly + + def thread_task(stop_event: threading.Event) -> None: + try: + set_should_cancel(lambda: stop_event.is_set()) + result = wrap_method(**reconciled_arguments) + + serialized = json.dumps( + dataclass_to_dict(result), indent=4, ensure_ascii=False + ) + + log.debug(f"Result for {method_name}: {serialized}") + self.return_(op_key, FuncStatus.SUCCESS, serialized) + except Exception as e: + log.exception(f"Error while handling result of {method_name}") + result = ErrorDataClass( + op_key=op_key, + status="error", + errors=[ + ApiError( + message="An internal error occured", + description=str(e), + location=["bind_jsonschema_api", method_name], + ) + ], + ) + serialized = json.dumps( + dataclass_to_dict(result), indent=4, ensure_ascii=False + ) + self.return_(op_key, FuncStatus.FAILURE, serialized) + finally: + del self.threads[op_key] + + stop_event = threading.Event() + thread = threading.Thread( + target=thread_task, args=(stop_event,), name="WebviewThread" + ) + thread.start() + self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event) + def __enter__(self) -> "Webview": return self @@ -94,102 +159,19 @@ class Webview: def navigate(self, url: str) -> None: _webview_lib.webview_navigate(self._handle, _encode_c_string(url)) - def collect_garbage(self) -> None: - while not self.stop_garbage_collection.is_set(): - with self.lock: - for op_key in list(self.threads.keys()): - if op_key in self.stopped_threads: - log.debug(f"Collecting garbage op_key: {op_key}") - del self.threads[op_key] - self.stopped_threads.remove(op_key) - if self.stop_garbage_collection.is_set(): - break - time.sleep(1) - def run(self) -> None: - thread = threading.Thread( - target=self.collect_garbage, name="WebviewGarbageCollector" - ) - thread.start() _webview_lib.webview_run(self._handle) - self.stop_garbage_collection.set() log.info("Shutting down webview...") - if self.lock.locked(): - self.lock.release() self.destroy() def bind_jsonschema_api(self, api: MethodRegistry) -> None: for name, method in api.functions.items(): - - def wrapper( - seq: bytes, - req: bytes, - arg: int, - wrap_method: Callable[..., Any] = method, - method_name: str = name, - ) -> None: - def thread_task(stop_event: threading.Event) -> None: - try: - args = json.loads(req.decode()) - - log.debug(f"Calling {method_name}({args[0]})") - # Initialize dataclasses from the payload - reconciled_arguments = {} - for k, v in args[0].items(): - # Some functions expect to be called with dataclass instances - # But the js api returns dictionaries. - # Introspect the function and create the expected dataclass from dict dynamically - # Depending on the introspected argument_type - arg_class = api.get_method_argtype(method_name, k) - - # TODO: rename from_dict into something like construct_checked_value - # from_dict really takes Anything and returns an instance of the type/class - reconciled_arguments[k] = from_dict(arg_class, v) - - op_key = seq.decode() - reconciled_arguments["op_key"] = op_key - # TODO: We could remove the wrapper in the MethodRegistry - # and just call the method directly - - set_should_cancel(lambda: stop_event.is_set()) - result = wrap_method(**reconciled_arguments) - - serialized = json.dumps( - dataclass_to_dict(result), indent=4, ensure_ascii=False - ) - - log.debug(f"Result for {method_name}: {serialized}") - self.return_(seq.decode(), FuncStatus.SUCCESS, serialized) - except Exception as e: - log.exception(f"Error while handling result of {method_name}") - result = ErrorDataClass( - op_key=seq.decode(), - status="error", - errors=[ - ApiError( - message="An internal error occured", - description=str(e), - location=["bind_jsonschema_api", method_name], - ) - ], - ) - serialized = json.dumps( - dataclass_to_dict(result), indent=4, ensure_ascii=False - ) - self.return_(seq.decode(), FuncStatus.FAILURE, serialized) - finally: - self.stopped_threads.add(seq.decode()) - - stop_event = threading.Event() - thread = threading.Thread( - target=thread_task, args=(stop_event,), name="WebviewThread" - ) - thread.start() - with self.lock: - self.threads[seq.decode()] = WebThread( - thread=thread, stop_event=stop_event - ) - + wrapper = functools.partial( + self.api_wrapper, + api, + name, + method, + ) c_callback = _webview_lib.CFUNCTYPE( None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p )(wrapper)