From 90a32dba0f1400a6def3aa99f68a39b279a0a1ab Mon Sep 17 00:00:00 2001 From: Qubasa Date: Thu, 10 Jul 2025 15:07:04 +0700 Subject: [PATCH 1/3] clan-app: Working swagger requests --- pkgs/clan-app/clan_app/deps/http/swagger.html | 85 ++++++++++++++++++- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/pkgs/clan-app/clan_app/deps/http/swagger.html b/pkgs/clan-app/clan_app/deps/http/swagger.html index 855b2729c..9a1106cb8 100644 --- a/pkgs/clan-app/clan_app/deps/http/swagger.html +++ b/pkgs/clan-app/clan_app/deps/http/swagger.html @@ -2,7 +2,8 @@ - Swagger UI + Swagger UI with Interceptors + - + - + \ No newline at end of file From 70eb67ffd28bab32b648d367d83b58ed5725386f Mon Sep 17 00:00:00 2001 From: Qubasa Date: Thu, 10 Jul 2025 16:06:47 +0700 Subject: [PATCH 2/3] stash --- pkgs/clan-app/clan_app/deps/http/swagger.html | 32 ++++--- .../clan_app/deps/http/test_http_api.py | 94 ++++++++++++++++++- 2 files changed, 114 insertions(+), 12 deletions(-) diff --git a/pkgs/clan-app/clan_app/deps/http/swagger.html b/pkgs/clan-app/clan_app/deps/http/swagger.html index 9a1106cb8..5b52cc7e7 100644 --- a/pkgs/clan-app/clan_app/deps/http/swagger.html +++ b/pkgs/clan-app/clan_app/deps/http/swagger.html @@ -46,7 +46,7 @@ */ requestInterceptor: (request) => { console.log("Intercepting request:", request); - + // Only modify requests that have a body (like POST, PUT) if (request.body) { try { @@ -56,19 +56,24 @@ // Create the new, nested structure. const newBody = { body: originalBody, - header: {} // Add an empty header object as per your example + header: {}, // Add an empty header object as per your example }; // Replace the original body with the new, stringified, nested structure. request.body = JSON.stringify(newBody); - + // Update the 'Content-Length' header to match the new body size. - request.headers['Content-Length'] = new Blob([request.body]).size; + request.headers["Content-Length"] = new Blob([ + request.body, + ]).size; console.log("Modified request body:", request.body); } catch (e) { // If the user's input isn't valid JSON, don't modify the request. - console.error("Request Interceptor: Could not parse body as JSON.", e); + console.error( + "Request Interceptor: Could not parse body as JSON.", + e, + ); } } return request; // Always return the request object @@ -90,9 +95,11 @@ const fullResponse = JSON.parse(response.data); // Check if the expected 'body' property exists. - if (fullResponse && typeof fullResponse.body !== 'undefined') { - console.log("Found nested 'body' property. Un-nesting for display."); - + if (fullResponse && typeof fullResponse.body !== "undefined") { + console.log( + "Found nested 'body' property. Un-nesting for display.", + ); + // Replace the response's data with JUST the nested 'body' object. // We stringify it with pretty-printing (2-space indentation) for readability in the UI. response.data = JSON.stringify(fullResponse.body, null, 2); @@ -101,15 +108,18 @@ } catch (e) { // If the response isn't the expected JSON structure, do nothing. // This prevents errors on other endpoints that have a normal response. - console.error("Response Interceptor: Could not parse response or un-nest data.", e); + console.error( + "Response Interceptor: Could not parse response or un-nest data.", + e, + ); } } return response; // Always return the response object - } + }, // --- INTERCEPTORS END HERE --- }); }; - \ No newline at end of file + diff --git a/pkgs/clan-app/clan_app/deps/http/test_http_api.py b/pkgs/clan-app/clan_app/deps/http/test_http_api.py index 753f6afcb..65b419099 100644 --- a/pkgs/clan-app/clan_app/deps/http/test_http_api.py +++ b/pkgs/clan-app/clan_app/deps/http/test_http_api.py @@ -1,13 +1,16 @@ """Tests for HTTP API components.""" import json +import logging import time from unittest.mock import Mock from urllib.request import Request, urlopen - +import threading import pytest from clan_lib.api import MethodRegistry +from clan_lib.async_run import is_async_cancelled from clan_lib.log_manager import LogManager +from clan_lib.api import tasks from clan_app.api.middleware import ( ArgumentParsingMiddleware, @@ -16,6 +19,8 @@ from clan_app.api.middleware import ( ) from clan_app.deps.http.http_server import HttpApiServer +log = logging.getLogger(__name__) + @pytest.fixture def mock_api() -> MethodRegistry: @@ -31,6 +36,21 @@ def mock_api() -> MethodRegistry: msg = "Test error" raise ValueError(msg) + @api.register + def run_task_blocking(wtime: int) -> str: + """A long blocking task that simulates a long-running operation.""" + time.sleep(1) + + for i in range(wtime): + if is_async_cancelled(): + log.debug("Task was cancelled") + return "Task was cancelled" + log.debug( + f"Processing {i} for {wtime}" + ) + time.sleep(1) + return f"Task completed with wtime: {wtime}" + return api @@ -282,5 +302,77 @@ class TestIntegration: server.stop() + def test_blocking_task( + self, mock_api: MethodRegistry, mock_log_manager: Mock + ) -> None: + + shared_threads: dict[str, tasks.WebThread] = {} + tasks.BAKEND_THREADS = shared_threads + + """Test a long-running blocking task.""" + server: HttpApiServer = HttpApiServer( + api=mock_api, + host="127.0.0.1", + port=8083, + shared_threads=shared_threads, + ) + + # Add middleware + server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) + server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) + server.add_middleware(MethodExecutionMiddleware(api=mock_api)) + + # Start server + server.start() + time.sleep(0.1) # Give server time to start + + + sucess = threading.Event() + def parallel_task() -> None: + + time.sleep(1) + request_data: dict = { + "body": {"message": "Integration"}, + } + req: Request = Request( + "http://127.0.0.1:8083/api/v1/test_method", + data=json.dumps(request_data).encode(), + headers={"Content-Type": "application/json"}, + ) + response = urlopen(req) + data: dict = json.loads(response.read().decode()) + + assert "body" in data + assert "header" in data + assert data["body"]["status"] == "success" + assert data["body"]["data"] == {"response": "Hello Integration!"} + sucess.set() + + thread = threading.Thread( + target=parallel_task, + name="ParallelTaskThread", + daemon=True, + ) + thread.start() + + # Make API call + request_data: dict = { + "body": {"wtime": 3}, + } + req: Request = Request( + "http://127.0.0.1:8083/api/v1/run_task_blocking", + data=json.dumps(request_data).encode(), + headers={"Content-Type": "application/json"}, + ) + response = urlopen(req) + data: dict = json.loads(response.read().decode()) + + # thread.join() + assert "body" in data + assert data["body"]["status"] == "success" + assert data["body"]["data"] == "Task completed with wtime: 3" + assert sucess.is_set(), "Parallel task did not complete successfully" + + if __name__ == "__main__": pytest.main([__file__, "-v"]) From 695141b2ae9506f5e29218a84a98d127c3f53b07 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Thu, 10 Jul 2025 17:21:18 +0700 Subject: [PATCH 3/3] clan-app: Make http server non blocking, add tests for the http server and for cancelling tasks --- pkgs/clan-app/clan_app/api/api_bridge.py | 1 + .../clan_app/deps/http/http_server.py | 8 +-- .../clan_app/deps/http/test_http_api.py | 66 +++++++++---------- pkgs/clan-app/pyproject.toml | 2 +- pkgs/clan-app/shell.nix | 1 + pkgs/clan-cli/clan_lib/api/tasks.py | 1 + 6 files changed, 38 insertions(+), 41 deletions(-) diff --git a/pkgs/clan-app/clan_app/api/api_bridge.py b/pkgs/clan-app/clan_app/api/api_bridge.py index 04cc792ba..cdbdee676 100644 --- a/pkgs/clan-app/clan_app/api/api_bridge.py +++ b/pkgs/clan-app/clan_app/api/api_bridge.py @@ -126,6 +126,7 @@ class ApiBridge(ABC): target=thread_task, args=(stop_event,), name=thread_name ) thread.start() + self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event) if wait_for_completion: diff --git a/pkgs/clan-app/clan_app/deps/http/http_server.py b/pkgs/clan-app/clan_app/deps/http/http_server.py index b72b29287..764e79789 100644 --- a/pkgs/clan-app/clan_app/deps/http/http_server.py +++ b/pkgs/clan-app/clan_app/deps/http/http_server.py @@ -1,6 +1,6 @@ import logging import threading -from http.server import HTTPServer +from http.server import HTTPServer, ThreadingHTTPServer from pathlib import Path from typing import TYPE_CHECKING, Any @@ -36,7 +36,7 @@ class HttpApiServer: self._server_thread: threading.Thread | None = None # Bridge is now the request handler itself, no separate instance needed self._middleware: list[Middleware] = [] - self.shared_threads = shared_threads or {} + self.shared_threads = shared_threads if shared_threads is not None else {} def add_middleware(self, middleware: "Middleware") -> None: """Add middleware to the middleware chain.""" @@ -84,9 +84,9 @@ class HttpApiServer: log.warning("HTTP server is already running") return - # Create the server + # Create the server using ThreadingHTTPServer for concurrent request handling handler_class = self._create_request_handler() - self._server = HTTPServer((self.host, self.port), handler_class) + self._server = ThreadingHTTPServer((self.host, self.port), handler_class) def run_server() -> None: if self._server: diff --git a/pkgs/clan-app/clan_app/deps/http/test_http_api.py b/pkgs/clan-app/clan_app/deps/http/test_http_api.py index 65b419099..75c5c9742 100644 --- a/pkgs/clan-app/clan_app/deps/http/test_http_api.py +++ b/pkgs/clan-app/clan_app/deps/http/test_http_api.py @@ -2,19 +2,18 @@ import json import logging +import threading import time from unittest.mock import Mock from urllib.request import Request, urlopen -import threading + import pytest -from clan_lib.api import MethodRegistry +from clan_lib.api import MethodRegistry, tasks from clan_lib.async_run import is_async_cancelled from clan_lib.log_manager import LogManager -from clan_lib.api import tasks from clan_app.api.middleware import ( ArgumentParsingMiddleware, - LoggingMiddleware, MethodExecutionMiddleware, ) from clan_app.deps.http.http_server import HttpApiServer @@ -27,6 +26,8 @@ def mock_api() -> MethodRegistry: """Create a mock API with test methods.""" api = MethodRegistry() + api.register(tasks.delete_task) + @api.register def test_method(message: str) -> dict[str, str]: return {"response": f"Hello {message}!"} @@ -45,9 +46,7 @@ def mock_api() -> MethodRegistry: if is_async_cancelled(): log.debug("Task was cancelled") return "Task was cancelled" - log.debug( - f"Processing {i} for {wtime}" - ) + log.debug(f"Processing {i} for {wtime}") time.sleep(1) return f"Task completed with wtime: {wtime}" @@ -70,7 +69,7 @@ def http_bridge( """Create HTTP bridge dependencies for testing.""" middleware_chain = ( ArgumentParsingMiddleware(api=mock_api), - LoggingMiddleware(log_manager=mock_log_manager), + # LoggingMiddleware(log_manager=mock_log_manager), MethodExecutionMiddleware(api=mock_api), ) return mock_api, middleware_chain @@ -87,7 +86,7 @@ def http_server(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpApiServ # Add middleware server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) - server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) + # server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) server.add_middleware(MethodExecutionMiddleware(api=mock_api)) # Bridge will be created automatically when accessed @@ -104,7 +103,7 @@ class TestHttpBridge: # We'll test initialization through the server api, middleware_chain = http_bridge assert api is not None - assert len(middleware_chain) == 3 + assert len(middleware_chain) == 2 def test_http_bridge_middleware_setup(self, http_bridge: tuple) -> None: """Test that middleware is properly set up.""" @@ -112,10 +111,10 @@ class TestHttpBridge: # Test that we can create the bridge with middleware # The actual HTTP handling will be tested through the server integration tests - assert len(middleware_chain) == 3 + assert len(middleware_chain) == 2 assert isinstance(middleware_chain[0], ArgumentParsingMiddleware) - assert isinstance(middleware_chain[1], LoggingMiddleware) - assert isinstance(middleware_chain[2], MethodExecutionMiddleware) + # assert isinstance(middleware_chain[1], LoggingMiddleware) + assert isinstance(middleware_chain[1], MethodExecutionMiddleware) class TestHttpApiServer: @@ -268,7 +267,7 @@ class TestIntegration: # Add middleware server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) - server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) + # server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) server.add_middleware(MethodExecutionMiddleware(api=mock_api)) # Bridge will be created automatically when accessed @@ -301,52 +300,49 @@ class TestIntegration: # Always stop server server.stop() - def test_blocking_task( self, mock_api: MethodRegistry, mock_log_manager: Mock ) -> None: - shared_threads: dict[str, tasks.WebThread] = {} tasks.BAKEND_THREADS = shared_threads """Test a long-running blocking task.""" server: HttpApiServer = HttpApiServer( - api=mock_api, - host="127.0.0.1", - port=8083, - shared_threads=shared_threads, - ) + api=mock_api, + host="127.0.0.1", + port=8083, + shared_threads=shared_threads, + ) # Add middleware server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) - server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) + # server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager)) server.add_middleware(MethodExecutionMiddleware(api=mock_api)) # Start server server.start() time.sleep(0.1) # Give server time to start + blocking_op_key = "b37f920f-ce8c-4c8d-b595-28ca983d265e" # str(uuid.uuid4()) - sucess = threading.Event() def parallel_task() -> None: - - time.sleep(1) + # Make API call request_data: dict = { - "body": {"message": "Integration"}, + "body": {"wtime": 60}, + "header": {"op_key": blocking_op_key}, } req: Request = Request( - "http://127.0.0.1:8083/api/v1/test_method", + "http://127.0.0.1:8083/api/v1/run_task_blocking", data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) response = urlopen(req) data: dict = json.loads(response.read().decode()) + # thread.join() assert "body" in data - assert "header" in data assert data["body"]["status"] == "success" - assert data["body"]["data"] == {"response": "Hello Integration!"} - sucess.set() + assert data["body"]["data"] == "Task was cancelled" thread = threading.Thread( target=parallel_task, @@ -355,23 +351,21 @@ class TestIntegration: ) thread.start() - # Make API call + time.sleep(1) request_data: dict = { - "body": {"wtime": 3}, + "body": {"task_id": blocking_op_key}, } req: Request = Request( - "http://127.0.0.1:8083/api/v1/run_task_blocking", + "http://127.0.0.1:8083/api/v1/delete_task", data=json.dumps(request_data).encode(), headers={"Content-Type": "application/json"}, ) response = urlopen(req) data: dict = json.loads(response.read().decode()) - # thread.join() assert "body" in data + assert "header" in data assert data["body"]["status"] == "success" - assert data["body"]["data"] == "Task completed with wtime: 3" - assert sucess.is_set(), "Parallel task did not complete successfully" if __name__ == "__main__": diff --git a/pkgs/clan-app/pyproject.toml b/pkgs/clan-app/pyproject.toml index a0034fb6e..603720f19 100644 --- a/pkgs/clan-app/pyproject.toml +++ b/pkgs/clan-app/pyproject.toml @@ -21,7 +21,7 @@ exclude = ["result", "**/__pycache__"] clan_app = ["**/assets/*"] [tool.pytest.ini_options] -testpaths = "tests" +testpaths = [ "tests", "clan_app" ] faulthandler_timeout = 60 log_level = "DEBUG" log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s" diff --git a/pkgs/clan-app/shell.nix b/pkgs/clan-app/shell.nix index ad4f1549e..9235e9432 100644 --- a/pkgs/clan-app/shell.nix +++ b/pkgs/clan-app/shell.nix @@ -58,6 +58,7 @@ mkShell { with ps; [ mypy + pytest-cov ] ++ (clan-app.devshellPyDeps ps) )) diff --git a/pkgs/clan-cli/clan_lib/api/tasks.py b/pkgs/clan-cli/clan_lib/api/tasks.py index fbe89c14e..4d173ecf2 100644 --- a/pkgs/clan-cli/clan_lib/api/tasks.py +++ b/pkgs/clan-cli/clan_lib/api/tasks.py @@ -23,6 +23,7 @@ def delete_task(task_id: str) -> None: """Cancel a task by its op_key.""" assert BAKEND_THREADS is not None, "Backend threads not initialized" future = BAKEND_THREADS.get(task_id) + log.debug(f"Thread ID: {threading.get_ident()}") if future: future.stop_event.set()