clan-app: Working cancellable tasks

This commit is contained in:
Qubasa
2025-05-09 18:36:04 +02:00
parent 7eac2ce436
commit 5b2a4cc696
4 changed files with 81 additions and 28 deletions

View File

@@ -1,17 +0,0 @@
import logging
from clan_lib.api import ErrorDataClass, SuccessDataClass
log = logging.getLogger(__name__)
def cancel_task(
task_id: str, *, op_key: str
) -> SuccessDataClass[None] | ErrorDataClass:
"""Cancel a task by its op_key."""
log.info(f"Cancelling task with op_key: {task_id}")
return SuccessDataClass(
op_key=op_key,
data=None,
status="success",
)

View File

@@ -3,16 +3,13 @@ import logging
from clan_cli.profiler import profile
log = logging.getLogger(__name__)
import os
from dataclasses import dataclass
from pathlib import Path
from clan_cli.custom_logger import setup_logging
from clan_lib.api import API
from clan_lib.api import API, ErrorDataClass, SuccessDataClass
from clan_app.api.cancel import cancel_task
from clan_app.api.file_gtk import open_file
from clan_app.deps.webview.webview import Size, SizeHint, Webview
@@ -42,8 +39,25 @@ def app_run(app_opts: ClanAppOptions) -> int:
webview = Webview(debug=app_opts.debug)
def cancel_task(
task_id: str, *, op_key: str
) -> SuccessDataClass[None] | ErrorDataClass:
"""Cancel a task by its op_key."""
log.info(f"Cancelling task with op_key: {task_id}")
with webview.lock:
if task_id in webview.threads:
future = webview.threads[task_id]
future.stop_event.set()
log.info(f"Task {task_id} cancelled.")
else:
log.warning(f"Task {task_id} not found.")
return SuccessDataClass(
op_key=op_key,
data=None,
status="success",
)
API.overwrite_fn(open_file)
# breakpoint()
API.overwrite_fn(cancel_task)
webview.bind_jsonschema_api(API)
webview.size = Size(1280, 1024, SizeHint.NONE)

View File

@@ -2,10 +2,13 @@ import ctypes
import json
import logging
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass
from enum import IntEnum
from typing import Any
from clan_cli.async_run import set_should_cancel
from clan_lib.api import (
ApiError,
ErrorDataClass,
@@ -38,16 +41,29 @@ class Size:
self.hint = hint
@dataclass
class WebThread:
thread: threading.Thread
stop_event: threading.Event
class Webview:
def __init__(
self, debug: bool = False, size: Size | None = None, window: int | None = None
) -> None:
self._handle = _webview_lib.webview_create(int(debug), window)
self._callbacks: dict[str, Callable[..., Any]] = {}
self.threads: dict[str, WebThread] = {}
self.stopped_threads: set[str] = set()
self.lock = threading.Lock()
self.stop_garbage_collection: threading.Event = threading.Event()
if size:
self.size = size
def __enter__(self) -> "Webview":
return self
@property
def size(self) -> Size:
return self._size
@@ -78,8 +94,28 @@ class Webview:
def navigate(self, url: str) -> None:
_webview_lib.webview_navigate(self._handle, _encode_c_string(url))
def collect_garbage(self) -> None:
while not self.stop_garbage_collection.is_set():
with self.lock:
for op_key in list(self.threads.keys()):
if op_key in self.stopped_threads:
log.debug(f"Collecting garbage op_key: {op_key}")
del self.threads[op_key]
self.stopped_threads.remove(op_key)
if self.stop_garbage_collection.is_set():
break
time.sleep(1)
def run(self) -> None:
thread = threading.Thread(
target=self.collect_garbage, name="WebviewGarbageCollector"
)
thread.start()
_webview_lib.webview_run(self._handle)
self.stop_garbage_collection.set()
log.info("Shutting down webview...")
if self.lock.locked():
self.lock.release()
self.destroy()
def bind_jsonschema_api(self, api: MethodRegistry) -> None:
@@ -92,7 +128,7 @@ class Webview:
wrap_method: Callable[..., Any] = method,
method_name: str = name,
) -> None:
def thread_task() -> None:
def thread_task(stop_event: threading.Event) -> None:
try:
args = json.loads(req.decode())
@@ -110,10 +146,12 @@ class Webview:
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
reconciled_arguments["op_key"] = seq.decode()
op_key = seq.decode()
reconciled_arguments["op_key"] = op_key
# TODO: We could remove the wrapper in the MethodRegistry
# and just call the method directly
set_should_cancel(lambda: stop_event.is_set())
result = wrap_method(**reconciled_arguments)
serialized = json.dumps(
@@ -139,9 +177,18 @@ class Webview:
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(seq.decode(), FuncStatus.FAILURE, serialized)
finally:
self.stopped_threads.add(seq.decode())
thread = threading.Thread(target=thread_task)
stop_event = threading.Event()
thread = threading.Thread(
target=thread_task, args=(stop_event,), name="WebviewThread"
)
thread.start()
with self.lock:
self.threads[seq.decode()] = WebThread(
thread=thread, stop_event=stop_event
)
c_callback = _webview_lib.CFUNCTYPE(
None, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p