clan-lib: Add LogManager class

This commit is contained in:
Qubasa
2025-06-12 16:24:40 +02:00
parent 726b8f4c6c
commit e6025493c4
5 changed files with 1187 additions and 48 deletions

View File

@@ -1,5 +1,6 @@
import ctypes
import functools
import io
import json
import logging
import threading
@@ -15,7 +16,9 @@ from clan_lib.api import (
dataclass_to_dict,
from_dict,
)
from clan_lib.async_run import set_should_cancel
from clan_lib.api.log_manager import LogManager
from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
from clan_lib.custom_logger import setup_logging
from ._webview_ffi import _encode_c_string, _webview_lib
@@ -60,6 +63,7 @@ class Webview:
def api_wrapper(
self,
log_manager: LogManager,
api: MethodRegistry,
method_name: str,
wrap_method: Callable[..., Any],
@@ -71,53 +75,104 @@ class Webview:
args = json.loads(request_data.decode())
log.debug(f"Calling {method_name}({args[0]})")
# Initialize dataclasses from the payload
reconciled_arguments = {}
for k, v in args[0].items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = api.get_method_argtype(method_name, k)
try:
# Initialize dataclasses from the payload
reconciled_arguments = {}
for k, v in args[0].items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
# Depending on the introspected argument_type
arg_class = api.get_method_argtype(method_name, k)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
reconciled_arguments["op_key"] = op_key
# TODO: We could remove the wrapper in the MethodRegistry
# and just call the method directly
reconciled_arguments["op_key"] = op_key
except Exception as e:
log.exception(f"Error while parsing arguments for {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
return
def thread_task(stop_event: threading.Event) -> None:
try:
set_should_cancel(lambda: stop_event.is_set())
result = wrap_method(**reconciled_arguments)
ctx: AsyncContext = get_async_ctx()
ctx.should_cancel = lambda: stop_event.is_set()
log_file = log_manager.create_log_file(
wrap_method, op_key=op_key
).get_file_path()
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
with log_file.open("ab") as log_f:
# Redirect all cmd.run logs to this file.
ctx.stderr = log_f
ctx.stdout = log_f
set_async_ctx(ctx)
log.debug(f"Result for {method_name}: {serialized}")
self.return_(op_key, FuncStatus.SUCCESS, serialized)
except Exception as e:
log.exception(f"Error while handling result of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
# Add a new handler to the root logger that writes to log_f
handler_stream = io.TextIOWrapper(
log_f, encoding="utf-8", write_through=True, line_buffering=True
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
handler = setup_logging(
log.getEffectiveLevel(), log_file=handler_stream
)
self.return_(op_key, FuncStatus.FAILURE, serialized)
finally:
del self.threads[op_key]
log.info("Starting thread for webview API call")
try:
# Original logic: call the wrapped API method.
result = wrap_method(**reconciled_arguments)
# Serialize the result to JSON.
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
# This log message will now also be written to log_f
# through the thread_log_handler.
log.debug(f"Result for {method_name}: {serialized}")
# Return the successful result.
self.return_(op_key, FuncStatus.SUCCESS, serialized)
except Exception as e:
log.exception(f"Error while handling result of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["bind_jsonschema_api", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
finally:
# Crucial cleanup: remove the handler from the root logger.
# This stops redirecting logs for this thread to log_f and prevents
# the handler from being used after log_f is closed.
handler.root_logger.removeHandler(handler.new_handler)
# Close the handler. For a StreamHandler using a stream it doesn't
# own (log_f is managed by the 'with' statement), this typically
# flushes the stream.
handler.new_handler.close()
del self.threads[op_key]
stop_event = threading.Event()
thread = threading.Thread(
@@ -173,10 +228,11 @@ class Webview:
log.info("Shutting down webview...")
self.destroy()
def bind_jsonschema_api(self, api: MethodRegistry) -> None:
def bind_jsonschema_api(self, api: MethodRegistry, log_manager: LogManager) -> None:
for name, method in api.functions.items():
wrapper = functools.partial(
self.api_wrapper,
log_manager,
api,
name,
method,