Merge pull request 'Move clan-app to platform independent webview lib' (#2690) from Qubasa/clan-core:Qubasa-main into main

This commit is contained in:
clan-bot
2025-01-06 23:25:56 +00:00
35 changed files with 579 additions and 1259 deletions

View File

@@ -4,6 +4,7 @@ from pathlib import Path
module_path = Path(__file__).parent.parent.absolute() module_path = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(module_path)) sys.path.insert(0, str(module_path))
sys.path.insert(0, str(module_path.parent / "clan_cli")) sys.path.insert(0, str(module_path.parent / "clan_cli"))

View File

@@ -17,6 +17,9 @@
}, },
{ {
"path": "../webview-ui" "path": "../webview-ui"
},
{
"path": "../webview-lib"
} }
], ],
"settings": { "settings": {

View File

@@ -1,18 +1,24 @@
import argparse
import logging import logging
import sys import sys
# Remove working directory from sys.path
if "" in sys.path:
sys.path.remove("")
from clan_cli.profiler import profile from clan_cli.profiler import profile
from clan_app.app import MainApplication
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from clan_app.app import ClanAppOptions, app_run
@profile @profile
def main(argv: list[str] = sys.argv) -> int: def main(argv: list[str] = sys.argv) -> int:
app = MainApplication() parser = argparse.ArgumentParser(description="Clan App")
return app.run(argv) parser.add_argument(
"--content-uri", type=str, help="The URI of the content to display"
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args(argv[1:])
app_opts = ClanAppOptions(content_uri=args.content_uri, debug=args.debug)
app_run(app_opts)
return 0

View File

@@ -1,158 +0,0 @@
import inspect
import logging
import threading
from collections.abc import Callable
from typing import (
Any,
ClassVar,
Generic,
ParamSpec,
TypeVar,
cast,
)
from clan_cli.errors import ClanError
from gi.repository import GLib, GObject
log = logging.getLogger(__name__)
class GResult(GObject.Object):
result: Any
op_key: str
method_name: str
def __init__(self, result: Any, method_name: str) -> None:
super().__init__()
self.result = result
self.method_name = method_name
B = TypeVar("B")
P = ParamSpec("P")
class ImplFunc(GObject.Object, Generic[P, B]):
op_key: str | None = None
__gsignals__: ClassVar = {
"returns": (GObject.SignalFlags.RUN_FIRST, None, [GResult]),
}
def returns(self, result: B, *, method_name: str | None = None) -> None:
if method_name is None:
method_name = self.__class__.__name__
self.emit("returns", GResult(result, method_name))
def await_result(self, fn: Callable[["ImplFunc[..., Any]", B], None]) -> None:
self.connect("returns", fn)
def async_run(self, *args: P.args, **kwargs: P.kwargs) -> bool:
msg = "Method 'async_run' must be implemented"
raise NotImplementedError(msg)
def internal_async_run(self, data: Any) -> bool:
result = GLib.SOURCE_REMOVE
try:
result = self.async_run(**data)
except Exception:
log.exception("Error in async_run")
# TODO: send error to js
return result
# TODO: Reimplement this such that it uses a multiprocessing.Array of type ctypes.c_char
# all fn arguments are serialized to json and passed to the new process over the Array
# the new process deserializes the json and calls the function
# the result is serialized to json and passed back to the main process over another Array
class MethodExecutor(threading.Thread):
def __init__(
self, function: Callable[..., Any], *args: Any, **kwargs: dict[str, Any]
) -> None:
super().__init__()
self.function = function
self.args = args
self.kwargs = kwargs
self.result: Any = None
self.finished = False
def run(self) -> None:
try:
self.result = self.function(*self.args, **self.kwargs)
except Exception:
log.exception("Error in MethodExecutor")
finally:
self.finished = True
class GObjApi:
def __init__(self, methods: dict[str, Callable[..., Any]]) -> None:
self._methods: dict[str, Callable[..., Any]] = methods
self._obj_registry: dict[str, type[ImplFunc]] = {}
def overwrite_fn(self, obj: type[ImplFunc]) -> None:
fn_name = obj.__name__
if fn_name in self._obj_registry:
msg = f"Function '{fn_name}' already registered"
raise ClanError(msg)
self._obj_registry[fn_name] = obj
def check_signature(self, fn_signatures: dict[str, inspect.Signature]) -> None:
overwrite_fns = self._obj_registry
# iterate over the methods and check if all are implemented
for m_name, m_signature in fn_signatures.items():
if m_name not in overwrite_fns:
continue
# check if the signature of the overridden method matches
# the implementation signature
exp_args = []
exp_return = m_signature.return_annotation
for param in dict(m_signature.parameters).values():
exp_args.append(param.annotation)
exp_signature = (tuple(exp_args), exp_return)
# implementation signature
obj = dict(overwrite_fns[m_name].__dict__)
obj_type = obj["__orig_bases__"][0]
got_signature = obj_type.__args__
if exp_signature != got_signature:
log.error(f"Expected signature: {exp_signature}")
log.error(f"Actual signature: {got_signature}")
msg = f"Overwritten method '{m_name}' has different signature than the implementation"
raise ClanError(msg)
def has_obj(self, fn_name: str) -> bool:
return fn_name in self._obj_registry or fn_name in self._methods
def get_obj(self, fn_name: str) -> type[ImplFunc]:
result = self._obj_registry.get(fn_name, None)
if result is not None:
return result
plain_fn = self._methods.get(fn_name, None)
if plain_fn is None:
msg = f"Method '{fn_name}' not found in Api"
raise ClanError(msg)
class GenericFnRuntime(ImplFunc[..., Any]):
def __init__(self) -> None:
super().__init__()
self.thread: MethodExecutor | None = None
def async_run(self, *args: Any, **kwargs: dict[str, Any]) -> bool:
assert plain_fn is not None
if self.thread is None:
self.thread = MethodExecutor(plain_fn, *args, **kwargs)
self.thread.start()
return GLib.SOURCE_CONTINUE
if self.thread.finished:
result = self.thread.result
self.returns(method_name=fn_name, result=result)
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
return cast(type[ImplFunc], GenericFnRuntime)

View File

@@ -1,146 +1,96 @@
# ruff: noqa: N801 # ruff: noqa: N801
import gi
gi.require_version("Gtk", "4.0")
import logging import logging
from pathlib import Path from tkinter import Tk, filedialog
from typing import Any
from clan_cli.api import ErrorDataClass, SuccessDataClass from clan_cli.api import ApiError, ErrorDataClass, SuccessDataClass
from clan_cli.api.directory import FileRequest from clan_cli.api.directory import FileFilter, FileRequest
from gi.repository import Gio, GLib, Gtk
from clan_app.api import ImplFunc
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def remove_none(_list: list) -> list: def _apply_filters(filters: FileFilter | None) -> list[tuple[str, str]]:
return [i for i in _list if i is not None] if not filters:
return []
filter_patterns = []
if filters.mime_types:
# Tkinter does not directly support MIME types, so this section can be adjusted
# if you wish to handle them differently
filter_patterns.extend(filters.mime_types)
if filters.patterns:
filter_patterns.extend(filters.patterns)
if filters.suffixes:
suffix_patterns = [f"*.{suffix}" for suffix in filters.suffixes]
filter_patterns.extend(suffix_patterns)
filter_title = filters.title if filters.title else "Custom Files"
return [(filter_title, " ".join(filter_patterns))]
# This implements the abstract function open_file with one argument, file_request, def open_file(
# which is a FileRequest object and returns a string or None. file_request: FileRequest, *, op_key: str
class open_file( ) -> SuccessDataClass[list[str] | None] | ErrorDataClass:
ImplFunc[[FileRequest, str], SuccessDataClass[list[str] | None] | ErrorDataClass] try:
): root = Tk()
def __init__(self) -> None: root.withdraw() # Hide the main window
super().__init__() root.attributes("-topmost", True) # Bring the dialogs to the front
def async_run(self, file_request: FileRequest, op_key: str) -> bool: file_path: str = ""
def on_file_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None: multiple_files: list[str] = []
try:
gfile = file_dialog.open_finish(task)
if gfile:
selected_path = remove_none([gfile.get_path()])
self.returns(
SuccessDataClass(
op_key=op_key, data=selected_path, status="success"
)
)
except Exception as e:
print(f"Error getting selected file or directory: {e}")
def on_file_select_multiple( if file_request.mode == "open_file":
file_dialog: Gtk.FileDialog, task: Gio.Task file_path = filedialog.askopenfilename(
) -> None: title=file_request.title,
try: initialdir=file_request.initial_folder,
gfiles: Any = file_dialog.open_multiple_finish(task) initialfile=file_request.initial_file,
if gfiles: filetypes=_apply_filters(file_request.filters),
selected_paths = remove_none([gfile.get_path() for gfile in gfiles]) )
self.returns(
SuccessDataClass(
op_key=op_key, data=selected_paths, status="success"
)
)
else:
self.returns(
SuccessDataClass(op_key=op_key, data=None, status="success")
)
except Exception as e:
print(f"Error getting selected files: {e}")
def on_folder_select(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None: elif file_request.mode == "select_folder":
try: file_path = filedialog.askdirectory(
gfile = file_dialog.select_folder_finish(task) title=file_request.title, initialdir=file_request.initial_folder
if gfile: )
selected_path = remove_none([gfile.get_path()])
self.returns(
SuccessDataClass(
op_key=op_key, data=selected_path, status="success"
)
)
else:
self.returns(
SuccessDataClass(op_key=op_key, data=None, status="success")
)
except Exception as e:
print(f"Error getting selected directory: {e}")
def on_save_finish(file_dialog: Gtk.FileDialog, task: Gio.Task) -> None:
try:
gfile = file_dialog.save_finish(task)
if gfile:
selected_path = remove_none([gfile.get_path()])
self.returns(
SuccessDataClass(
op_key=op_key, data=selected_path, status="success"
)
)
else:
self.returns(
SuccessDataClass(op_key=op_key, data=None, status="success")
)
except Exception as e:
print(f"Error getting selected file: {e}")
dialog = Gtk.FileDialog()
if file_request.title:
dialog.set_title(file_request.title)
if file_request.filters:
filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters = Gtk.FileFilter()
if file_request.filters.title:
file_filters.set_name(file_request.filters.title)
if file_request.filters.mime_types:
for mime in file_request.filters.mime_types:
file_filters.add_mime_type(mime)
filters.append(file_filters)
if file_request.filters.patterns:
for pattern in file_request.filters.patterns:
file_filters.add_pattern(pattern)
if file_request.filters.suffixes:
for suffix in file_request.filters.suffixes:
file_filters.add_suffix(suffix)
filters.append(file_filters)
dialog.set_filters(filters)
if file_request.initial_file:
p = Path(file_request.initial_file).expanduser()
f = Gio.File.new_for_path(str(p))
dialog.set_initial_file(f)
if file_request.initial_folder:
p = Path(file_request.initial_folder).expanduser()
f = Gio.File.new_for_path(str(p))
dialog.set_initial_folder(f)
# if select_folder
if file_request.mode == "select_folder":
dialog.select_folder(callback=on_folder_select)
if file_request.mode == "open_multiple_files":
dialog.open_multiple(callback=on_file_select_multiple)
elif file_request.mode == "open_file":
dialog.open(callback=on_file_select)
elif file_request.mode == "save": elif file_request.mode == "save":
dialog.save(callback=on_save_finish) file_path = filedialog.asksaveasfilename(
title=file_request.title,
initialdir=file_request.initial_folder,
initialfile=file_request.initial_file,
filetypes=_apply_filters(file_request.filters),
)
return GLib.SOURCE_REMOVE elif file_request.mode == "open_multiple_files":
tresult = filedialog.askopenfilenames(
title=file_request.title,
initialdir=file_request.initial_folder,
filetypes=_apply_filters(file_request.filters),
)
multiple_files = list(tresult)
if len(file_path) == 0 and len(multiple_files) == 0:
msg = "No file selected"
raise ValueError(msg) # noqa: TRY301
multiple_files = [file_path] if len(multiple_files) == 0 else multiple_files
return SuccessDataClass(op_key, status="success", data=multiple_files)
except Exception as e:
log.exception("Error opening file")
return ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message=e.__class__.__name__,
description=str(e),
location=["open_file"],
)
],
)
finally:
root.destroy()

