clan-app: WebExecutor now mirrors jsonschema api types generically
This commit is contained in:
@@ -1,6 +1,14 @@
|
||||
import inspect
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from typing import Any, ClassVar, Generic, ParamSpec, TypeVar, cast
|
||||
from typing import (
|
||||
Any,
|
||||
ClassVar,
|
||||
Generic,
|
||||
ParamSpec,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
|
||||
from gi.repository import GLib, GObject
|
||||
|
||||
@@ -12,9 +20,8 @@ class GResult(GObject.Object):
|
||||
op_key: str
|
||||
method_name: str
|
||||
|
||||
def __init__(self, result: Any, method_name: str, op_key: str) -> None:
|
||||
def __init__(self, result: Any, method_name: str) -> None:
|
||||
super().__init__()
|
||||
self.op_key = op_key
|
||||
self.result = result
|
||||
self.method_name = method_name
|
||||
|
||||
@@ -32,9 +39,13 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
def returns(self, result: B, *, method_name: str | None = None) -> None:
|
||||
if method_name is None:
|
||||
method_name = self.__class__.__name__
|
||||
if self.op_key is None:
|
||||
raise ValueError(f"op_key is not set for the function {method_name}")
|
||||
self.emit("returns", GResult(result, method_name, self.op_key))
|
||||
|
||||
self.emit("returns", GResult(result, method_name))
|
||||
|
||||
def _signature_check(self, *args: P.args, **kwargs: P.kwargs) -> B:
|
||||
raise RuntimeError(
|
||||
"This method is only for typechecking and should never be called"
|
||||
)
|
||||
|
||||
def await_result(self, fn: Callable[["ImplFunc[..., Any]", B], None]) -> None:
|
||||
self.connect("returns", fn)
|
||||
@@ -42,8 +53,7 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
|
||||
raise NotImplementedError("Method 'async_run' must be implemented")
|
||||
|
||||
def _async_run(self, data: Any, op_key: str) -> bool:
|
||||
self.op_key = op_key
|
||||
def _async_run(self, data: Any) -> bool:
|
||||
result = GLib.SOURCE_REMOVE
|
||||
try:
|
||||
result = self.async_run(**data)
|
||||
@@ -62,33 +72,33 @@ class GObjApi:
|
||||
def overwrite_fn(self, obj: type[ImplFunc]) -> None:
|
||||
fn_name = obj.__name__
|
||||
|
||||
if not isinstance(obj, type(ImplFunc)):
|
||||
raise ValueError(f"Object '{fn_name}' is not an instance of ImplFunc")
|
||||
|
||||
if fn_name in self._obj_registry:
|
||||
raise ValueError(f"Function '{fn_name}' already registered")
|
||||
self._obj_registry[fn_name] = obj
|
||||
|
||||
def check_signature(self, method_annotations: dict[str, dict[str, Any]]) -> None:
|
||||
def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None:
|
||||
overwrite_fns = self._obj_registry
|
||||
|
||||
# iterate over the methods and check if all are implemented
|
||||
for m_name, m_annotations in method_annotations.items():
|
||||
for m_name, m_signature in fn_signatures.items():
|
||||
if m_name not in overwrite_fns:
|
||||
continue
|
||||
else:
|
||||
# check if the signature of the abstract method matches the implementation
|
||||
# abstract signature
|
||||
values = list(m_annotations.values())
|
||||
expected_signature = (tuple(values[:-1]), values[-1:][0])
|
||||
# check if the signature of the overriden method matches
|
||||
# the implementation signature
|
||||
exp_args = []
|
||||
exp_return = m_signature.return_annotation
|
||||
for param in dict(m_signature.parameters).values():
|
||||
exp_args.append(param.annotation)
|
||||
exp_signature = (tuple(exp_args), exp_return)
|
||||
|
||||
# implementation signature
|
||||
obj = dict(overwrite_fns[m_name].__dict__)
|
||||
obj_type = obj["__orig_bases__"][0]
|
||||
got_signature = obj_type.__args__
|
||||
|
||||
if expected_signature != got_signature:
|
||||
log.error(f"Expected signature: {expected_signature}")
|
||||
if exp_signature != got_signature:
|
||||
log.error(f"Expected signature: {exp_signature}")
|
||||
log.error(f"Actual signature: {got_signature}")
|
||||
raise ValueError(
|
||||
f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
|
||||
@@ -5,6 +5,7 @@ gi.require_version("Gtk", "4.0")
|
||||
|
||||
import logging
|
||||
|
||||
from clan_cli.api import ErrorDataClass, SuccessDataClass
|
||||
from clan_cli.api.directory import FileRequest
|
||||
from gi.repository import Gio, GLib, Gtk
|
||||
|
||||
@@ -15,17 +16,23 @@ log = logging.getLogger(__name__)
|
||||
|
||||
# This implements the abstract function open_file with one argument, file_request,
|
||||
# which is a FileRequest object and returns a string or None.
|
||||
class open_file(ImplFunc[[FileRequest], str | None]):
|
||||
class open_file(
|
||||
ImplFunc[[FileRequest, str], SuccessDataClass[str | None] | ErrorDataClass]
|
||||
):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def async_run(self, file_request: FileRequest) -> bool:
|
||||
def async_run(self, file_request: FileRequest, op_key: str) -> bool:
|
||||
def on_file_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
|
||||
try:
|
||||
gfile = file_dialog.open_finish(task)
|
||||
if gfile:
|
||||
selected_path = gfile.get_path()
|
||||
self.returns(selected_path)
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected file or directory: {e}")
|
||||
|
||||
@@ -34,7 +41,11 @@ class open_file(ImplFunc[[FileRequest], str | None]):
|
||||
gfile = file_dialog.select_folder_finish(task)
|
||||
if gfile:
|
||||
selected_path = gfile.get_path()
|
||||
self.returns(selected_path)
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected directory: {e}")
|
||||
|
||||
@@ -43,7 +54,11 @@ class open_file(ImplFunc[[FileRequest], str | None]):
|
||||
gfile = file_dialog.save_finish(task)
|
||||
if gfile:
|
||||
selected_path = gfile.get_path()
|
||||
self.returns(selected_path)
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected file: {e}")
|
||||
|
||||
|
||||
@@ -17,9 +17,9 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebExecutor(GObject.Object):
|
||||
def __init__(self, content_uri: str, plain_api: MethodRegistry) -> None:
|
||||
def __init__(self, content_uri: str, jschema_api: MethodRegistry) -> None:
|
||||
super().__init__()
|
||||
self.plain_api: MethodRegistry = plain_api
|
||||
self.jschema_api: MethodRegistry = jschema_api
|
||||
self.webview: WebKit.WebView = WebKit.WebView()
|
||||
|
||||
settings: WebKit.Settings = self.webview.get_settings()
|
||||
@@ -40,10 +40,10 @@ class WebExecutor(GObject.Object):
|
||||
self.webview.load_uri(content_uri)
|
||||
self.content_uri = content_uri
|
||||
|
||||
self.api: GObjApi = GObjApi(self.plain_api.functions)
|
||||
self.api: GObjApi = GObjApi(self.jschema_api.functions)
|
||||
|
||||
self.api.overwrite_fn(open_file)
|
||||
self.api.check_signature(self.plain_api.annotations)
|
||||
self.api.check_signature(self.jschema_api.signatures)
|
||||
|
||||
def on_decide_policy(
|
||||
self,
|
||||
@@ -94,30 +94,25 @@ class WebExecutor(GObject.Object):
|
||||
|
||||
# Initialize dataclasses from the payload
|
||||
reconciled_arguments = {}
|
||||
op_key = data.pop("op_key", None)
|
||||
|
||||
for k, v in data.items():
|
||||
# Some functions expect to be called with dataclass instances
|
||||
# But the js api returns dictionaries.
|
||||
# Introspect the function and create the expected dataclass from dict dynamically
|
||||
# Depending on the introspected argument_type
|
||||
arg_class = self.plain_api.get_method_argtype(method_name, k)
|
||||
arg_class = self.jschema_api.get_method_argtype(method_name, k)
|
||||
if dataclasses.is_dataclass(arg_class):
|
||||
reconciled_arguments[k] = from_dict(arg_class, v)
|
||||
else:
|
||||
reconciled_arguments[k] = v
|
||||
|
||||
GLib.idle_add(
|
||||
fn_instance._async_run,
|
||||
reconciled_arguments,
|
||||
op_key,
|
||||
)
|
||||
GLib.idle_add(fn_instance._async_run, reconciled_arguments)
|
||||
|
||||
def on_result(self, source: ImplFunc, data: GResult) -> None:
|
||||
result = dict()
|
||||
result["result"] = dataclass_to_dict(data.result)
|
||||
result["op_key"] = data.op_key
|
||||
result = dataclass_to_dict(data.result)
|
||||
serialized = json.dumps(result, indent=4)
|
||||
log.debug(f"Result for {data.method_name}: {serialized}")
|
||||
|
||||
# Use idle_add to queue the response call to js on the main GTK thread
|
||||
self.return_data_to_js(data.method_name, serialized)
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ class MainWindow(Adw.ApplicationWindow):
|
||||
|
||||
stack_view = ViewStack.use().view
|
||||
|
||||
webexec = WebExecutor(plain_api=API, content_uri=config.content_uri)
|
||||
webexec = WebExecutor(jschema_api=API, content_uri=config.content_uri)
|
||||
|
||||
stack_view.add_named(webexec.get_webview(), "webview")
|
||||
stack_view.set_visible_child_name(config.initial_view)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from functools import wraps
|
||||
from inspect import Parameter, signature
|
||||
from inspect import Parameter, Signature, signature
|
||||
from typing import Annotated, Any, Generic, Literal, TypeVar, get_type_hints
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
@@ -20,12 +20,14 @@ class ApiError:
|
||||
|
||||
@dataclass
|
||||
class SuccessDataClass(Generic[ResponseDataType]):
|
||||
op_key: str
|
||||
status: Annotated[Literal["success"], "The status of the response."]
|
||||
data: ResponseDataType
|
||||
|
||||
|
||||
@dataclass
|
||||
class ErrorDataClass:
|
||||
op_key: str
|
||||
status: Literal["error"]
|
||||
errors: list[ApiError]
|
||||
|
||||
@@ -39,7 +41,7 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
|
||||
|
||||
# Add 'op_key' parameter
|
||||
op_key_param = Parameter(
|
||||
"op_key", Parameter.KEYWORD_ONLY, default=None, annotation=str | None
|
||||
"op_key", Parameter.KEYWORD_ONLY, default=None, annotation=str
|
||||
)
|
||||
params.append(op_key_param)
|
||||
|
||||
@@ -50,26 +52,28 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
|
||||
|
||||
class MethodRegistry:
|
||||
def __init__(self) -> None:
|
||||
self._orig_annotations: dict[str, dict[str, Any]] = {}
|
||||
self._orig_signature: dict[str, Signature] = {}
|
||||
self._registry: dict[str, Callable[..., Any]] = {}
|
||||
|
||||
@property
|
||||
def annotations(self) -> dict[str, dict[str, Any]]:
|
||||
return self._orig_annotations
|
||||
def orig_signatures(self) -> dict[str, Signature]:
|
||||
return self._orig_signature
|
||||
|
||||
@property
|
||||
def signatures(self) -> dict[str, Signature]:
|
||||
return {name: signature(fn) for name, fn in self.functions.items()}
|
||||
|
||||
@property
|
||||
def functions(self) -> dict[str, Callable[..., Any]]:
|
||||
return self._registry
|
||||
|
||||
def reset(self) -> None:
|
||||
self._orig_annotations.clear()
|
||||
self._orig_signature.clear()
|
||||
self._registry.clear()
|
||||
|
||||
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
|
||||
@wraps(fn)
|
||||
def wrapper(
|
||||
*args: Any, op_key: str | None = None, **kwargs: Any
|
||||
) -> ApiResponse[T]:
|
||||
def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]:
|
||||
raise NotImplementedError(
|
||||
f"""{fn.__name__} - The platform didn't implement this function.
|
||||
|
||||
@@ -96,20 +100,19 @@ API.register(open_file)
|
||||
def register(self, fn: Callable[..., T]) -> Callable[..., T]:
|
||||
if fn.__name__ in self._registry:
|
||||
raise ValueError(f"Function {fn.__name__} already registered")
|
||||
if fn.__name__ in self._orig_annotations:
|
||||
if fn.__name__ in self._orig_signature:
|
||||
raise ValueError(f"Function {fn.__name__} already registered")
|
||||
# make copy of original function
|
||||
self._orig_annotations[fn.__name__] = fn.__annotations__.copy()
|
||||
self._orig_signature[fn.__name__] = signature(fn)
|
||||
|
||||
@wraps(fn)
|
||||
def wrapper(
|
||||
*args: Any, op_key: str | None = None, **kwargs: Any
|
||||
) -> ApiResponse[T]:
|
||||
def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]:
|
||||
try:
|
||||
data: T = fn(*args, **kwargs)
|
||||
return SuccessDataClass(status="success", data=data)
|
||||
return SuccessDataClass(status="success", data=data, op_key=op_key)
|
||||
except ClanError as e:
|
||||
return ErrorDataClass(
|
||||
op_key=op_key,
|
||||
status="error",
|
||||
errors=[
|
||||
ApiError(
|
||||
@@ -127,7 +130,7 @@ API.register(open_file)
|
||||
wrapper.__annotations__["return"] = ApiResponse[orig_return_type] # type: ignore
|
||||
|
||||
# Add additional argument for the operation key
|
||||
wrapper.__annotations__["op_key"] = str | None # type: ignore
|
||||
wrapper.__annotations__["op_key"] = str # type: ignore
|
||||
|
||||
update_wrapper_signature(wrapper, fn)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user