clan-app: simplified task function, moved them to a separate file
This commit is contained in:
@@ -8,7 +8,7 @@ from dataclasses import dataclass
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import clan_lib.machines.actions # noqa: F401
|
import clan_lib.machines.actions # noqa: F401
|
||||||
from clan_lib.api import API, ErrorDataClass, SuccessDataClass
|
from clan_lib.api import API, tasks
|
||||||
|
|
||||||
# TODO: We have to manually import python files to make the API.register be triggered.
|
# TODO: We have to manually import python files to make the API.register be triggered.
|
||||||
# We NEED to fix this, as this is super unintuitive and error-prone.
|
# We NEED to fix this, as this is super unintuitive and error-prone.
|
||||||
@@ -46,45 +46,16 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
|||||||
webview = Webview(debug=app_opts.debug)
|
webview = Webview(debug=app_opts.debug)
|
||||||
webview.title = "Clan App"
|
webview.title = "Clan App"
|
||||||
# This seems to call the gtk api correctly but and gtk also seems to our icon, but somehow the icon is not loaded.
|
# This seems to call the gtk api correctly but and gtk also seems to our icon, but somehow the icon is not loaded.
|
||||||
webview.icon = "clan-white"
|
|
||||||
|
|
||||||
|
# Init LogManager global in log_manager_api module
|
||||||
log_manager_api.LOG_MANAGER_INSTANCE = LogManager(
|
log_manager_api.LOG_MANAGER_INSTANCE = LogManager(
|
||||||
base_dir=user_data_dir() / "clan-app" / "logs"
|
base_dir=user_data_dir() / "clan-app" / "logs"
|
||||||
)
|
)
|
||||||
|
|
||||||
def cancel_task(
|
# Init BAKEND_THREADS in tasks module
|
||||||
task_id: str, *, op_key: str
|
tasks.BAKEND_THREADS = webview.threads
|
||||||
) -> SuccessDataClass[None] | ErrorDataClass:
|
|
||||||
"""Cancel a task by its op_key."""
|
|
||||||
log.debug(f"Cancelling task with op_key: {task_id}")
|
|
||||||
future = webview.threads.get(task_id)
|
|
||||||
if future:
|
|
||||||
future.stop_event.set()
|
|
||||||
log.debug(f"Task {task_id} cancelled.")
|
|
||||||
else:
|
|
||||||
log.warning(f"Task {task_id} not found.")
|
|
||||||
return SuccessDataClass(
|
|
||||||
op_key=op_key,
|
|
||||||
data=None,
|
|
||||||
status="success",
|
|
||||||
)
|
|
||||||
|
|
||||||
def list_tasks(
|
|
||||||
*,
|
|
||||||
op_key: str,
|
|
||||||
) -> SuccessDataClass[list[str]] | ErrorDataClass:
|
|
||||||
"""List all tasks."""
|
|
||||||
log.debug("Listing all tasks.")
|
|
||||||
tasks = list(webview.threads.keys())
|
|
||||||
return SuccessDataClass(
|
|
||||||
op_key=op_key,
|
|
||||||
data=tasks,
|
|
||||||
status="success",
|
|
||||||
)
|
|
||||||
|
|
||||||
API.overwrite_fn(list_tasks)
|
|
||||||
API.overwrite_fn(open_file)
|
API.overwrite_fn(open_file)
|
||||||
API.overwrite_fn(cancel_task)
|
|
||||||
webview.bind_jsonschema_api(API, log_manager=log_manager_api.LOG_MANAGER_INSTANCE)
|
webview.bind_jsonschema_api(API, log_manager=log_manager_api.LOG_MANAGER_INSTANCE)
|
||||||
webview.size = Size(1280, 1024, SizeHint.NONE)
|
webview.size = Size(1280, 1024, SizeHint.NONE)
|
||||||
webview.navigate(content_uri)
|
webview.navigate(content_uri)
|
||||||
|
|||||||
@@ -1,15 +1,36 @@
|
|||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
from clan_lib.api import API
|
from clan_lib.api import API
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class WebThread:
|
||||||
|
thread: threading.Thread
|
||||||
|
stop_event: threading.Event
|
||||||
|
|
||||||
|
|
||||||
|
BAKEND_THREADS: dict[str, WebThread] | None = None
|
||||||
|
|
||||||
|
|
||||||
@API.register_abstract
|
@API.register_abstract
|
||||||
def cancel_task(task_id: str) -> None:
|
def cancel_task(task_id: str) -> None:
|
||||||
"""Cancel a task by its op_key."""
|
"""Cancel a task by its op_key."""
|
||||||
msg = "cancel_task() is not implemented"
|
assert BAKEND_THREADS is not None, "Backend threads not initialized"
|
||||||
raise NotImplementedError(msg)
|
future = BAKEND_THREADS.get(task_id)
|
||||||
|
if future:
|
||||||
|
future.stop_event.set()
|
||||||
|
log.debug(f"Task with id {task_id} has been cancelled.")
|
||||||
|
else:
|
||||||
|
msg = f"Task with id {task_id} not found."
|
||||||
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
|
||||||
@API.register_abstract
|
@API.register
|
||||||
def list_tasks() -> list[str]:
|
def list_tasks() -> list[str]:
|
||||||
"""List all tasks."""
|
"""List all tasks."""
|
||||||
msg = "list_tasks() is not implemented"
|
assert BAKEND_THREADS is not None, "Backend threads not initialized"
|
||||||
raise NotImplementedError(msg)
|
return list(BAKEND_THREADS.keys())
|
||||||
|
|||||||
Reference in New Issue
Block a user