View File

@@ -1,143 +1,49 @@
#!/usr/bin/env python3
import logging import logging
import os
from typing import Any, ClassVar
import gi from clan_cli.profiler import profile
from clan_app import assets
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from pathlib import Path
from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
from clan_app.components.interfaces import ClanConfig
from .windows.main_window import MainWindow
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class MainApplication(Adw.Application): import os
""" from dataclasses import dataclass
This class is initialized every time the app is started from pathlib import Path
Only the Adw.ApplicationWindow is a singleton.
So don't use any singletons in the Adw.Application class.
"""
__gsignals__: ClassVar = { from clan_cli.api import API
"join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]), from clan_cli.custom_logger import setup_logging
}
def __init__(self, *args: Any, **kwargs: Any) -> None: from clan_app.api.file import open_file
super().__init__( from clan_app.deps.webview.webview import Size, SizeHint, Webview
application_id="org.clan.app",
flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE,
)
self.add_main_option(
"debug",
ord("d"),
GLib.OptionFlags.NONE,
GLib.OptionArg.NONE,
"enable debug mode",
None,
)
self.add_main_option( @dataclass
"content-uri", class ClanAppOptions:
GLib.OptionFlags.NONE, content_uri: str
GLib.OptionFlags.NONE, debug: bool
GLib.OptionArg.STRING,
"set the webview content uri",
None,
)
@profile
def app_run(app_opts: ClanAppOptions) -> int:
if app_opts.debug:
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
setup_logging(logging.DEBUG, root_log_name="clan_cli")
else:
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
setup_logging(logging.INFO, root_log_name="clan_cli")
log.debug("Debug mode enabled")
if app_opts.content_uri:
content_uri = app_opts.content_uri
else:
site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html" site_index: Path = Path(os.getenv("WEBUI_PATH", ".")).resolve() / "index.html"
self.content_uri = f"file://{site_index}" content_uri = f"file://{site_index}"
self.window: MainWindow | None = None
self.connect("activate", self.on_activate)
self.connect("shutdown", self.on_shutdown)
def on_shutdown(self, source: "MainApplication") -> None: webview = Webview(debug=app_opts.debug)
log.debug("Shutting down Adw.Application")
if self.get_windows() == []: API.overwrite_fn(open_file)
log.debug("No windows to destroy") webview.bind_jsonschema_api(API)
if self.window: webview.size = Size(1280, 1024, SizeHint.NONE)
# TODO: Doesn't seem to raise the destroy signal. Need to investigate webview.navigate(content_uri)
# self.get_windows() returns an empty list. Desync between window and application? webview.run()
self.window.close() return 0
else:
log.error("No window to destroy")
def do_command_line(self, command_line: Any) -> int:
options = command_line.get_options_dict()
# convert GVariantDict -> GVariant -> dict
options = options.end().unpack()
if "debug" in options and self.window is None:
setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
setup_logging(logging.DEBUG, root_log_name="clan_cli")
elif self.window is None:
setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
log.debug("Debug logging enabled")
if "content-uri" in options:
self.content_uri = options["content-uri"]
log.debug(f"Setting content uri to {self.content_uri}")
args = command_line.get_arguments()
self.activate()
# Check if there are arguments that are not inside the options
if len(args) > 1:
non_option_args = [arg for arg in args[1:] if arg not in options.values()]
if non_option_args:
uri = non_option_args[0]
self.emit("join_request", uri)
return 0
def on_window_hide_unhide(self, *_args: Any) -> None:
if not self.window:
log.error("No window to hide/unhide")
return
if self.window.is_visible():
self.window.hide()
else:
self.window.present()
def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called")
def on_activate(self, source: "MainApplication") -> None:
if not self.window:
self.init_style()
self.window = MainWindow(
config=ClanConfig(initial_view="webview", content_uri=self.content_uri)
)
self.window.set_application(self)
self.window.show()
# TODO: For css styling
def init_style(self) -> None:
resource_path = assets.loc / "style.css"
log.debug(f"Style css path: {resource_path}")
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resource_path))
display = Gdk.Display.get_default()
assert display is not None
Gtk.StyleContext.add_provider_for_display(
display,
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
)

