clan-app: init clan http api

clan-app: nix fmt
This commit is contained in:
Qubasa
2025-07-09 13:32:28 +07:00
parent 28d5294292
commit 1c269d1eaa
6 changed files with 699 additions and 13 deletions

View File

@@ -16,9 +16,32 @@ def main(argv: list[str] = sys.argv) -> int:
"--content-uri", type=str, help="The URI of the content to display"
)
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
parser.add_argument(
"--http-api",
action="store_true",
help="Enable HTTP API mode (default: False)",
)
parser.add_argument(
"--http-host",
type=str,
default="localhost",
help="The host for the HTTP API server (default: localhost)",
)
parser.add_argument(
"--http-port",
type=int,
default=8080,
help="The host and port for the HTTP API server (default: 8080)",
)
args = parser.parse_args(argv[1:])
app_opts = ClanAppOptions(content_uri=args.content_uri, debug=args.debug)
app_opts = ClanAppOptions(
content_uri=args.content_uri,
http_api=args.http_api,
http_host=args.http_host,
http_port=args.http_port,
debug=args.debug,
)
try:
app_run(app_opts)
except KeyboardInterrupt:

View File

@@ -25,6 +25,9 @@ log = logging.getLogger(__name__)
class ClanAppOptions:
content_uri: str
debug: bool
http_api: bool = False
http_host: str = "127.0.0.1"
http_port: int = 8080
@profile
@@ -55,19 +58,58 @@ def app_run(app_opts: ClanAppOptions) -> int:
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview = Webview(
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
)
# Start HTTP API server if requested
http_server = None
if app_opts.http_api:
from clan_app.deps.http.http_server import HttpApiServer
# Add middleware to the webview
webview.add_middleware(ArgumentParsingMiddleware(api=API))
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
webview.add_middleware(MethodExecutionMiddleware(api=API))
http_server = HttpApiServer(
api=API,
log_manager=log_manager,
host=app_opts.http_host,
port=app_opts.http_port,
)
http_server.start()
# Init BAKEND_THREADS global in tasks module
tasks.BAKEND_THREADS = webview.threads
# Create webview if not running in HTTP-only mode
if not app_opts.http_api:
webview = Webview(
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
)
# Add middleware to the webview
webview.add_middleware(ArgumentParsingMiddleware(api=API))
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
webview.add_middleware(MethodExecutionMiddleware(api=API))
# Create the bridge
webview.create_bridge()
# Init BAKEND_THREADS global in tasks module
tasks.BAKEND_THREADS = webview.threads
webview.bind_jsonschema_api(API, log_manager=log_manager)
webview.navigate(content_uri)
webview.run()
else:
# HTTP-only mode - keep the server running
log.info("HTTP API server running...")
log.info(
f"Available API methods at: http://{app_opts.http_host}:{app_opts.http_port}/api/methods"
)
log.info(
f"Example request: curl -X POST http://{app_opts.http_host}:{app_opts.http_port}/api/call/list_log_days"
)
log.info("Press Ctrl+C to stop the server")
try:
# Keep the main thread alive
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
log.info("Shutting down HTTP API server...")
if http_server:
http_server.stop()
webview.bind_jsonschema_api(API, log_manager=log_manager)
webview.navigate(content_uri)
webview.run()
return 0

View File

@@ -0,0 +1,85 @@
import json
import logging
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from clan_lib.api import dataclass_to_dict
from clan_lib.api.tasks import WebThread
from clan_lib.async_run import set_should_cancel
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
if TYPE_CHECKING:
from collections.abc import Callable
log = logging.getLogger(__name__)
@dataclass
class HttpBridge(ApiBridge):
"""HTTP-specific implementation of the API bridge."""
threads: dict[str, WebThread] = field(default_factory=dict)
response_handler: "Callable[[BackendResponse], None] | None" = None
def send_response(self, response: BackendResponse) -> None:
"""Send response back to the HTTP client."""
if self.response_handler:
self.response_handler(response)
else:
# Default behavior - just log the response
serialized = json.dumps(
dataclass_to_dict(response), indent=4, ensure_ascii=False
)
log.debug(f"HTTP response: {serialized}")
def handle_http_request(
self,
method_name: str,
request_data: dict[str, Any],
op_key: str,
) -> None:
"""Handle an HTTP API request."""
try:
# Parse the HTTP request format
header = request_data.get("header", {})
if not isinstance(header, dict):
msg = f"Expected header to be a dict, got {type(header)}"
raise TypeError(msg)
body = request_data.get("body", {})
if not isinstance(body, dict):
msg = f"Expected body to be a dict, got {type(body)}"
raise TypeError(msg)
# Create API request
api_request = BackendRequest(
method_name=method_name, args=body, header=header, op_key=op_key
)
except Exception as e:
self.send_error_response(op_key, str(e), ["http_bridge", method_name])
return
# Process in a separate thread
def thread_task(stop_event: threading.Event) -> None:
set_should_cancel(lambda: stop_event.is_set())
try:
self.process_request(api_request)
finally:
self.threads.pop(op_key, None)
stop_event = threading.Event()
thread = threading.Thread(
target=thread_task, args=(stop_event,), name="HttpThread"
)
thread.start()
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
def set_response_handler(
self, handler: "Callable[[BackendResponse], None]"
) -> None:
"""Set a custom response handler for HTTP responses."""
self.response_handler = handler

