Merge pull request 'clan-app: Add threaded api execution by default' (#1769) from Qubasa/clan-core:Qubasa-main into main
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
from typing import (
|
||||
Any,
|
||||
@@ -42,11 +43,6 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
|
||||
self.emit("returns", GResult(result, method_name))
|
||||
|
||||
def _signature_check(self, *args: P.args, **kwargs: P.kwargs) -> B:
|
||||
raise RuntimeError(
|
||||
"This method is only for typechecking and should never be called"
|
||||
)
|
||||
|
||||
def await_result(self, fn: Callable[["ImplFunc[..., Any]", B], None]) -> None:
|
||||
self.connect("returns", fn)
|
||||
|
||||
@@ -64,6 +60,26 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
return result
|
||||
|
||||
|
||||
class MethodExecutor(threading.Thread):
|
||||
def __init__(
|
||||
self, function: Callable[..., Any], *args: Any, **kwargs: dict[str, Any]
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.function = function
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.result: Any = None
|
||||
self.finished = False
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.result = self.function(*self.args, **self.kwargs)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
finally:
|
||||
self.finished = True
|
||||
|
||||
|
||||
class GObjApi:
|
||||
def __init__(self, methods: dict[str, Callable[..., Any]]) -> None:
|
||||
self._methods: dict[str, Callable[..., Any]] = methods
|
||||
@@ -104,23 +120,32 @@ class GObjApi:
|
||||
f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
)
|
||||
|
||||
def get_obj(self, name: str) -> type[ImplFunc]:
|
||||
result = self._obj_registry.get(name, None)
|
||||
def get_obj(self, fn_name: str) -> type[ImplFunc]:
|
||||
result = self._obj_registry.get(fn_name, None)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
plain_method = self._methods.get(name, None)
|
||||
if plain_method is None:
|
||||
raise ValueError(f"Method '{name}' not found in Api")
|
||||
plain_fn = self._methods.get(fn_name, None)
|
||||
if plain_fn is None:
|
||||
raise ValueError(f"Method '{fn_name}' not found in Api")
|
||||
|
||||
class GenericFnRuntime(ImplFunc[..., Any]):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.thread: MethodExecutor | None = None
|
||||
|
||||
def async_run(self, *args: Any, **kwargs: dict[str, Any]) -> bool:
|
||||
assert plain_method is not None
|
||||
result = plain_method(*args, **kwargs)
|
||||
self.returns(method_name=name, result=result)
|
||||
return GLib.SOURCE_REMOVE
|
||||
assert plain_fn is not None
|
||||
|
||||
if self.thread is None:
|
||||
self.thread = MethodExecutor(plain_fn, *args, **kwargs)
|
||||
self.thread.start()
|
||||
return GLib.SOURCE_CONTINUE
|
||||
elif self.thread.finished:
|
||||
result = self.thread.result
|
||||
self.returns(method_name=fn_name, result=result)
|
||||
return GLib.SOURCE_REMOVE
|
||||
else:
|
||||
return GLib.SOURCE_CONTINUE
|
||||
|
||||
return cast(type[ImplFunc], GenericFnRuntime)
|
||||
|
||||
@@ -89,12 +89,19 @@ class WebExecutor(GObject.Object):
|
||||
# Extract the data from the payload
|
||||
data = payload.get("data")
|
||||
if data is None:
|
||||
log.error(f"Method '{method_name}' has no data field. Skipping execution.")
|
||||
log.error(
|
||||
f"JS function call '{method_name}' has no data field. Skipping execution."
|
||||
)
|
||||
return
|
||||
|
||||
if data.get("op_key") is None:
|
||||
log.error(
|
||||
f"JS function call '{method_name}' has no op_key field. Skipping execution."
|
||||
)
|
||||
return
|
||||
|
||||
# Initialize dataclasses from the payload
|
||||
reconciled_arguments = {}
|
||||
|
||||
for k, v in data.items():
|
||||
# Some functions expect to be called with dataclass instances
|
||||
# But the js api returns dictionaries.
|
||||
|
||||
Reference in New Issue
Block a user