View File

@@ -1,127 +0,0 @@
import dataclasses
import logging
import multiprocessing as mp
import os
import signal
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from typing import Any
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with out_file.open("w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@@ -1,11 +0,0 @@
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str
content_uri: str

View File

@@ -0,0 +1,114 @@
import ctypes
import ctypes.util
import os
import platform
from ctypes import CFUNCTYPE, c_char_p, c_int, c_void_p
from pathlib import Path
def _encode_c_string(s: str) -> bytes:
return s.encode("utf-8")
def _get_webview_version() -> str:
"""Get webview version from environment variable or use default"""
return os.getenv("WEBVIEW_VERSION", "0.8.1")
def _get_lib_names() -> list[str]:
"""Get platform-specific library names."""
system = platform.system().lower()
machine = platform.machine().lower()
if system == "windows":
if machine == "amd64" or machine == "x86_64":
return ["webview.dll", "WebView2Loader.dll"]
if machine == "arm64":
msg = "arm64 is not supported on Windows"
raise RuntimeError(msg)
msg = f"Unsupported architecture: {machine}"
raise RuntimeError(msg)
if system == "darwin":
if machine == "arm64":
return ["libwebview.aarch64.dylib"]
return ["libwebview.x86_64.dylib"]
# linux
return ["libwebview.so"]
def _be_sure_libraries() -> list[Path] | None:
"""Ensure libraries exist and return paths."""
lib_dir = os.environ.get("WEBVIEW_LIB_DIR")
if not lib_dir:
msg = "WEBVIEW_LIB_DIR environment variable is not set"
raise RuntimeError(msg)
lib_dir_p = Path(lib_dir)
lib_names = _get_lib_names()
lib_paths = [lib_dir_p / lib_name for lib_name in lib_names]
# Check if any library is missing
missing_libs = [path for path in lib_paths if not path.exists()]
if not missing_libs:
return lib_paths
return None
class _WebviewLibrary:
def __init__(self) -> None:
lib_names = _get_lib_names()
library_path = ctypes.util.find_library(lib_names[0])
if not library_path:
library_paths = _be_sure_libraries()
if not library_paths:
msg = f"Failed to find required library: {lib_names}"
raise RuntimeError(msg)
try:
self.lib = ctypes.cdll.LoadLibrary(str(library_paths[0]))
except Exception as e:
print(f"Failed to load webview library: {e}")
raise
# Define FFI functions
self.webview_create = self.lib.webview_create
self.webview_create.argtypes = [c_int, c_void_p]
self.webview_create.restype = c_void_p
self.webview_destroy = self.lib.webview_destroy
self.webview_destroy.argtypes = [c_void_p]
self.webview_run = self.lib.webview_run
self.webview_run.argtypes = [c_void_p]
self.webview_terminate = self.lib.webview_terminate
self.webview_terminate.argtypes = [c_void_p]
self.webview_set_title = self.lib.webview_set_title
self.webview_set_title.argtypes = [c_void_p, c_char_p]
self.webview_set_size = self.lib.webview_set_size
self.webview_set_size.argtypes = [c_void_p, c_int, c_int, c_int]
self.webview_navigate = self.lib.webview_navigate
self.webview_navigate.argtypes = [c_void_p, c_char_p]
self.webview_init = self.lib.webview_init
self.webview_init.argtypes = [c_void_p, c_char_p]
self.webview_eval = self.lib.webview_eval
self.webview_eval.argtypes = [c_void_p, c_char_p]
self.webview_bind = self.lib.webview_bind
self.webview_bind.argtypes = [c_void_p, c_char_p, c_void_p, c_void_p]
self.webview_unbind = self.lib.webview_unbind
self.webview_unbind.argtypes = [c_void_p, c_char_p]
self.webview_return = self.lib.webview_return
self.webview_return.argtypes = [c_void_p, c_char_p, c_int, c_char_p]
self.CFUNCTYPE = CFUNCTYPE
_webview_lib = _WebviewLibrary()

View File

@@ -0,0 +1,179 @@
import ctypes
import json
import logging
import threading
from collections.abc import Callable
from enum import IntEnum
from typing import Any
from clan_cli.api import MethodRegistry, dataclass_to_dict, from_dict
from ._webview_ffi import _encode_c_string, _webview_lib
log = logging.getLogger(__name__)
class SizeHint(IntEnum):
NONE = 0
MIN = 1
MAX = 2
FIXED = 3
class Size:
def __init__(self, width: int, height: int, hint: SizeHint) -> None:
self.width = width
self.height = height
self.hint = hint
class Webview:
def __init__(
self, debug: bool = False, size: Size | None = None, window: int | None = None
) -> None:
self._handle = _webview_lib.webview_create(int(debug), window)
self._callbacks: dict[str, Callable[..., Any]] = {}
if size:
self.size = size
@property
def size(self) -> Size:
return self._size
@size.setter
def size(self, value: Size) -> None:
_webview_lib.webview_set_size(
self._handle, value.width, value.height, value.hint
)
self._size = value
@property
def title(self) -> str:
return self._title
@title.setter
def title(self, value: str) -> None:
_webview_lib.webview_set_title(self._handle, _encode_c_string(value))
self._title = value
def destroy(self) -> None:
for name in list(self._callbacks.keys()):
self.unbind(name)
_webview_lib.webview_terminate(self._handle)
_webview_lib.webview_destroy(self._handle)
self._handle = None
def navigate(self, url: str) -> None:
_webview_lib.webview_navigate(self._handle, _encode_c_string(url))
def run(self) -> None:
_webview_lib.webview_run(self._handle)
self.destroy()
def bind_jsonschema_api(self, api: MethodRegistry) -> None:
for name, method in api.functions.items():
def wrapper(
seq: bytes,
req: bytes,
arg: int,
wrap_method: Callable[..., Any] = method,
method_name: str = name,
) -> None:
def thread_task() -> None:
args = json.loads(req.decode())
try:
log.debug(f"Calling {method_name}({args[0]})")
# Initialize dataclasses from the payload
reconciled_arguments = {}
for k, v in args[0].items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = api.get_method_argtype(method_name, k)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
reconciled_arguments["op_key"] = seq.decode()
# TODO: We could remove the wrapper in the MethodRegistry
# and just call the method directly
result = wrap_method(**reconciled_arguments)
success = True
except Exception as e:
log.exception(f"Error calling {method_name}")
result = str(e)
success = False
try:
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
except TypeError:
log.exception(f"Error serializing result for {method_name}")
raise
log.debug(f"Result for {method_name}: {serialized}")
self.return_(seq.decode(), 0 if success else 1, serialized)
thread = threading.Thread(target=thread_task)
thread.start()
c_callback = _webview_lib.CFUNCTYPE(
None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p
)(wrapper)
if name in self._callbacks:
msg = f"Callback {name} already exists. Skipping binding."
raise RuntimeError(msg)
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
)
def bind(self, name: str, callback: Callable[..., Any]) -> None:
def wrapper(seq: bytes, req: bytes, arg: int) -> None:
args = json.loads(req.decode())
try:
result = callback(*args)
success = True
except Exception as e:
result = str(e)
success = False
self.return_(seq.decode(), 0 if success else 1, json.dumps(result))
c_callback = _webview_lib.CFUNCTYPE(
None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p
)(wrapper)
self._callbacks[name] = c_callback
_webview_lib.webview_bind(
self._handle, _encode_c_string(name), c_callback, None
)
def unbind(self, name: str) -> None:
if name in self._callbacks:
_webview_lib.webview_unbind(self._handle, _encode_c_string(name))
del self._callbacks[name]
def return_(self, seq: str, status: int, result: str) -> None:
_webview_lib.webview_return(
self._handle, _encode_c_string(seq), status, _encode_c_string(result)
)
def eval(self, source: str) -> None:
_webview_lib.webview_eval(self._handle, _encode_c_string(source))
def init(self, source: str) -> None:
_webview_lib.webview_init(self._handle, _encode_c_string(source))
if __name__ == "__main__":
wv = Webview()
wv.title = "Hello, World!"
wv.navigate("https://www.google.com")
wv.run()