View File

@@ -0,0 +1,229 @@
import json
import logging
import threading
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from urllib.parse import urlparse
from clan_lib.api import MethodRegistry, dataclass_to_dict
from clan_lib.log_manager import LogManager
from clan_app.api.api_bridge import BackendResponse
from clan_app.api.middleware import (
ArgumentParsingMiddleware,
LoggingMiddleware,
MethodExecutionMiddleware,
)
from .http_bridge import HttpBridge
log = logging.getLogger(__name__)
class ClanAPIRequestHandler(BaseHTTPRequestHandler):
"""HTTP request handler for Clan API."""
def __init__(
self, *args: Any, api: MethodRegistry, bridge: HttpBridge, **kwargs: Any
) -> None:
self.api = api
self.bridge = bridge
super().__init__(*args, **kwargs)
def _send_json_response(self, data: dict[str, Any], status_code: int = 200) -> None:
"""Send a JSON response."""
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
response_data = json.dumps(data, indent=2, ensure_ascii=False)
self.wfile.write(response_data.encode("utf-8"))
def _send_error_response(self, message: str, status_code: int = 400) -> None:
"""Send an error response."""
self._send_json_response(
{
"success": False,
"error": message,
"status_code": status_code,
},
status_code=status_code,
)
def do_OPTIONS(self) -> None: # noqa: N802
"""Handle CORS preflight requests."""
self.send_response(200)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
def do_GET(self) -> None: # noqa: N802
"""Handle GET requests."""
parsed_url = urlparse(self.path)
path = parsed_url.path
if path == "/":
self._send_json_response(
{
"message": "Clan API Server",
"version": "1.0.0",
}
)
elif path == "/api/methods":
self._send_json_response({"methods": list(self.api.functions.keys())})
else:
self._send_error_response("Not Found", 404)
def do_POST(self) -> None: # noqa: N802
"""Handle POST requests."""
parsed_url = urlparse(self.path)
path = parsed_url.path
# Check if this is an API call
if not path.startswith("/api/call/"):
self._send_error_response("Not Found", 404)
return
# Extract method name from path
method_name = path[len("/api/call/") :]
if not method_name:
self._send_error_response("Method name required", 400)
return
if method_name not in self.api.functions:
self._send_error_response(f"Method '{method_name}' not found", 404)
return
# Read request body
try:
content_length = int(self.headers.get("Content-Length", 0))
if content_length > 0:
body = self.rfile.read(content_length)
request_data = json.loads(body.decode("utf-8"))
else:
request_data = {}
except json.JSONDecodeError:
self._send_error_response("Invalid JSON in request body", 400)
return
except Exception as e:
self._send_error_response(f"Error reading request: {e!s}", 400)
return
# Generate a unique operation key
op_key = str(uuid.uuid4())
# Store the response for this request
response_data: dict[str, Any] = {}
response_event = threading.Event()
def response_handler(response: BackendResponse) -> None:
response_data["response"] = response
response_event.set()
# Set the response handler
self.bridge.set_response_handler(response_handler)
# Process the request
self.bridge.handle_http_request(method_name, request_data, op_key)
# Wait for the response (with timeout)
if not response_event.wait(timeout=60): # 60 second timeout
self._send_error_response("Request timeout", 408)
return
# Get the response
response = response_data["response"]
# Convert to JSON-serializable format
if response.body is not None:
response_dict = dataclass_to_dict(response)
else:
response_dict = None
self._send_json_response(
response_dict if response_dict is not None else {},
)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002
"""Override default logging to use our logger."""
log.info(f"{self.address_string()} - {format % args}")
class HttpApiServer:
"""HTTP server for the Clan API using Python's built-in HTTP server."""
def __init__(
self,
api: MethodRegistry,
log_manager: LogManager,
host: str = "127.0.0.1",
port: int = 8080,
) -> None:
self.api = api
self.log_manager = log_manager
self.host = host
self.port = port
self.server: HTTPServer | None = None
self.server_thread: threading.Thread | None = None
self.bridge = self._create_bridge()
def _create_bridge(self) -> HttpBridge:
"""Create HTTP bridge with middleware."""
return HttpBridge(
middleware_chain=(
ArgumentParsingMiddleware(api=self.api),
LoggingMiddleware(log_manager=self.log_manager),
MethodExecutionMiddleware(api=self.api),
)
)
def _create_request_handler(self) -> type[ClanAPIRequestHandler]:
"""Create a request handler class with injected dependencies."""
api = self.api
bridge = self.bridge
class RequestHandler(ClanAPIRequestHandler):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, api=api, bridge=bridge, **kwargs)
return RequestHandler
def start(self) -> None:
"""Start the HTTP server in a separate thread."""
if self.server_thread is not None:
log.warning("HTTP server is already running")
return
# Create the server
handler_class = self._create_request_handler()
self.server = HTTPServer((self.host, self.port), handler_class)
def run_server() -> None:
if self.server:
self.server.serve_forever()
self.server_thread = threading.Thread(target=run_server, daemon=True)
self.server_thread.start()
def stop(self) -> None:
"""Stop the HTTP server."""
if self.server:
self.server.shutdown()
self.server.server_close()
self.server = None
if self.server_thread:
self.server_thread.join(timeout=5)
self.server_thread = None
log.info("HTTP API server stopped")
def is_running(self) -> bool:
"""Check if the server is running."""
return self.server_thread is not None and self.server_thread.is_alive()

