clan-app: simplified task function, moved them to a separate file

This commit is contained in:
Qubasa
2025-07-02 16:18:37 +07:00
parent 1ec67ecfaf
commit 5d99d0e1e7
2 changed files with 30 additions and 38 deletions

View File

@@ -1,15 +1,36 @@
import logging
import threading
from dataclasses import dataclass
from clan_lib.api import API
log = logging.getLogger(__name__)
@dataclass
class WebThread:
thread: threading.Thread
stop_event: threading.Event
BAKEND_THREADS: dict[str, WebThread] | None = None
@API.register_abstract
def cancel_task(task_id: str) -> None:
"""Cancel a task by its op_key."""
msg = "cancel_task() is not implemented"
raise NotImplementedError(msg)
assert BAKEND_THREADS is not None, "Backend threads not initialized"
future = BAKEND_THREADS.get(task_id)
if future:
future.stop_event.set()
log.debug(f"Task with id {task_id} has been cancelled.")
else:
msg = f"Task with id {task_id} not found."
raise ValueError(msg)
@API.register_abstract
@API.register
def list_tasks() -> list[str]:
"""List all tasks."""
msg = "list_tasks() is not implemented"
raise NotImplementedError(msg)
assert BAKEND_THREADS is not None, "Backend threads not initialized"
return list(BAKEND_THREADS.keys())