View File

@@ -1,118 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
log = logging.getLogger(__name__)
class ToastOverlay:
"""
The ToastOverlay is a class that manages the display of toasts
It should be used as a singleton in your application to prevent duplicate toasts
Usage
"""
# For some reason, the adw toast overlay cannot be subclassed
# Thats why it is added as a class property
overlay: Adw.ToastOverlay
active_toasts: set[str]
_instance: "None | ToastOverlay" = None
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ToastOverlay":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.overlay = Adw.ToastOverlay()
cls.active_toasts = set()
return cls._instance
def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
if key not in self.active_toasts:
self.active_toasts.add(key)
self.overlay.add_toast(toast)
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class WarningToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"<span foreground='orange'>⚠ Warning </span> {message}"
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class InfoToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class SuccessToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class LogToast:
toast: Adw.Toast
def __init__(
self,
message: str,
on_button_click: Callable[[], None],
button_label: str = "More",
persistent: bool = False,
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""Logs are available <span weight="regular">{message}</span>"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
self.toast.set_button_label(button_label)
self.toast.connect(
"button-clicked",
lambda _: on_button_click(),
)

View File

@@ -1,37 +0,0 @@
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
class ViewStack:
"""
This is a singleton.
It is initialized with the first call of use()
Usage:
ViewStack.use().set_visible()
ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
"""
_instance: "None | ViewStack" = None
view: Adw.ViewStack
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ViewStack":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.view = Adw.ViewStack()
return cls._instance

View File

@@ -1,211 +0,0 @@
import json
import logging
import traceback
from pathlib import Path
from typing import Any
import gi
from clan_cli.api import MethodRegistry, dataclass_to_dict, from_dict
from clan_app.api import GObjApi, GResult, ImplFunc
from clan_app.api.file import open_file
gi.require_version("WebKit", "6.0")
from gi.repository import Gio, GLib, GObject, WebKit
log = logging.getLogger(__name__)
class WebExecutor(GObject.Object):
def __init__(self, content_uri: str, jschema_api: MethodRegistry) -> None:
super().__init__()
self.jschema_api: MethodRegistry = jschema_api
self.webview: WebKit.WebView = WebKit.WebView()
settings: WebKit.Settings = self.webview.get_settings()
# settings.
settings.set_property("enable-developer-extras", True)
self.webview.set_settings(settings)
# FIXME: This filtering is incomplete, it only triggers if a user clicks a link
self.webview.connect("decide-policy", self.on_decide_policy)
# For when the page is fully loaded
self.webview.connect("load-changed", self.on_load_changed)
self.manager: WebKit.UserContentManager = (
self.webview.get_user_content_manager()
)
# Can be called with: window.webkit.messageHandlers.gtk.postMessage("...")
# Important: it seems postMessage must be given some payload, otherwise it won't trigger the event
self.manager.register_script_message_handler("gtk")
self.manager.connect("script-message-received", self.on_message_received)
self.webview.load_uri(content_uri)
self.content_uri = content_uri
self.api: GObjApi = GObjApi(self.jschema_api.functions)
self.api.overwrite_fn(open_file)
self.api.check_signature(self.jschema_api.signatures)
def on_load_changed(
self, webview: WebKit.WebView, load_event: WebKit.LoadEvent
) -> None:
if load_event == WebKit.LoadEvent.FINISHED:
if log.isEnabledFor(logging.DEBUG):
pass
# inspector = webview.get_inspector()
# inspector.show()
def on_decide_policy(
self,
webview: WebKit.WebView,
decision: WebKit.NavigationPolicyDecision,
decision_type: WebKit.PolicyDecisionType,
) -> bool:
if decision_type != WebKit.PolicyDecisionType.NAVIGATION_ACTION:
return False # Continue with the default handler
navigation_action: WebKit.NavigationAction = decision.get_navigation_action()
request: WebKit.URIRequest = navigation_action.get_request()
uri = request.get_uri()
if self.content_uri.startswith("http://") and uri.startswith(self.content_uri):
log.debug(f"Allow navigation request: {uri}")
return False
if self.content_uri.startswith("file://") and uri.startswith(self.content_uri):
log.debug(f"Allow navigation request: {uri}")
return False
log.warning(
f"Do not allow navigation request: {uri}. Current content uri: {self.content_uri}"
)
decision.ignore()
return True # Stop other handlers from being invoked
def on_message_received(
self, user_content_manager: WebKit.UserContentManager, message: Any
) -> None:
json_msg = message.to_json(4) # 4 is num of indents
log.debug(f"Webview Request: {json_msg}")
payload = json.loads(json_msg)
method_name = payload["method"]
data = payload.get("data")
# Get the function gobject from the api
if not self.api.has_obj(method_name):
self.return_data_to_js(
method_name,
json.dumps(
{
"op_key": data["op_key"],
"status": "error",
"errors": [
{
"message": "Internal API Error",
"description": f"Function '{method_name}' not found",
}
],
}
),
)
return
function_obj = self.api.get_obj(method_name)
# Create an instance of the function gobject
fn_instance = function_obj()
fn_instance.await_result(self.on_result)
# Extract the data from the payload
if data is None:
log.error(
f"JS function call '{method_name}' has no data field. Skipping execution."
)
return
if data.get("op_key") is None:
log.error(
f"JS function call '{method_name}' has no op_key field. Skipping execution."
)
return
try:
# Initialize dataclasses from the payload
reconciled_arguments = {}
for k, v in data.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = self.jschema_api.get_method_argtype(method_name, k)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
GLib.idle_add(fn_instance.internal_async_run, reconciled_arguments)
except Exception as e:
self.return_data_to_js(
method_name,
json.dumps(
{
"op_key": data["op_key"],
"status": "error",
"errors": [
{
"message": "Internal API Error",
"description": traceback.format_exception(e),
}
],
}
),
)
def on_result(self, source: ImplFunc, data: GResult) -> None:
result = dataclass_to_dict(data.result)
# Important:
# 2. ensure_ascii = False. non-ASCII characters are correctly handled, instead of being escaped.
try:
serialized = json.dumps(result, indent=4, ensure_ascii=False)
except TypeError:
log.exception(f"Error serializing result for {data.method_name}")
raise
log.debug(f"Result for {data.method_name}: {serialized}")
# Use idle_add to queue the response call to js on the main GTK thread
self.return_data_to_js(data.method_name, serialized)
def return_data_to_js(self, method_name: str, serialized: str) -> bool:
js = f"""
window.clan.{method_name}({serialized});
"""
def dump_failed_code() -> None:
tmp_file = Path("/tmp/clan-pyjs-bridge-error.js")
with tmp_file.open("w") as f:
f.write(js)
log.debug(f"Failed code dumped in JS file: {tmp_file}")
# Error handling if the JavaScript evaluation fails
def on_js_evaluation_finished(
webview: WebKit.WebView, task: Gio.AsyncResult
) -> None:
try:
# Get the result of the JavaScript evaluation
value = webview.evaluate_javascript_finish(task)
if not value:
log.exception("No value returned")
dump_failed_code()
except GLib.Error:
log.exception("Error evaluating JS")
dump_failed_code()
self.webview.evaluate_javascript(
js,
-1,
None,
None,
None,
on_js_evaluation_finished,
)
return GLib.SOURCE_REMOVE
def get_webview(self) -> WebKit.WebView:
return self.webview

View File

@@ -1,51 +0,0 @@
import logging
import os
import gi
from clan_cli.api import API
from clan_app.components.interfaces import ClanConfig
from clan_app.singletons.toast import ToastOverlay
from clan_app.singletons.use_views import ViewStack
from clan_app.views.webview import WebExecutor
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio
log = logging.getLogger(__name__)
class MainWindow(Adw.ApplicationWindow):
def __init__(self, config: ClanConfig) -> None:
super().__init__()
self.set_title("Clan App")
self.set_default_size(1280, 1024)
# Overlay for GTK side exclusive toasts
overlay = ToastOverlay.use().overlay
view = Adw.ToolbarView()
overlay.set_child(view)
self.set_content(overlay)
header = Adw.HeaderBar()
view.add_top_bar(header)
app = Gio.Application.get_default()
assert app is not None
stack_view = ViewStack.use().view
webexec = WebExecutor(jschema_api=API, content_uri=config.content_uri)
stack_view.add_named(webexec.get_webview(), "webview")
stack_view.set_visible_child_name(config.initial_view)
view.set_content(stack_view)
self.connect("destroy", self.on_destroy)
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.debug("Destroying Adw.ApplicationWindow")
os._exit(0)

View File

@@ -1,24 +1,12 @@
{ {
python3, python3Full,
runCommand, runCommand,
setuptools, setuptools,
copyDesktopItems, copyDesktopItems,
pygobject3,
wrapGAppsHook4,
gtk4,
adwaita-icon-theme,
pygobject-stubs,
gobject-introspection,
clan-cli, clan-cli,
makeDesktopItem, makeDesktopItem,
libadwaita,
webkitgtk_6_0,
pytest, # Testing framework
pytest-cov, # Generate coverage reports
pytest-subprocess, # fake the real subprocess behavior to make your tests more independent.
pytest-xdist, # Run tests in parallel on multiple cores
pytest-timeout, # Add timeouts to your tests
webview-ui, webview-ui,
webview-lib,
fontconfig, fontconfig,
}: }:
let let
@@ -32,43 +20,29 @@ let
mimeTypes = [ "x-scheme-handler/clan" ]; mimeTypes = [ "x-scheme-handler/clan" ];
}; };
# Dependencies that are directly used in the project but nor from internal python packages
externalPythonDeps = [
pygobject3
pygobject-stubs
gtk4
libadwaita
webkitgtk_6_0
adwaita-icon-theme
];
# Deps including python packages from the local project
allPythonDeps = [ (python3.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
# Runtime binary dependencies required by the application # Runtime binary dependencies required by the application
runtimeDependencies = [ runtimeDependencies = [
]; ];
# Dependencies required for running tests # Dependencies required for running tests
externalTestDeps = pyTestDeps =
externalPythonDeps ps:
++ runtimeDependencies with ps;
++ [ [
pytest # Testing framework (python3Full.pkgs.toPythonModule pytest)
# Testing framework
pytest-cov # Generate coverage reports pytest-cov # Generate coverage reports
pytest-subprocess # fake the real subprocess behavior to make your tests more independent. pytest-subprocess # fake the real subprocess behavior to make your tests more independent.
pytest-xdist # Run tests in parallel on multiple cores pytest-xdist # Run tests in parallel on multiple cores
pytest-timeout # Add timeouts to your tests pytest-timeout # Add timeouts to your tests
]; ]
++ pytest.propagatedBuildInputs;
# Dependencies required for running tests clan-cli-module = [ (python3Full.pkgs.toPythonModule clan-cli) ];
testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps;
# Setup Python environment with all dependencies for running tests
pythonWithTestDeps = python3.withPackages (_ps: testDependencies);
in in
python3.pkgs.buildPythonApplication rec { python3Full.pkgs.buildPythonApplication rec {
name = "clan-app"; name = "clan-app";
src = source; src = source;
format = "pyproject"; format = "pyproject";
@@ -76,10 +50,9 @@ python3.pkgs.buildPythonApplication rec {
dontWrapGApps = true; dontWrapGApps = true;
preFixup = '' preFixup = ''
makeWrapperArgs+=( makeWrapperArgs+=(
# Use software rendering for webkit, mesa causes random crashes with css.
--set WEBKIT_DISABLE_COMPOSITING_MODE 1
--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf --set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf
--set WEBUI_PATH "$out/${python3.sitePackages}/clan_app/.webui" --set WEBUI_PATH "$out/${python3Full.sitePackages}/clan_app/.webui"
--set WEBVIEW_LIB_DIR "${webview-lib}/lib"
# This prevents problems with mixed glibc versions that might occur when the # This prevents problems with mixed glibc versions that might occur when the
# cli is called through a browser built against another glibc # cli is called through a browser built against another glibc
--unset LD_LIBRARY_PATH --unset LD_LIBRARY_PATH
@@ -91,30 +64,27 @@ python3.pkgs.buildPythonApplication rec {
nativeBuildInputs = [ nativeBuildInputs = [
setuptools setuptools
copyDesktopItems copyDesktopItems
wrapGAppsHook4 fontconfig
gobject-introspection
]; ];
# The necessity of setting buildInputs and propagatedBuildInputs to the # The necessity of setting buildInputs and propagatedBuildInputs to the
# same values for your Python package within Nix largely stems from ensuring # same values for your Python package within Nix largely stems from ensuring
# that all necessary dependencies are consistently available both # that all necessary dependencies are consistently available both
# at build time and runtime, # at build time and runtime,
buildInputs = allPythonDeps ++ runtimeDependencies; buildInputs = clan-cli-module ++ runtimeDependencies;
propagatedBuildInputs = propagatedBuildInputs = buildInputs;
allPythonDeps
++ runtimeDependencies
++ [
# TODO: see postFixup clan-cli/default.nix:L188
clan-cli.propagatedBuildInputs
];
# also re-expose dependencies so we test them in CI # also re-expose dependencies so we test them in CI
passthru = { passthru = {
tests = { tests = {
clan-app-pytest = clan-app-pytest =
runCommand "clan-app-pytest" { inherit buildInputs propagatedBuildInputs nativeBuildInputs; } runCommand "clan-app-pytest"
{
buildInputs = runtimeDependencies ++ [
(python3Full.withPackages (ps: clan-cli-module ++ (pyTestDeps ps)))
fontconfig
];
}
'' ''
cp -r ${source} ./src cp -r ${source} ./src
chmod +w -R ./src chmod +w -R ./src
@@ -133,8 +103,9 @@ python3.pkgs.buildPythonApplication rec {
fc-list fc-list
echo "STARTING ..." echo "STARTING ..."
export WEBVIEW_LIB_DIR="${webview-lib}/lib"
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1 export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests python3 -m pytest -s -m "not impure" ./tests
touch $out touch $out
''; '';
}; };
@@ -142,14 +113,11 @@ python3.pkgs.buildPythonApplication rec {
# Additional pass-through attributes # Additional pass-through attributes
passthru.desktop-file = desktop-file; passthru.desktop-file = desktop-file;
passthru.externalPythonDeps = externalPythonDeps; passthru.devshellDeps = ps: (pyTestDeps ps);
passthru.externalTestDeps = externalTestDeps;
passthru.runtimeDependencies = runtimeDependencies;
passthru.testDependencies = testDependencies;
postInstall = '' postInstall = ''
mkdir -p $out/${python3.sitePackages}/clan_app/.webui mkdir -p $out/${python3Full.sitePackages}/clan_app/.webui
cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/${python3.sitePackages}/clan_app/.webui cp -r ${webview-ui}/lib/node_modules/@clan/webview-ui/dist/* $out/${python3Full.sitePackages}/clan_app/.webui
mkdir -p $out/share/icons/hicolor mkdir -p $out/share/icons/hicolor
cp -r ./clan_app/assets/white-favicons/* $out/share/icons/hicolor cp -r ./clan_app/assets/white-favicons/* $out/share/icons/hicolor
''; '';

View File

@@ -14,11 +14,11 @@
else else
{ {
devShells.clan-app = pkgs.callPackage ./shell.nix { devShells.clan-app = pkgs.callPackage ./shell.nix {
inherit (config.packages) clan-app; inherit (config.packages) clan-app webview-lib;
inherit self'; inherit self';
}; };
packages.clan-app = pkgs.python3.pkgs.callPackage ./default.nix { packages.clan-app = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (config.packages) clan-cli webview-ui; inherit (config.packages) clan-cli webview-ui webview-lib;
}; };
checks = config.packages.clan-app.tests; checks = config.packages.clan-app.tests;

View File

@@ -1,57 +1,34 @@
{ {
lib,
glib,
gsettings-desktop-schemas, gsettings-desktop-schemas,
stdenv,
clan-app, clan-app,
mkShell, mkShell,
ruff, ruff,
desktop-file-utils,
xdg-utils,
mypy,
python3,
gtk4, gtk4,
libadwaita, webview-lib,
python3Full,
self', self',
}: }:
let
devshellTestDeps =
clan-app.externalTestDeps
++ (with python3.pkgs; [
rope
mypy
setuptools
wheel
pip
]);
in
mkShell { mkShell {
inherit (clan-app) nativeBuildInputs propagatedBuildInputs;
inputsFrom = [ self'.devShells.default ]; inputsFrom = [ self'.devShells.default ];
buildInputs = buildInputs = [
[ (python3Full.withPackages (
glib ps:
ruff with ps;
gtk4 [
gtk4.dev # has the demo called 'gtk4-widget-factory' ruff
libadwaita.devdoc # has the demo called 'adwaita-1-demo' mypy
] ]
++ devshellTestDeps ++ (clan-app.devshellDeps ps)
))
# Dependencies for testing for linux hosts ];
++ (lib.optionals stdenv.isLinux [
xdg-utils # install desktop files
desktop-file-utils # verify desktop files
]);
shellHook = '' shellHook = ''
export GIT_ROOT=$(git rev-parse --show-toplevel) export GIT_ROOT=$(git rev-parse --show-toplevel)
export PKG_ROOT=$GIT_ROOT/pkgs/clan-app export PKG_ROOT=$GIT_ROOT/pkgs/clan-app
export WEBKIT_DISABLE_COMPOSITING_MODE=1
# Add current package to PYTHONPATH # Add current package to PYTHONPATH
export PYTHONPATH="$PKG_ROOT''${PYTHONPATH:+:$PYTHONPATH:}" export PYTHONPATH="$PKG_ROOT''${PYTHONPATH:+:$PYTHONPATH:}"
@@ -63,5 +40,8 @@ mkShell {
export XDG_DATA_DIRS=${gtk4}/share/gsettings-schemas/gtk4-4.14.4:$XDG_DATA_DIRS export XDG_DATA_DIRS=${gtk4}/share/gsettings-schemas/gtk4-4.14.4:$XDG_DATA_DIRS
export XDG_DATA_DIRS=${gsettings-desktop-schemas}/share/gsettings-schemas/gsettings-desktop-schemas-46.0:$XDG_DATA_DIRS export XDG_DATA_DIRS=${gsettings-desktop-schemas}/share/gsettings-schemas/gsettings-desktop-schemas-46.0:$XDG_DATA_DIRS
export WEBVIEW_LIB_DIR=${webview-lib}/lib
# export WEBVIEW_LIB_DIR=$HOME/Projects/webview/build/core
''; '';
} }

View File

@@ -1,8 +1,5 @@
import time
from wayland import GtkProc from wayland import GtkProc
def test_open(app: GtkProc) -> None: def test_open(app: GtkProc) -> None:
time.sleep(0.5)
assert app.poll() is None assert app.poll() is None

View File

@@ -21,7 +21,11 @@ GtkProc = NewType("GtkProc", Popen)
@pytest.fixture @pytest.fixture
def app() -> Generator[GtkProc, None, None]: def app() -> Generator[GtkProc, None, None]:
rapp = Popen([sys.executable, "-m", "clan_app"], text=True) cmd = [sys.executable, "-m", "clan_app"]
print(f"Running: {cmd}")
rapp = Popen(
cmd, text=True, stdout=sys.stdout, stderr=sys.stderr, start_new_session=True
)
yield GtkProc(rapp) yield GtkProc(rapp)
# Cleanup: Terminate your application # Cleanup: Terminate your application
rapp.terminate() rapp.terminate()

View File

@@ -1,3 +1,4 @@
import logging
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from functools import wraps from functools import wraps
@@ -11,6 +12,8 @@ from typing import (
get_type_hints, get_type_hints,
) )
log = logging.getLogger(__name__)
from .serde import dataclass_to_dict, from_dict, sanitize_string from .serde import dataclass_to_dict, from_dict, sanitize_string
__all__ = ["dataclass_to_dict", "from_dict", "sanitize_string"] __all__ = ["dataclass_to_dict", "from_dict", "sanitize_string"]
@@ -52,7 +55,12 @@ def update_wrapper_signature(wrapper: Callable, wrapped: Callable) -> None:
# Add 'op_key' parameter # Add 'op_key' parameter
op_key_param = Parameter( op_key_param = Parameter(
"op_key", Parameter.KEYWORD_ONLY, default=None, annotation=str "op_key",
Parameter.KEYWORD_ONLY,
# we add a None default value so that typescript code gen drops the parameter
# FIXME: this is a hack, we should filter out op_key in the typescript code gen
default=None,
annotation=str,
) )
params.append(op_key_param) params.append(op_key_param)
@@ -107,6 +115,32 @@ API.register(open_file)
self.register(wrapper) self.register(wrapper)
return fn return fn
def overwrite_fn(self, fn: Callable[..., Any]) -> None:
fn_name = fn.__name__
if fn_name not in self._registry:
msg = f"Function '{fn_name}' is not registered as an API method"
raise ClanError(msg)
fn_signature = signature(fn)
abstract_signature = signature(self._registry[fn_name])
# Remove the default argument of op_key from abstract_signature
# FIXME: This is a hack to make the signature comparison work
# because the other hack above where default value of op_key is None in the wrapper
abstract_params = list(abstract_signature.parameters.values())
for i, param in enumerate(abstract_params):
if param.name == "op_key":
abstract_params[i] = param.replace(default=Parameter.empty)
break
abstract_signature = abstract_signature.replace(parameters=abstract_params)
if fn_signature != abstract_signature:
msg = f"Expected signature: {abstract_signature}\nActual signature: {fn_signature}"
raise ClanError(msg)
self._registry[fn_name] = fn
F = TypeVar("F", bound=Callable[..., Any]) F = TypeVar("F", bound=Callable[..., Any])
def register(self, fn: F) -> F: def register(self, fn: F) -> F:
@@ -125,6 +159,7 @@ API.register(open_file)
data: T = fn(*args, **kwargs) data: T = fn(*args, **kwargs)
return SuccessDataClass(status="success", data=data, op_key=op_key) return SuccessDataClass(status="success", data=data, op_key=op_key)
except ClanError as e: except ClanError as e:
log.exception(f"Error calling wrapped {fn.__name__}")
return ErrorDataClass( return ErrorDataClass(
op_key=op_key, op_key=op_key,
status="error", status="error",
@@ -137,6 +172,7 @@ API.register(open_file)
], ],
) )
except Exception as e: except Exception as e:
log.exception(f"Error calling wrapped {fn.__name__}")
return ErrorDataClass( return ErrorDataClass(
op_key=op_key, op_key=op_key,
status="error", status="error",

View File

@@ -302,7 +302,7 @@ def construct_dataclass(
field_value = data.get(data_field_name) field_value = data.get(data_field_name)
if field_value is None and ( if field_value is None and (
field.type is None or is_type_in_union(field.type, type(None)) field.type is None or is_type_in_union(field.type, type(None)) # type: ignore
): ):
field_values[field.name] = None field_values[field.name] = None
else: else:

View File

@@ -119,7 +119,9 @@ def type_to_dict(
f.type, str f.type, str
), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" ), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
properties[f.metadata.get("alias", f.name)] = type_to_dict( properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type, f"{scope} {t.__name__}.{f.name}", type_map f.type,
f"{scope} {t.__name__}.{f.name}", # type: ignore
type_map, # type: ignore
) )
required = set() required = set()

View File

@@ -393,10 +393,9 @@ def run(
) )
if options.check and process.returncode != 0: if options.check and process.returncode != 0:
err = ClanCmdError(cmd_out) if is_async_cancelled():
err.msg = str(stderr_buf) cmd_out.msg = "Command cancelled"
err.description = "Command has been cancelled" raise ClanCmdError(cmd_out)
raise err
return cmd_out return cmd_out

View File

@@ -10,6 +10,6 @@ class ClanJSONEncoder(json.JSONEncoder):
return o.to_json() return o.to_json()
# Check if the object is a dataclass # Check if the object is a dataclass
if dataclasses.is_dataclass(o): if dataclasses.is_dataclass(o):
return dataclasses.asdict(o) # type: ignore[call-overload] return dataclasses.asdict(o) # type: ignore
# Otherwise, use the default serialization # Otherwise, use the default serialization
return super().default(o) return super().default(o)

View File

@@ -33,6 +33,7 @@
editor = pkgs.callPackage ./editor/clan-edit-codium.nix { }; editor = pkgs.callPackage ./editor/clan-edit-codium.nix { };
classgen = pkgs.callPackage ./classgen { }; classgen = pkgs.callPackage ./classgen { };
zerotierone = pkgs.callPackage ./zerotierone { }; zerotierone = pkgs.callPackage ./zerotierone { };
webview-lib = pkgs.callPackage ./webview-lib { };
}; };
}; };
} }

View File

@@ -0,0 +1,38 @@
{ pkgs, ... }:
pkgs.stdenv.mkDerivation {
pname = "webview";
version = "nigthly";
src = pkgs.fetchFromGitHub {
owner = "webview";
repo = "webview";
rev = "83a4b4a5bbcb4b0ba2ca3ee226c2da1414719106";
sha256 = "sha256-5R8kllvP2EBuDANIl07fxv/EcbPpYgeav8Wfz7Kt13c=";
};
outputs = [
"out"
"dev"
];
# Dependencies used during the build process, if any
buildInputs =
with pkgs;
[
gnumake
cmake
pkg-config
]
++ pkgs.lib.optionals stdenv.isLinux [
webkitgtk_6_0
gtk4
];
meta = with pkgs.lib; {
description = "Tiny cross-platform webview library for C/C++. Uses WebKit (GTK/Cocoa) and Edge WebView2 (Windows)";
homepage = "https://github.com/webview/webview";
license = licenses.mit;
platforms = platforms.linux ++ platforms.darwin;
};
}

View File

@@ -43,133 +43,18 @@ export interface GtkResponse<T> {
op_key: string; op_key: string;
} }
declare global { export const callApi = async <K extends OperationNames>(
interface Window {
clan: ClanOperations;
webkit: {
messageHandlers: {
gtk: {
postMessage: (message: {
method: OperationNames;
data: OperationArgs<OperationNames>;
}) => void;
};
};
};
}
}
// Make sure window.webkit is defined although the type is not correctly filled yet.
window.clan = {} as ClanOperations;
const operations = schema.properties;
const operationNames = Object.keys(operations) as OperationNames[];
type ObserverRegistry = {
[K in OperationNames]: Record<
string,
(response: OperationResponse<K>) => void
>;
};
const registry: ObserverRegistry = operationNames.reduce(
(acc, opName) => ({
...acc,
[opName]: {},
}),
{} as ObserverRegistry,
);
function createFunctions<K extends OperationNames>(
operationName: K,
): {
dispatch: (args: OperationArgs<K>) => void;
receive: (fn: (response: OperationResponse<K>) => void, id: string) => void;
} {
window.clan[operationName] = (s: string) => {
const f = (response: OperationResponse<K>) => {
// Get the correct receiver function for the op_key
const receiver = registry[operationName][response.op_key];
if (receiver) {
receiver(response);
}
};
deserialize(f)(s);
};
return {
dispatch: (args: OperationArgs<K>) => {
// Send the data to the gtk app
window.webkit.messageHandlers.gtk.postMessage({
method: operationName,
data: args,
});
},
receive: (fn: (response: OperationResponse<K>) => void, id: string) => {
// @ts-expect-error: This should work although typescript doesn't let us write
registry[operationName][id] = fn;
},
};
}
type PyApi = {
[K in OperationNames]: {
dispatch: (args: OperationArgs<K>) => void;
receive: (fn: (response: OperationResponse<K>) => void, id: string) => void;
};
};
function download(filename: string, text: string) {
const element = document.createElement("a");
element.setAttribute(
"href",
"data:text/plain;charset=utf-8," + encodeURIComponent(text),
);
element.setAttribute("download", filename);
element.style.display = "none";
document.body.appendChild(element);
element.click();
document.body.removeChild(element);
}
export const callApi = <K extends OperationNames>(
method: K, method: K,
args: OperationArgs<K>, args: OperationArgs<K>,
) => { ): Promise<OperationResponse<K>> => {
return new Promise<OperationResponse<K>>((resolve) => { console.log("Calling API", method, args);
const id = nanoid(); const response = await (
pyApi[method].receive((response) => { window as unknown as Record<
console.log(method, "Received response: ", { response }); OperationNames,
resolve(response); (
}, id); args: OperationArgs<OperationNames>,
) => Promise<OperationResponse<OperationNames>>
pyApi[method].dispatch({ ...args, op_key: id }); >
}); )[method](args);
return response as OperationResponse<K>;
}; };
const deserialize =
<T>(fn: (response: T) => void) =>
(r: unknown) => {
try {
fn(r as T);
} catch (e) {
console.error("Error parsing JSON: ", e);
window.localStorage.setItem("error", JSON.stringify(r));
console.error(r);
console.error("See localStorage 'error'");
alert(`Error parsing JSON: ${e}`);
}
};
// Create the API object
const pyApi: PyApi = {} as PyApi;
operationNames.forEach((opName) => {
const name = opName as OperationNames;
// @ts-expect-error - TODO: Fix this. Typescript is not recognizing the receive function correctly
pyApi[name] = createFunctions(name);
});
export { pyApi };

View File

@@ -26,8 +26,6 @@ export const client = new QueryClient();
const root = document.getElementById("app"); const root = document.getElementById("app");
window.clan = window.clan || {};
if (import.meta.env.DEV && !(root instanceof HTMLElement)) { if (import.meta.env.DEV && !(root instanceof HTMLElement)) {
throw new Error( throw new Error(
"Root element not found. Did you forget to add it to your index.html? Or maybe the id attribute got misspelled?", "Root element not found. Did you forget to add it to your index.html? Or maybe the id attribute got misspelled?",

View File

@@ -1,5 +1,5 @@
import { type Component, createSignal, For, Show } from "solid-js"; import { type Component, createSignal, For, Show } from "solid-js";
import { OperationResponse, pyApi } from "@/src/api"; import { OperationResponse, callApi } from "@/src/api";
import { Button } from "@/src/components/button"; import { Button } from "@/src/components/button";
import Icon from "@/src/components/icon"; import Icon from "@/src/components/icon";
@@ -16,7 +16,7 @@ export const HostList: Component = () => {
<div class="tooltip tooltip-bottom" data-tip="Refresh install targets"> <div class="tooltip tooltip-bottom" data-tip="Refresh install targets">
<Button <Button
variant="light" variant="light"
onClick={() => pyApi.show_mdns.dispatch({})} onClick={() => callApi("show_mdns", {})}
startIcon={<Icon icon="Update" />} startIcon={<Icon icon="Update" />}
></Button> ></Button>
</div> </div>

View File

@@ -1,40 +1,6 @@
import { describe, expectTypeOf, it } from "vitest"; import { describe, it } from "vitest";
import { OperationNames, pyApi } from "@/src/api";
describe.concurrent("API types work properly", () => { describe.concurrent("API types work properly", () => {
// Test some basic types // Test some basic types
it("distinct success/error unions", async () => { it("distinct success/error unions", async () => {});
const k: OperationNames = "create_clan" as OperationNames; // Just a random key, since
expectTypeOf(pyApi[k].receive).toBeFunction();
expectTypeOf(pyApi[k].receive).parameter(0).toBeFunction();
// receive is a function that takes a function, which takes the response parameter
expectTypeOf(pyApi[k].receive)
.parameter(0)
.parameter(0)
.toMatchTypeOf<
{ status: "success"; data?: any } | { status: "error"; errors: any[] }
>();
});
it("Cannot access data of error response", async () => {
const k: OperationNames = "create_clan" as OperationNames; // Just a random key, since
expectTypeOf(pyApi[k].receive).toBeFunction();
expectTypeOf(pyApi[k].receive).parameter(0).toBeFunction();
expectTypeOf(pyApi[k].receive).parameter(0).parameter(0).toMatchTypeOf<
// @ts-expect-error: data is not defined in error responses
| { status: "success"; data?: any }
| { status: "error"; errors: any[]; data: any }
>();
});
it("Machine list receives a records of names and machine info.", async () => {
expectTypeOf(pyApi.list_inventory_machines.receive)
.parameter(0)
.parameter(0)
.toMatchTypeOf<
| { status: "success"; data: Record<string, object> }
| { status: "error"; errors: any }
>();
});
}); });