View File

@@ -0,0 +1,307 @@
"""Tests for HTTP API components."""
import json
import threading
import time
from unittest.mock import Mock
from urllib.error import HTTPError
from urllib.request import Request, urlopen
import pytest
from clan_lib.api import MethodRegistry
from clan_lib.log_manager import LogManager
from clan_app.api.api_bridge import BackendResponse
from clan_app.api.middleware import (
ArgumentParsingMiddleware,
LoggingMiddleware,
MethodExecutionMiddleware,
)
from clan_app.deps.http.http_bridge import HttpBridge
from clan_app.deps.http.http_server import HttpApiServer
@pytest.fixture
def mock_api() -> MethodRegistry:
"""Create a mock API with test methods."""
api = MethodRegistry()
@api.register
def test_method(message: str) -> dict[str, str]:
return {"response": f"Hello {message}!"}
@api.register
def test_method_with_error() -> dict[str, str]:
msg = "Test error"
raise ValueError(msg)
return api
@pytest.fixture
def mock_log_manager() -> Mock:
"""Create a mock log manager."""
log_manager = Mock(spec=LogManager)
log_manager.create_log_file.return_value.get_file_path.return_value = Mock()
log_manager.create_log_file.return_value.get_file_path.return_value.open.return_value = Mock()
return log_manager
@pytest.fixture
def http_bridge(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpBridge:
"""Create HTTP bridge with mock dependencies."""
return HttpBridge(
middleware_chain=(
ArgumentParsingMiddleware(api=mock_api),
LoggingMiddleware(log_manager=mock_log_manager),
MethodExecutionMiddleware(api=mock_api),
)
)
@pytest.fixture
def http_server(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpApiServer:
"""Create HTTP server with mock dependencies."""
return HttpApiServer(
api=mock_api,
log_manager=mock_log_manager,
host="127.0.0.1",
port=8081, # Use different port for tests
)
class TestHttpBridge:
"""Tests for HttpBridge class."""
def test_http_bridge_initialization(self, http_bridge: HttpBridge) -> None:
"""Test HTTP bridge initialization."""
assert http_bridge.threads == {}
assert http_bridge.response_handler is None
def test_set_response_handler(self, http_bridge: HttpBridge) -> None:
"""Test setting response handler."""
handler: Mock = Mock()
http_bridge.set_response_handler(handler)
assert http_bridge.response_handler == handler
def test_handle_http_request_success(self, http_bridge: HttpBridge) -> None:
"""Test successful HTTP request handling."""
# Set up response handler
response_received: threading.Event = threading.Event()
received_response: dict = {}
def response_handler(response: BackendResponse) -> None:
received_response["response"] = response
response_received.set()
http_bridge.set_response_handler(response_handler)
# Make request
request_data: dict = {"header": {}, "body": {"message": "World"}}
http_bridge.handle_http_request("test_method", request_data, "test-op-key")
# Wait for response
assert response_received.wait(timeout=5)
response = received_response["response"]
assert response._op_key == "test-op-key" # noqa: SLF001
assert response.body.data == {"response": "Hello World!"}
def test_handle_http_request_with_invalid_header(
self, http_bridge: HttpBridge
) -> None:
"""Test HTTP request with invalid header."""
response_received: threading.Event = threading.Event()
received_response: dict = {}
def response_handler(response: BackendResponse) -> None:
received_response["response"] = response
response_received.set()
http_bridge.set_response_handler(response_handler)
# Make request with invalid header
request_data: dict = {
"header": "invalid_header", # Should be dict
"body": {"message": "World"},
}
http_bridge.handle_http_request("test_method", request_data, "test-op-key")
# Wait for response
assert response_received.wait(timeout=5)
response = received_response["response"]
assert response._op_key == "test-op-key" # noqa: SLF001
assert response.body.status == "error"
class TestHttpApiServer:
"""Tests for HttpApiServer class."""
def test_server_initialization(self, http_server: HttpApiServer) -> None:
"""Test HTTP server initialization."""
assert http_server.host == "127.0.0.1"
assert http_server.port == 8081
assert http_server.server is None
assert http_server.server_thread is None
assert not http_server.is_running()
def test_server_start_stop(self, http_server: HttpApiServer) -> None:
"""Test starting and stopping the server."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
assert http_server.is_running()
# Stop server
http_server.stop()
time.sleep(0.1) # Give server time to stop
assert not http_server.is_running()
def test_server_endpoints(self, http_server: HttpApiServer) -> None:
"""Test server endpoints."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test root endpoint
response = urlopen("http://127.0.0.1:8081/")
data: dict = json.loads(response.read().decode())
assert data["message"] == "Clan API Server"
assert data["version"] == "1.0.0"
# Test methods endpoint
response = urlopen("http://127.0.0.1:8081/api/methods")
data = json.loads(response.read().decode())
assert "test_method" in data["methods"]
assert "test_method_with_error" in data["methods"]
# Test API call endpoint
request_data: dict = {"header": {}, "body": {"message": "World"}}
req: Request = Request(
"http://127.0.0.1:8081/api/call/test_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req)
data = json.loads(response.read().decode())
assert data["success"] is True
assert data["data"]["data"] == {"response": "Hello World!"}
finally:
# Always stop server
http_server.stop()
def test_server_error_handling(self, http_server: HttpApiServer) -> None:
"""Test server error handling."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test 404 error
with pytest.raises(HTTPError) as exc_info:
urlopen("http://127.0.0.1:8081/nonexistent")
assert exc_info.value.code == 404
# Test method not found
request_data: dict = {"header": {}, "body": {}}
req: Request = Request(
"http://127.0.0.1:8081/api/call/nonexistent_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
with pytest.raises(HTTPError) as exc_info:
urlopen(req)
assert exc_info.value.code == 404
# Test invalid JSON
req = Request(
"http://127.0.0.1:8081/api/call/test_method",
data=b"invalid json",
headers={"Content-Type": "application/json"},
)
with pytest.raises(HTTPError) as exc_info:
urlopen(req)
assert exc_info.value.code == 400
finally:
# Always stop server
http_server.stop()
def test_server_cors_headers(self, http_server: HttpApiServer) -> None:
"""Test CORS headers are properly set."""
# Start server
http_server.start()
time.sleep(0.1) # Give server time to start
try:
# Test OPTIONS request
class OptionsRequest(Request):
def get_method(self) -> str:
return "OPTIONS"
req: Request = OptionsRequest("http://127.0.0.1:8081/api/call/test_method")
response = urlopen(req)
# Check CORS headers
headers = response.info()
assert headers.get("Access-Control-Allow-Origin") == "*"
assert "GET" in headers.get("Access-Control-Allow-Methods", "")
assert "POST" in headers.get("Access-Control-Allow-Methods", "")
finally:
# Always stop server
http_server.stop()
class TestIntegration:
"""Integration tests for HTTP API components."""
def test_full_request_flow(
self, mock_api: MethodRegistry, mock_log_manager: Mock
) -> None:
"""Test complete request flow from server to bridge to middleware."""
server: HttpApiServer = HttpApiServer(
api=mock_api,
log_manager=mock_log_manager,
host="127.0.0.1",
port=8082,
)
# Start server
server.start()
time.sleep(0.1) # Give server time to start
try:
# Make API call
request_data: dict = {
"header": {"logging": {"group_path": ["test", "group"]}},
"body": {"message": "Integration"},
}
req: Request = Request(
"http://127.0.0.1:8082/api/call/test_method",
data=json.dumps(request_data).encode(),
headers={"Content-Type": "application/json"},
)
response = urlopen(req)
data: dict = json.loads(response.read().decode())
# Verify response
assert data["success"] is True
assert data["data"]["data"] == {"response": "Hello Integration!"}
assert "op_key" in data
finally:
# Always stop server
server.stop()
if __name__ == "__main__":
pytest.main([__file__, "-v"])