clan-app: working file dialogue
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
|
||||
@@ -5,14 +6,7 @@ from clan_cli.profiler import profile
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
import argparse
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.custom_logger import setup_logging
|
||||
|
||||
from clan_app.deps.webview.webview import Size, SizeHint, Webview
|
||||
from clan_app.app import ClanAppOptions, app_run
|
||||
|
||||
|
||||
@profile
|
||||
@@ -24,24 +18,7 @@ def main(argv: list[str] = sys.argv) -> int:
|
||||
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
|
||||
args = parser.parse_args(argv[1:])
|
||||
|
||||
if args.debug:
|
||||
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
|
||||
setup_logging(logging.DEBUG, root_log_name="clan_cli")
|
||||
else:
|
||||
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
|
||||
app_opts = ClanAppOptions(content_uri=args.content_uri, debug=args.debug)
|
||||
app_run(app_opts)
|
||||
|
||||
log.debug("Debug mode enabled")
|
||||
|
||||
if args.content_uri:
|
||||
content_uri = args.content_uri
|
||||
else:
|
||||
site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html"
|
||||
content_uri = f"file://{site_index}"
|
||||
|
||||
webview = Webview(debug=args.debug)
|
||||
webview.bind_jsonschema_api(API)
|
||||
|
||||
webview.size = Size(1280, 1024, SizeHint.NONE)
|
||||
webview.navigate(content_uri)
|
||||
webview.run()
|
||||
return 0
|
||||
|
||||
@@ -1,158 +0,0 @@
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
from typing import (
|
||||
Any,
|
||||
ClassVar,
|
||||
Generic,
|
||||
ParamSpec,
|
||||
TypeVar,
|
||||
cast,
|
||||
)
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
from gi.repository import GLib, GObject
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GResult(GObject.Object):
|
||||
result: Any
|
||||
op_key: str
|
||||
method_name: str
|
||||
|
||||
def __init__(self, result: Any, method_name: str) -> None:
|
||||
super().__init__()
|
||||
self.result = result
|
||||
self.method_name = method_name
|
||||
|
||||
|
||||
B = TypeVar("B")
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
op_key: str | None = None
|
||||
__gsignals__: ClassVar = {
|
||||
"returns": (GObject.SignalFlags.RUN_FIRST, None, [GResult]),
|
||||
}
|
||||
|
||||
def returns(self, result: B, *, method_name: str | None = None) -> None:
|
||||
if method_name is None:
|
||||
method_name = self.__class__.__name__
|
||||
|
||||
self.emit("returns", GResult(result, method_name))
|
||||
|
||||
def await_result(self, fn: Callable[["ImplFunc[..., Any]", B], None]) -> None:
|
||||
self.connect("returns", fn)
|
||||
|
||||
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
|
||||
msg = "Method 'async_run' must be implemented"
|
||||
raise NotImplementedError(msg)
|
||||
|
||||
def internal_async_run(self, data: Any) -> bool:
|
||||
result = GLib.SOURCE_REMOVE
|
||||
try:
|
||||
result = self.async_run(**data)
|
||||
except Exception:
|
||||
log.exception("Error in async_run")
|
||||
# TODO: send error to js
|
||||
return result
|
||||
|
||||
|
||||
# TODO: Reimplement this such that it uses a multiprocessing.Array of type ctypes.c_char
|
||||
# all fn arguments are serialized to json and passed to the new process over the Array
|
||||
# the new process deserializes the json and calls the function
|
||||
# the result is serialized to json and passed back to the main process over another Array
|
||||
class MethodExecutor(threading.Thread):
|
||||
def __init__(
|
||||
self, function: Callable[..., Any], *args: Any, **kwargs: dict[str, Any]
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.function = function
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.result: Any = None
|
||||
self.finished = False
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
self.result = self.function(*self.args, **self.kwargs)
|
||||
except Exception:
|
||||
log.exception("Error in MethodExecutor")
|
||||
finally:
|
||||
self.finished = True
|
||||
|
||||
|
||||
class GObjApi:
|
||||
def __init__(self, methods: dict[str, Callable[..., Any]]) -> None:
|
||||
self._methods: dict[str, Callable[..., Any]] = methods
|
||||
self._obj_registry: dict[str, type[ImplFunc]] = {}
|
||||
|
||||
def overwrite_fn(self, obj: type[ImplFunc]) -> None:
|
||||
fn_name = obj.__name__
|
||||
|
||||
if fn_name in self._obj_registry:
|
||||
msg = f"Function '{fn_name}' already registered"
|
||||
raise ClanError(msg)
|
||||
self._obj_registry[fn_name] = obj
|
||||
|
||||
def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None:
|
||||
overwrite_fns = self._obj_registry
|
||||
|
||||
# iterate over the methods and check if all are implemented
|
||||
for m_name, m_signature in fn_signatures.items():
|
||||
if m_name not in overwrite_fns:
|
||||
continue
|
||||
# check if the signature of the overridden method matches
|
||||
# the implementation signature
|
||||
exp_args = []
|
||||
exp_return = m_signature.return_annotation
|
||||
for param in dict(m_signature.parameters).values():
|
||||
exp_args.append(param.annotation)
|
||||
exp_signature = (tuple(exp_args), exp_return)
|
||||
|
||||
# implementation signature
|
||||
obj = dict(overwrite_fns[m_name].__dict__)
|
||||
obj_type = obj["__orig_bases__"][0]
|
||||
got_signature = obj_type.__args__
|
||||
|
||||
if exp_signature != got_signature:
|
||||
log.error(f"Expected signature: {exp_signature}")
|
||||
log.error(f"Actual signature: {got_signature}")
|
||||
msg = f"Overwritten method '{m_name}' has different signature than the implementation"
|
||||
raise ClanError(msg)
|
||||
|
||||
def has_obj(self, fn_name: str) -> bool:
|
||||
return fn_name in self._obj_registry or fn_name in self._methods
|
||||
|
||||
def get_obj(self, fn_name: str) -> type[ImplFunc]:
|
||||
result = self._obj_registry.get(fn_name, None)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
plain_fn = self._methods.get(fn_name, None)
|
||||
if plain_fn is None:
|
||||
msg = f"Method '{fn_name}' not found in Api"
|
||||
raise ClanError(msg)
|
||||
|
||||
class GenericFnRuntime(ImplFunc[..., Any]):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.thread: MethodExecutor | None = None
|
||||
|
||||
def async_run(self, *args: Any, **kwargs: dict[str, Any]) -> bool:
|
||||
assert plain_fn is not None
|
||||
|
||||
if self.thread is None:
|
||||
self.thread = MethodExecutor(plain_fn, *args, **kwargs)
|
||||
self.thread.start()
|
||||
return GLib.SOURCE_CONTINUE
|
||||
if self.thread.finished:
|
||||
result = self.thread.result
|
||||
self.returns(method_name=fn_name, result=result)
|
||||
return GLib.SOURCE_REMOVE
|
||||
return GLib.SOURCE_CONTINUE
|
||||
|
||||
return cast(type[ImplFunc], GenericFnRuntime)
|
||||
|
||||
@@ -1,146 +1,95 @@
|
||||
# ruff: noqa: N801
|
||||
import gi
|
||||
|
||||
gi.require_version("Gtk", "4.0")
|
||||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from tkinter import Tk, filedialog
|
||||
|
||||
from clan_cli.api import ErrorDataClass, SuccessDataClass
|
||||
from clan_cli.api.directory import FileRequest
|
||||
from gi.repository import Gio, GLib, Gtk
|
||||
|
||||
from clan_app.api import ImplFunc
|
||||
from clan_cli.api import ApiError, ErrorDataClass, SuccessDataClass
|
||||
from clan_cli.api.directory import FileFilter, FileRequest
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def remove_none(_list: list) -> list:
|
||||
return [i for i in _list if i is not None]
|
||||
def _apply_filters(filters: FileFilter | None) -> list[tuple[str, str]]:
|
||||
if not filters:
|
||||
return []
|
||||
|
||||
filter_patterns = []
|
||||
|
||||
if filters.mime_types:
|
||||
# Tkinter does not directly support MIME types, so this section can be adjusted
|
||||
# if you wish to handle them differently
|
||||
filter_patterns.extend(filters.mime_types)
|
||||
|
||||
if filters.patterns:
|
||||
filter_patterns.extend(filters.patterns)
|
||||
|
||||
if filters.suffixes:
|
||||
suffix_patterns = [f"*.{suffix}" for suffix in filters.suffixes]
|
||||
filter_patterns.extend(suffix_patterns)
|
||||
|
||||
filter_title = filters.title if filters.title else "Custom Files"
|
||||
|
||||
return [(filter_title, " ".join(filter_patterns))]
|
||||
|
||||
|
||||
# This implements the abstract function open_file with one argument, file_request,
|
||||
# which is a FileRequest object and returns a string or None.
|
||||
class open_file(
|
||||
ImplFunc[[FileRequest, str], SuccessDataClass[list[str] | None] | ErrorDataClass]
|
||||
):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def async_run(self, file_request: FileRequest, op_key: str) -> bool:
|
||||
def on_file_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
|
||||
def open_file(
|
||||
file_request: FileRequest, *, op_key: str
|
||||
) -> SuccessDataClass[list[str] | None] | ErrorDataClass:
|
||||
try:
|
||||
gfile = file_dialog.open_finish(task)
|
||||
if gfile:
|
||||
selected_path = remove_none([gfile.get_path()])
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
root = Tk()
|
||||
root.withdraw() # Hide the main window
|
||||
root.attributes("-topmost", True) # Bring the dialogs to the front
|
||||
|
||||
file_paths: list[str] | None = None
|
||||
|
||||
if file_request.mode == "open_file":
|
||||
file_path = filedialog.askopenfilename(
|
||||
title=file_request.title,
|
||||
initialdir=file_request.initial_folder,
|
||||
initialfile=file_request.initial_file,
|
||||
filetypes=_apply_filters(file_request.filters),
|
||||
)
|
||||
file_paths = [file_path]
|
||||
elif file_request.mode == "select_folder":
|
||||
file_path = filedialog.askdirectory(
|
||||
title=file_request.title, initialdir=file_request.initial_folder
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected file or directory: {e}")
|
||||
|
||||
def on_file_select_multiple(
|
||||
file_dialog: Gtk.FileDialog, task: Gio.Task
|
||||
) -> None:
|
||||
try:
|
||||
gfiles: Any = file_dialog.open_multiple_finish(task)
|
||||
if gfiles:
|
||||
selected_paths = remove_none([gfile.get_path() for gfile in gfiles])
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_paths, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected files: {e}")
|
||||
|
||||
def on_folder_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
|
||||
try:
|
||||
gfile = file_dialog.select_folder_finish(task)
|
||||
if gfile:
|
||||
selected_path = remove_none([gfile.get_path()])
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected directory: {e}")
|
||||
|
||||
def on_save_finish(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
|
||||
try:
|
||||
gfile = file_dialog.save_finish(task)
|
||||
if gfile:
|
||||
selected_path = remove_none([gfile.get_path()])
|
||||
self.returns(
|
||||
SuccessDataClass(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected file: {e}")
|
||||
|
||||
dialog = Gtk.FileDialog()
|
||||
|
||||
if file_request.title:
|
||||
dialog.set_title(file_request.title)
|
||||
|
||||
if file_request.filters:
|
||||
filters = Gio.ListStore.new(Gtk.FileFilter)
|
||||
file_filters = Gtk.FileFilter()
|
||||
|
||||
if file_request.filters.title:
|
||||
file_filters.set_name(file_request.filters.title)
|
||||
|
||||
if file_request.filters.mime_types:
|
||||
for mime in file_request.filters.mime_types:
|
||||
file_filters.add_mime_type(mime)
|
||||
filters.append(file_filters)
|
||||
|
||||
if file_request.filters.patterns:
|
||||
for pattern in file_request.filters.patterns:
|
||||
file_filters.add_pattern(pattern)
|
||||
|
||||
if file_request.filters.suffixes:
|
||||
for suffix in file_request.filters.suffixes:
|
||||
file_filters.add_suffix(suffix)
|
||||
|
||||
filters.append(file_filters)
|
||||
dialog.set_filters(filters)
|
||||
|
||||
if file_request.initial_file:
|
||||
p = Path(file_request.initial_file).expanduser()
|
||||
f = Gio.File.new_for_path(str(p))
|
||||
dialog.set_initial_file(f)
|
||||
|
||||
if file_request.initial_folder:
|
||||
p = Path(file_request.initial_folder).expanduser()
|
||||
f = Gio.File.new_for_path(str(p))
|
||||
dialog.set_initial_folder(f)
|
||||
|
||||
# if select_folder
|
||||
if file_request.mode == "select_folder":
|
||||
dialog.select_folder(callback=on_folder_select)
|
||||
if file_request.mode == "open_multiple_files":
|
||||
dialog.open_multiple(callback=on_file_select_multiple)
|
||||
elif file_request.mode == "open_file":
|
||||
dialog.open(callback=on_file_select)
|
||||
file_paths = [file_path]
|
||||
elif file_request.mode == "save":
|
||||
dialog.save(callback=on_save_finish)
|
||||
file_path = filedialog.asksaveasfilename(
|
||||
title=file_request.title,
|
||||
initialdir=file_request.initial_folder,
|
||||
initialfile=file_request.initial_file,
|
||||
filetypes=_apply_filters(file_request.filters),
|
||||
)
|
||||
file_paths = [file_path]
|
||||
elif file_request.mode == "open_multiple_files":
|
||||
file_paths = list(
|
||||
filedialog.askopenfilenames(
|
||||
title=file_request.title,
|
||||
initialdir=file_request.initial_folder,
|
||||
filetypes=_apply_filters(file_request.filters),
|
||||
)
|
||||
)
|
||||
|
||||
return GLib.SOURCE_REMOVE
|
||||
if not file_paths:
|
||||
msg = "No file selected or operation canceled by the user"
|
||||
raise ValueError(msg) # noqa: TRY301
|
||||
|
||||
return SuccessDataClass(op_key, status="success", data=file_paths)
|
||||
|
||||
except Exception as e:
|
||||
log.exception("Error opening file")
|
||||
return ErrorDataClass(
|
||||
op_key=op_key,
|
||||
status="error",
|
||||
errors=[
|
||||
ApiError(
|
||||
message=e.__class__.__name__,
|
||||
description=str(e),
|
||||
location=["open_file"],
|
||||
)
|
||||
],
|
||||
)
|
||||
finally:
|
||||
root.destroy()
|
||||
|
||||
48
pkgs/clan-app/clan_app/app.py
Normal file
48
pkgs/clan-app/clan_app/app.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import logging
|
||||
|
||||
from clan_cli.profiler import profile
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.api import API
|
||||
from clan_cli.custom_logger import setup_logging
|
||||
|
||||
from clan_app.api.file import open_file
|
||||
from clan_app.deps.webview.webview import Size, SizeHint, Webview
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClanAppOptions:
|
||||
content_uri: str
|
||||
debug: bool
|
||||
|
||||
|
||||
@profile
|
||||
def app_run(app_opts: ClanAppOptions) -> int:
|
||||
if app_opts.debug:
|
||||
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
|
||||
setup_logging(logging.DEBUG, root_log_name="clan_cli")
|
||||
else:
|
||||
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
|
||||
|
||||
log.debug("Debug mode enabled")
|
||||
|
||||
if app_opts.content_uri:
|
||||
content_uri = app_opts.content_uri
|
||||
else:
|
||||
site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html"
|
||||
content_uri = f"file://{site_index}"
|
||||
|
||||
webview = Webview(debug=app_opts.debug)
|
||||
|
||||
API.overwrite_fn(open_file)
|
||||
webview.bind_jsonschema_api(API)
|
||||
webview.size = Size(1280, 1024, SizeHint.NONE)
|
||||
webview.navigate(content_uri)
|
||||
webview.run()
|
||||
return 0
|
||||
@@ -69,6 +69,7 @@ class _WebviewLibrary:
|
||||
except Exception as e:
|
||||
print(f"Failed to load webview library: {e}")
|
||||
raise
|
||||
|
||||
# Define FFI functions
|
||||
self.webview_create = self.lib.webview_create
|
||||
self.webview_create.argtypes = [c_int, c_void_p]
|
||||
|
||||
@@ -1,211 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
from clan_cli.api import MethodRegistry, dataclass_to_dict, from_dict
|
||||
|
||||
from clan_app.api import GObjApi, GResult, ImplFunc
|
||||
from clan_app.api.file import open_file
|
||||
|
||||
gi.require_version("WebKit", "6.0")
|
||||
from gi.repository import Gio, GLib, GObject, WebKit
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WebExecutor(GObject.Object):
|
||||
def __init__(self, content_uri: str, jschema_api: MethodRegistry) -> None:
|
||||
super().__init__()
|
||||
self.jschema_api: MethodRegistry = jschema_api
|
||||
self.webview: WebKit.WebView = WebKit.WebView()
|
||||
|
||||
settings: WebKit.Settings = self.webview.get_settings()
|
||||
# settings.
|
||||
settings.set_property("enable-developer-extras", True)
|
||||
self.webview.set_settings(settings)
|
||||
# FIXME: This filtering is incomplete, it only triggers if a user clicks a link
|
||||
self.webview.connect("decide-policy", self.on_decide_policy)
|
||||
# For when the page is fully loaded
|
||||
self.webview.connect("load-changed", self.on_load_changed)
|
||||
self.manager: WebKit.UserContentManager = (
|
||||
self.webview.get_user_content_manager()
|
||||
)
|
||||
# Can be called with: window.webkit.messageHandlers.gtk.postMessage("...")
|
||||
# Important: it seems postMessage must be given some payload, otherwise it won't trigger the event
|
||||
self.manager.register_script_message_handler("gtk")
|
||||
self.manager.connect("script-message-received", self.on_message_received)
|
||||
|
||||
self.webview.load_uri(content_uri)
|
||||
self.content_uri = content_uri
|
||||
|
||||
self.api: GObjApi = GObjApi(self.jschema_api.functions)
|
||||
|
||||
self.api.overwrite_fn(open_file)
|
||||
self.api.check_signature(self.jschema_api.signatures)
|
||||
|
||||
def on_load_changed(
|
||||
self, webview: WebKit.WebView, load_event: WebKit.LoadEvent
|
||||
) -> None:
|
||||
if load_event == WebKit.LoadEvent.FINISHED:
|
||||
if log.isEnabledFor(logging.DEBUG):
|
||||
pass
|
||||
# inspector = webview.get_inspector()
|
||||
# inspector.show()
|
||||
|
||||
def on_decide_policy(
|
||||
self,
|
||||
webview: WebKit.WebView,
|
||||
decision: WebKit.NavigationPolicyDecision,
|
||||
decision_type: WebKit.PolicyDecisionType,
|
||||
) -> bool:
|
||||
if decision_type != WebKit.PolicyDecisionType.NAVIGATION_ACTION:
|
||||
return False # Continue with the default handler
|
||||
|
||||
navigation_action: WebKit.NavigationAction = decision.get_navigation_action()
|
||||
request: WebKit.URIRequest = navigation_action.get_request()
|
||||
uri = request.get_uri()
|
||||
if self.content_uri.startswith("http://") and uri.startswith(self.content_uri):
|
||||
log.debug(f"Allow navigation request: {uri}")
|
||||
return False
|
||||
if self.content_uri.startswith("file://") and uri.startswith(self.content_uri):
|
||||
log.debug(f"Allow navigation request: {uri}")
|
||||
return False
|
||||
log.warning(
|
||||
f"Do not allow navigation request: {uri}. Current content uri: {self.content_uri}"
|
||||
)
|
||||
decision.ignore()
|
||||
return True # Stop other handlers from being invoked
|
||||
|
||||
def on_message_received(
|
||||
self, user_content_manager: WebKit.UserContentManager, message: Any
|
||||
) -> None:
|
||||
json_msg = message.to_json(4) # 4 is num of indents
|
||||
log.debug(f"Webview Request: {json_msg}")
|
||||
payload = json.loads(json_msg)
|
||||
method_name = payload["method"]
|
||||
data = payload.get("data")
|
||||
|
||||
# Get the function gobject from the api
|
||||
if not self.api.has_obj(method_name):
|
||||
self.return_data_to_js(
|
||||
method_name,
|
||||
json.dumps(
|
||||
{
|
||||
"op_key": data["op_key"],
|
||||
"status": "error",
|
||||
"errors": [
|
||||
{
|
||||
"message": "Internal API Error",
|
||||
"description": f"Function '{method_name}' not found",
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
)
|
||||
return
|
||||
|
||||
function_obj = self.api.get_obj(method_name)
|
||||
|
||||
# Create an instance of the function gobject
|
||||
fn_instance = function_obj()
|
||||
fn_instance.await_result(self.on_result)
|
||||
|
||||
# Extract the data from the payload
|
||||
if data is None:
|
||||
log.error(
|
||||
f"JS function call '{method_name}' has no data field. Skipping execution."
|
||||
)
|
||||
return
|
||||
|
||||
if data.get("op_key") is None:
|
||||
log.error(
|
||||
f"JS function call '{method_name}' has no op_key field. Skipping execution."
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
# Initialize dataclasses from the payload
|
||||
reconciled_arguments = {}
|
||||
for k, v in data.items():
|
||||
# Some functions expect to be called with dataclass instances
|
||||
# But the js api returns dictionaries.
|
||||
# Introspect the function and create the expected dataclass from dict dynamically
|
||||
# Depending on the introspected argument_type
|
||||
arg_class = self.jschema_api.get_method_argtype(method_name, k)
|
||||
|
||||
# TODO: rename from_dict into something like construct_checked_value
|
||||
# from_dict really takes Anything and returns an instance of the type/class
|
||||
reconciled_arguments[k] = from_dict(arg_class, v)
|
||||
|
||||
GLib.idle_add(fn_instance.internal_async_run, reconciled_arguments)
|
||||
except Exception as e:
|
||||
self.return_data_to_js(
|
||||
method_name,
|
||||
json.dumps(
|
||||
{
|
||||
"op_key": data["op_key"],
|
||||
"status": "error",
|
||||
"errors": [
|
||||
{
|
||||
"message": "Internal API Error",
|
||||
"description": traceback.format_exception(e),
|
||||
}
|
||||
],
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
def on_result(self, source: ImplFunc, data: GResult) -> None:
|
||||
result = dataclass_to_dict(data.result)
|
||||
# Important:
|
||||
# 2. ensure_ascii = False. non-ASCII characters are correctly handled, instead of being escaped.
|
||||
try:
|
||||
serialized = json.dumps(result, indent=4, ensure_ascii=False)
|
||||
except TypeError:
|
||||
log.exception(f"Error serializing result for {data.method_name}")
|
||||
raise
|
||||
log.debug(f"Result for {data.method_name}: {serialized}")
|
||||
|
||||
# Use idle_add to queue the response call to js on the main GTK thread
|
||||
self.return_data_to_js(data.method_name, serialized)
|
||||
|
||||
def return_data_to_js(self, method_name: str, serialized: str) -> bool:
|
||||
js = f"""
|
||||
window.clan.{method_name}({serialized});
|
||||
"""
|
||||
|
||||
def dump_failed_code() -> None:
|
||||
tmp_file = Path("/tmp/clan-pyjs-bridge-error.js")
|
||||
with tmp_file.open("w") as f:
|
||||
f.write(js)
|
||||
log.debug(f"Failed code dumped in JS file: {tmp_file}")
|
||||
|
||||
# Error handling if the JavaScript evaluation fails
|
||||
def on_js_evaluation_finished(
|
||||
webview: WebKit.WebView, task: Gio.AsyncResult
|
||||
) -> None:
|
||||
try:
|
||||
# Get the result of the JavaScript evaluation
|
||||
value = webview.evaluate_javascript_finish(task)
|
||||
if not value:
|
||||
log.exception("No value returned")
|
||||
dump_failed_code()
|
||||
except GLib.Error:
|
||||
log.exception("Error evaluating JS")
|
||||
dump_failed_code()
|
||||
|
||||
self.webview.evaluate_javascript(
|
||||
js,
|
||||
-1,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
on_js_evaluation_finished,
|
||||
)
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
def get_webview(self) -> WebKit.WebView:
|
||||
return self.webview
|
||||
@@ -1,18 +1,11 @@
|
||||
{
|
||||
python3,
|
||||
python3Full,
|
||||
runCommand,
|
||||
setuptools,
|
||||
copyDesktopItems,
|
||||
pygobject3,
|
||||
wrapGAppsHook4,
|
||||
gtk4,
|
||||
adwaita-icon-theme,
|
||||
pygobject-stubs,
|
||||
gobject-introspection,
|
||||
clan-cli,
|
||||
makeDesktopItem,
|
||||
libadwaita,
|
||||
webkitgtk_6_0,
|
||||
pytest, # Testing framework
|
||||
pytest-cov, # Generate coverage reports
|
||||
pytest-subprocess, # fake the real subprocess behavior to make your tests more independent.
|
||||
@@ -35,17 +28,11 @@ let
|
||||
|
||||
# Dependencies that are directly used in the project but nor from internal python packages
|
||||
externalPythonDeps = [
|
||||
pygobject3
|
||||
pygobject-stubs
|
||||
gtk4
|
||||
libadwaita
|
||||
webkitgtk_6_0
|
||||
adwaita-icon-theme
|
||||
|
||||
];
|
||||
|
||||
# Deps including python packages from the local project
|
||||
allPythonDeps = [ (python3.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
|
||||
allPythonDeps = [ (python3Full.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
|
||||
|
||||
# Runtime binary dependencies required by the application
|
||||
runtimeDependencies = [
|
||||
@@ -68,9 +55,9 @@ let
|
||||
testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps;
|
||||
|
||||
# Setup Python environment with all dependencies for running tests
|
||||
pythonWithTestDeps = python3.withPackages (_ps: testDependencies);
|
||||
pythonWithTestDeps = python3Full.withPackages (_ps: testDependencies);
|
||||
in
|
||||
python3.pkgs.buildPythonApplication rec {
|
||||
python3Full.pkgs.buildPythonApplication rec {
|
||||
name = "clan-app";
|
||||
src = source;
|
||||
format = "pyproject";
|
||||
@@ -79,7 +66,7 @@ python3.pkgs.buildPythonApplication rec {
|
||||
preFixup = ''
|
||||
makeWrapperArgs+=(
|
||||
--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf
|
||||
--set WEBUI_PATH "$out/${python3.sitePackages}/clan_app/.webui"
|
||||
--set WEBUI_PATH "$out/${python3Full.sitePackages}/clan_app/.webui"
|
||||
--set WEBVIEW_LIB_DIR "${webview-lib}/lib"
|
||||
# This prevents problems with mixed glibc versions that might occur when the
|
||||
# cli is called through a browser built against another glibc
|
||||
@@ -93,8 +80,6 @@ python3.pkgs.buildPythonApplication rec {
|
||||
setuptools
|
||||
copyDesktopItems
|
||||
wrapGAppsHook4
|
||||
|
||||
gobject-introspection
|
||||
];
|
||||
|
||||
# The necessity of setting buildInputs and propagatedBuildInputs to the
|
||||
@@ -149,8 +134,8 @@ python3.pkgs.buildPythonApplication rec {
|
||||
passthru.testDependencies = testDependencies;
|
||||
|
||||
postInstall = ''
|
||||
mkdir -p $out/${python3.sitePackages}/clan_app/.webui
|
||||
cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/${python3.sitePackages}/clan_app/.webui
|
||||
mkdir -p $out/${python3Full.sitePackages}/clan_app/.webui
|
||||
cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/${python3Full.sitePackages}/clan_app/.webui
|
||||
mkdir -p $out/share/icons/hicolor
|
||||
cp -r ./clan_app/assets/white-favicons/* $out/share/icons/hicolor
|
||||
'';
|
||||
|
||||
@@ -68,6 +68,7 @@ mkShell {
|
||||
export XDG_DATA_DIRS=${gtk4}/share/gsettings-schemas/gtk4-4.14.4:$XDG_DATA_DIRS
|
||||
export XDG_DATA_DIRS=${gsettings-desktop-schemas}/share/gsettings-schemas/gsettings-desktop-schemas-46.0:$XDG_DATA_DIRS
|
||||
|
||||
export WEBVIEW_LIB_DIR=${webview-lib}/lib
|
||||
# export WEBVIEW_LIB_DIR=${webview-lib}/lib
|
||||
export WEBVIEW_LIB_DIR=$HOME/Projects/webview/build/core
|
||||
'';
|
||||
}
|
||||
|
||||
@@ -54,9 +54,7 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
|
||||
params = list(sig.parameters.values())
|
||||
|
||||
# Add 'op_key' parameter
|
||||
op_key_param = Parameter(
|
||||
"op_key", Parameter.KEYWORD_ONLY, default=None, annotation=str
|
||||
)
|
||||
op_key_param = Parameter("op_key", Parameter.KEYWORD_ONLY, annotation=str)
|
||||
params.append(op_key_param)
|
||||
|
||||
# Create a new signature
|
||||
@@ -110,6 +108,21 @@ API.register(open_file)
|
||||
self.register(wrapper)
|
||||
return fn
|
||||
|
||||
def overwrite_fn(self, fn: Callable[..., Any]) -> None:
|
||||
fn_name = fn.__name__
|
||||
|
||||
if fn_name not in self._registry:
|
||||
msg = f"Function '{fn_name}' is not registered as an API method"
|
||||
raise ClanError(msg)
|
||||
|
||||
fn_signature = signature(fn)
|
||||
abstract_signature = signature(self._registry[fn_name])
|
||||
if fn_signature != abstract_signature:
|
||||
msg = f"Expected signature: {abstract_signature}\nActual signature: {fn_signature}"
|
||||
raise ClanError(msg)
|
||||
|
||||
self._registry[fn_name] = fn
|
||||
|
||||
F = TypeVar("F", bound=Callable[..., Any])
|
||||
|
||||
def register(self, fn: F) -> F:
|
||||
@@ -125,7 +138,7 @@ API.register(open_file)
|
||||
@wraps(fn)
|
||||
def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]:
|
||||
try:
|
||||
data: T = fn(*args, op_key=op_key, **kwargs)
|
||||
data: T = fn(*args, **kwargs)
|
||||
return SuccessDataClass(status="success", data=data, op_key=op_key)
|
||||
except ClanError as e:
|
||||
log.exception(f"Error calling wrapped {fn.__name__}")
|
||||
|
||||
@@ -393,10 +393,9 @@ def run(
|
||||
)
|
||||
|
||||
if options.check and process.returncode != 0:
|
||||
err = ClanCmdError(cmd_out)
|
||||
err.msg = str(stderr_buf)
|
||||
err.description = "Command has been cancelled"
|
||||
raise err
|
||||
if is_async_cancelled():
|
||||
cmd_out.msg = "Command cancelled"
|
||||
raise ClanCmdError(cmd_out)
|
||||
|
||||
return cmd_out
|
||||
|
||||
|
||||
Reference in New Issue
Block a user