Merge pull request 'clan-lib: Add log_manager api, clan-app: Fix API test page' (#3943) from Qubasa/clan-core:ui_log_collection2 into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3943
Luis Hebendanz
2025-06-12 14:35:15 +00:00
8 changed files with 1296 additions and 87 deletions

View File

@@ -7,8 +7,15 @@ import os
 from dataclasses import dataclass
 from pathlib import Path
 
-from clan_lib.api import API, ErrorDataClass, SuccessDataClass
+import clan_lib.machines.actions  # noqa: F401
+from clan_lib.api import API, ApiError, ErrorDataClass, SuccessDataClass
+from clan_lib.api.log_manager import LogDayDir, LogFile, LogFuncDir, LogManager
+
+# TODO: We have to manually import python files to make the API.register be triggered.
+# We NEED to fix this, as this is super unintuitive and error-prone.
+from clan_lib.api.tasks import list_tasks as dummy_list  # noqa: F401
 from clan_lib.custom_logger import setup_logging
+from clan_lib.dirs import user_data_dir
 
 from clan_app.api.file_gtk import open_file
 from clan_app.deps.webview.webview import Size, SizeHint, Webview
@@ -40,6 +47,8 @@ def app_run(app_opts: ClanAppOptions) -> int:
     # This seems to call the gtk api correctly, and gtk also seems to find our icon, but somehow the icon is not loaded.
     webview.icon = "clan-white"
 
+    log_manager = LogManager(base_dir=user_data_dir() / "clan-app" / "logs")
+
     def cancel_task(
         task_id: str, *, op_key: str
     ) -> SuccessDataClass[None] | ErrorDataClass:
@@ -70,14 +79,118 @@ def app_run(app_opts: ClanAppOptions) -> int:
status="success", status="success",
) )
# TODO: We have to manually import python files to make the API.register be triggered. def list_log_days(
# We NEED to fix this, as this is super unintuitive and error-prone. *, op_key: str
import clan_lib.machines.actions # noqa: F401 ) -> SuccessDataClass[list[LogDayDir]] | ErrorDataClass:
"""List all log days."""
log.debug("Listing all log days.")
return SuccessDataClass(
op_key=op_key,
data=log_manager.list_log_days(),
status="success",
)
def list_log_funcs_at_day(
day: str, *, op_key: str
) -> SuccessDataClass[list[LogFuncDir]] | ErrorDataClass:
"""List all log functions at a specific day."""
log.debug(f"Listing all log functions for day: {day}")
try:
log_day_dir = LogDayDir(date_day=day, _base_dir=log_manager.base_dir)
except ValueError:
return ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="Invalid day format",
description=f"Day {day} is not in the correct format (YYYY-MM-DD).",
location=["app::list_log_funcs_at_day", "day"],
)
],
)
return SuccessDataClass(
op_key=op_key,
data=log_day_dir.get_log_files(),
status="success",
)
def list_log_files(
day: str, func_name: str, *, op_key: str
) -> SuccessDataClass[list[LogFile]] | ErrorDataClass:
"""List all log functions at a specific day."""
log.debug(f"Listing all log functions for day: {day}")
try:
log_func_dir = LogFuncDir(
date_day=day, func_name=func_name, _base_dir=log_manager.base_dir
)
except ValueError:
return ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="Invalid day format",
description=f"Day {day} is not in the correct format (YYYY-MM-DD).",
location=["app::list_log_files", "day"],
)
],
)
return SuccessDataClass(
op_key=op_key,
data=log_func_dir.get_log_files(),
status="success",
)
def get_log_file(
id_key: str, *, op_key: str
) -> SuccessDataClass[str] | ErrorDataClass:
"""Get a specific log file."""
try:
log_file = log_manager.get_log_file(id_key)
except ValueError:
return ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="Invalid log file ID",
description=f"Log file ID {id_key} is not in the correct format.",
location=["app::get_log_file", "id_key"],
)
],
)
if not log_file:
return ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="Log file not found",
description=f"Log file with id {id_key} not found.",
location=["app::get_log_file", "id_key"],
)
],
)
log_file_path = log_file.get_file_path()
return SuccessDataClass(
op_key=op_key,
data=log_file_path.read_text(encoding="utf-8"),
status="success",
)
API.overwrite_fn(list_tasks) API.overwrite_fn(list_tasks)
API.overwrite_fn(open_file) API.overwrite_fn(open_file)
API.overwrite_fn(cancel_task) API.overwrite_fn(cancel_task)
webview.bind_jsonschema_api(API) API.overwrite_fn(list_log_days)
API.overwrite_fn(list_log_funcs_at_day)
API.overwrite_fn(list_log_files)
API.overwrite_fn(get_log_file)
webview.bind_jsonschema_api(API, log_manager=log_manager)
webview.size = Size(1280, 1024, SizeHint.NONE) webview.size = Size(1280, 1024, SizeHint.NONE)
webview.navigate(content_uri) webview.navigate(content_uri)
webview.run() webview.run()
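
Taken together, these endpoints walk the directory hierarchy that LogManager maintains: day directory, then function directory, then individual log file. A minimal sketch of the same traversal driving LogManager directly; the base path is an assumption mirroring the user_data_dir() call in app_run above:

from pathlib import Path

from clan_lib.api.log_manager import LogManager

log_manager = LogManager(base_dir=Path.home() / ".local/share/clan-app/logs")

for day_dir in log_manager.list_log_days():        # newest day first
    for func_dir in day_dir.get_log_files():       # LogFuncDir entries, by func_name
        for log_file in func_dir.get_log_files():  # newest LogFile first
            print(log_file.op_key, "->", log_file.get_file_path())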

View File

@@ -1,5 +1,6 @@
 import ctypes
 import functools
+import io
 import json
 import logging
 import threading
@@ -15,7 +16,9 @@ from clan_lib.api import (
     dataclass_to_dict,
     from_dict,
 )
 
-from clan_lib.async_run import set_should_cancel
+from clan_lib.api.log_manager import LogManager
+from clan_lib.async_run import AsyncContext, get_async_ctx, set_async_ctx
+from clan_lib.custom_logger import setup_logging
 
 from ._webview_ffi import _encode_c_string, _webview_lib
@@ -60,6 +63,7 @@ class Webview:
     def api_wrapper(
         self,
+        log_manager: LogManager,
         api: MethodRegistry,
         method_name: str,
         wrap_method: Callable[..., Any],
@@ -71,53 +75,104 @@ class Webview:
         args = json.loads(request_data.decode())
         log.debug(f"Calling {method_name}({args[0]})")
 
-        # Initialize dataclasses from the payload
-        reconciled_arguments = {}
-        for k, v in args[0].items():
-            # Some functions expect to be called with dataclass instances
-            # But the js api returns dictionaries.
-            # Introspect the function and create the expected dataclass from dict dynamically
-            # Depending on the introspected argument_type
-            arg_class = api.get_method_argtype(method_name, k)
-
-            # TODO: rename from_dict into something like construct_checked_value
-            # from_dict really takes Anything and returns an instance of the type/class
-            reconciled_arguments[k] = from_dict(arg_class, v)
-
-        reconciled_arguments["op_key"] = op_key
-
-        # TODO: We could remove the wrapper in the MethodRegistry
-        # and just call the method directly
+        try:
+            # Initialize dataclasses from the payload
+            reconciled_arguments = {}
+            for k, v in args[0].items():
+                # Some functions expect to be called with dataclass instances
+                # But the js api returns dictionaries.
+                # Introspect the function and create the expected dataclass from dict dynamically
+                # Depending on the introspected argument_type
+                arg_class = api.get_method_argtype(method_name, k)
+
+                # TODO: rename from_dict into something like construct_checked_value
+                # from_dict really takes Anything and returns an instance of the type/class
+                reconciled_arguments[k] = from_dict(arg_class, v)
+
+            reconciled_arguments["op_key"] = op_key
+        except Exception as e:
+            log.exception(f"Error while parsing arguments for {method_name}")
+            result = ErrorDataClass(
+                op_key=op_key,
+                status="error",
+                errors=[
+                    ApiError(
+                        message="An internal error occurred",
+                        description=str(e),
+                        location=["bind_jsonschema_api", method_name],
+                    )
+                ],
+            )
+            serialized = json.dumps(
+                dataclass_to_dict(result), indent=4, ensure_ascii=False
+            )
+            self.return_(op_key, FuncStatus.SUCCESS, serialized)
+            return
 
         def thread_task(stop_event: threading.Event) -> None:
-            try:
-                set_should_cancel(lambda: stop_event.is_set())
-                result = wrap_method(**reconciled_arguments)
-
-                serialized = json.dumps(
-                    dataclass_to_dict(result), indent=4, ensure_ascii=False
-                )
-
-                log.debug(f"Result for {method_name}: {serialized}")
-                self.return_(op_key, FuncStatus.SUCCESS, serialized)
-            except Exception as e:
-                log.exception(f"Error while handling result of {method_name}")
-                result = ErrorDataClass(
-                    op_key=op_key,
-                    status="error",
-                    errors=[
-                        ApiError(
-                            message="An internal error occurred",
-                            description=str(e),
-                            location=["bind_jsonschema_api", method_name],
-                        )
-                    ],
-                )
-                serialized = json.dumps(
-                    dataclass_to_dict(result), indent=4, ensure_ascii=False
-                )
-                self.return_(op_key, FuncStatus.FAILURE, serialized)
-            finally:
-                del self.threads[op_key]
+            ctx: AsyncContext = get_async_ctx()
+            ctx.should_cancel = lambda: stop_event.is_set()
+            log_file = log_manager.create_log_file(
+                wrap_method, op_key=op_key
+            ).get_file_path()
+            with log_file.open("ab") as log_f:
+                # Redirect all cmd.run logs to this file.
+                ctx.stderr = log_f
+                ctx.stdout = log_f
+                set_async_ctx(ctx)
+
+                # Add a new handler to the root logger that writes to log_f
+                handler_stream = io.TextIOWrapper(
+                    log_f, encoding="utf-8", write_through=True, line_buffering=True
+                )
+                handler = setup_logging(
+                    log.getEffectiveLevel(), log_file=handler_stream
+                )
+                log.info("Starting thread for webview API call")
+
+                try:
+                    # Original logic: call the wrapped API method.
+                    result = wrap_method(**reconciled_arguments)
+
+                    # Serialize the result to JSON.
+                    serialized = json.dumps(
+                        dataclass_to_dict(result), indent=4, ensure_ascii=False
+                    )
+
+                    # This log message will now also be written to log_f
+                    # through the thread_log_handler.
+                    log.debug(f"Result for {method_name}: {serialized}")
+
+                    # Return the successful result.
+                    self.return_(op_key, FuncStatus.SUCCESS, serialized)
+                except Exception as e:
+                    log.exception(f"Error while handling result of {method_name}")
+                    result = ErrorDataClass(
+                        op_key=op_key,
+                        status="error",
+                        errors=[
+                            ApiError(
+                                message="An internal error occurred",
+                                description=str(e),
+                                location=["bind_jsonschema_api", method_name],
+                            )
+                        ],
+                    )
+                    serialized = json.dumps(
+                        dataclass_to_dict(result), indent=4, ensure_ascii=False
+                    )
+                    self.return_(op_key, FuncStatus.SUCCESS, serialized)
+                finally:
+                    # Crucial cleanup: remove the handler from the root logger.
+                    # This stops redirecting logs for this thread to log_f and prevents
+                    # the handler from being used after log_f is closed.
+                    handler.root_logger.removeHandler(handler.new_handler)
+                    # Close the handler. For a StreamHandler using a stream it doesn't
+                    # own (log_f is managed by the 'with' statement), this typically
+                    # flushes the stream.
+                    handler.new_handler.close()
+
+                    del self.threads[op_key]
 
         stop_event = threading.Event()
         thread = threading.Thread(
@@ -173,10 +228,11 @@ class Webview:
log.info("Shutting down webview...") log.info("Shutting down webview...")
self.destroy() self.destroy()
def bind_jsonschema_api(self, api: MethodRegistry) -> None: def bind_jsonschema_api(self, api: MethodRegistry, log_manager: LogManager) -> None:
for name, method in api.functions.items(): for name, method in api.functions.items():
wrapper = functools.partial( wrapper = functools.partial(
self.api_wrapper, self.api_wrapper,
log_manager,
api, api,
name, name,
method, method,
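
Every reply crosses the JS boundary as one of two envelopes. A small sketch of their shape, using the same constructors and serializer as the wrapper above (field values here are illustrative only):

import json

from clan_lib.api import ApiError, ErrorDataClass, SuccessDataClass, dataclass_to_dict

ok = SuccessDataClass(op_key="42", data=["2025-06-12"], status="success")
err = ErrorDataClass(
    op_key="42",
    status="error",
    errors=[
        ApiError(
            message="An internal error occurred",
            description="example",
            location=["bind_jsonschema_api", "list_log_days"],
        )
    ],
)

# Same serialization the wrapper performs before calling self.return_():
print(json.dumps(dataclass_to_dict(ok), indent=4, ensure_ascii=False))
print(json.dumps(dataclass_to_dict(err), indent=4, ensure_ascii=False))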

View File

@@ -2,22 +2,28 @@ import {
   createForm,
   FieldValues,
   getValues,
+  setValue,
   SubmitHandler,
 } from "@modular-forms/solid";
 import { TextInput } from "@/src/Form/fields/TextInput";
 import { Button } from "./components/Button/Button";
 import { callApi } from "./api";
 import { API } from "@/api/API";
-import { createSignal, Match, Switch } from "solid-js";
+import { createSignal, Match, Switch, For, Show } from "solid-js";
 import { Typography } from "./components/Typography";
-import { createQuery } from "@tanstack/solid-query";
+import { useQuery } from "@tanstack/solid-query";
 import { makePersisted } from "@solid-primitives/storage";
+import jsonSchema from "@/api/API.json";
 
 interface APITesterForm extends FieldValues {
   endpoint: string;
   payload: string;
 }
 
+const ACTUAL_API_ENDPOINT_NAMES: (keyof API)[] = jsonSchema.required.map(
+  (key) => key as keyof API,
+);
+
 export const ApiTester = () => {
   const [persistedTestData, setPersistedTestData] = makePersisted(
     createSignal<APITesterForm>(),
@@ -28,34 +34,48 @@ export const ApiTester = () => {
   );
 
   const [formStore, { Form, Field }] = createForm<APITesterForm>({
-    initialValues: persistedTestData(),
+    initialValues: persistedTestData() || { endpoint: "", payload: "" },
   });
 
-  const query = createQuery(() => ({
-    // eslint-disable-next-line @tanstack/query/exhaustive-deps
-    queryKey: [],
-    queryFn: async () => {
-      const values = getValues(formStore);
-      return await callApi(
-        values.endpoint as keyof API,
-        JSON.parse(values.payload || ""),
-      );
-    },
-    staleTime: Infinity,
-  }));
+  const [endpointSearchTerm, setEndpointSearchTerm] = createSignal(
+    getValues(formStore).endpoint || "",
+  );
+  const [showSuggestions, setShowSuggestions] = createSignal(false);
+
+  const filteredEndpoints = () => {
+    const term = endpointSearchTerm().toLowerCase();
+    if (!term) return ACTUAL_API_ENDPOINT_NAMES;
+    return ACTUAL_API_ENDPOINT_NAMES.filter((ep) =>
+      ep.toLowerCase().includes(term),
+    );
+  };
+
+  const query = useQuery(() => {
+    const currentEndpoint = getValues(formStore).endpoint;
+    const currentPayload = getValues(formStore).payload;
+    const values = getValues(formStore);
+    return {
+      queryKey: ["api-tester", currentEndpoint, currentPayload],
+      queryFn: async () => {
+        return await callApi(
+          values.endpoint as keyof API,
+          JSON.parse(values.payload || "{}"),
+        );
+      },
+      staleTime: Infinity,
+      enabled: false,
+    };
+  });
 
   const handleSubmit: SubmitHandler<APITesterForm> = (values) => {
     console.log(values);
     setPersistedTestData(values);
+    setEndpointSearchTerm(values.endpoint);
     query.refetch();
     const v = getValues(formStore);
     console.log(v);
-    // const result = callApi(
-    //   values.endpoint as keyof API,
-    //   JSON.parse(values.payload)
-    // );
-    // setResult(result);
   };
 
   return (
     <div class="p-2">
@@ -64,11 +84,60 @@ export const ApiTester = () => {
<div class="flex flex-col"> <div class="flex flex-col">
<Field name="endpoint"> <Field name="endpoint">
{(field, fieldProps) => ( {(field, fieldProps) => (
<TextInput <div class="relative">
label={"endpoint"} <TextInput
value={field.value || ""} label={"endpoint"}
inputProps={fieldProps} value={field.value || ""}
/> inputProps={{
...fieldProps,
onInput: (e: Event) => {
if (fieldProps.onInput) {
(fieldProps.onInput as (ev: Event) => void)(e);
}
setEndpointSearchTerm(
(e.currentTarget as HTMLInputElement).value,
);
setShowSuggestions(true);
},
onBlur: (e: FocusEvent) => {
if (fieldProps.onBlur) {
(fieldProps.onBlur as (ev: FocusEvent) => void)(e);
}
setTimeout(() => setShowSuggestions(false), 150);
},
onFocus: (e: FocusEvent) => {
setEndpointSearchTerm(field.value || "");
setShowSuggestions(true);
},
onKeyDown: (e: KeyboardEvent) => {
if (e.key === "Escape") {
setShowSuggestions(false);
}
},
}}
/>
<Show
when={showSuggestions() && filteredEndpoints().length > 0}
>
<ul class="absolute z-10 w-full bg-white border border-gray-300 rounded mt-1 max-h-60 overflow-y-auto shadow-lg">
<For each={filteredEndpoints()}>
{(ep) => (
<li
class="p-2 hover:bg-gray-100 cursor-pointer"
onMouseDown={(e) => {
e.preventDefault();
setValue(formStore, "endpoint", ep);
setEndpointSearchTerm(ep);
setShowSuggestions(false);
}}
>
{ep}
</li>
)}
</For>
</ul>
</Show>
</div>
)} )}
</Field> </Field>
<Field name="payload"> <Field name="payload">

View File

@@ -32,20 +32,6 @@ class FileRequest:
     initial_folder: str | None = field(default=None)
 
 
-@API.register_abstract
-def cancel_task(task_id: str) -> None:
-    """Cancel a task by its op_key."""
-    msg = "cancel_task() is not implemented"
-    raise NotImplementedError(msg)
-
-
-@API.register_abstract
-def list_tasks() -> list[str]:
-    """List all tasks."""
-    msg = "list_tasks() is not implemented"
-    raise NotImplementedError(msg)
-
-
 @API.register_abstract
 def open_file(file_request: FileRequest) -> list[str] | None:
     """

View File

@@ -0,0 +1,335 @@
import datetime
import logging
from collections.abc import Callable
from dataclasses import dataclass
from functools import total_ordering
from pathlib import Path
from clan_lib.api import API
log = logging.getLogger(__name__)
# Global helper function for format checking (used by LogManager and internally by classes)
def is_correct_day_format(date_day: str) -> bool:
"""Check if the date_day is in the correct format YYYY-MM-DD."""
try:
datetime.datetime.strptime(date_day, "%Y-%m-%d").replace(tzinfo=datetime.UTC)
except ValueError:
return False
return True
@total_ordering
@dataclass(frozen=True)
class LogFile:
op_key: str
date_day: str # YYYY-MM-DD
func_name: str
_base_dir: Path
date_second: str # HH-MM-SS
def __post_init__(self) -> None:
# Validate formats upon initialization.
if not is_correct_day_format(self.date_day):
msg = f"LogFile.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
try:
datetime.datetime.strptime(self.date_second, "%H-%M-%S").replace(
tzinfo=datetime.UTC
)
except ValueError as ex:
msg = f"LogFile.date_second '{self.date_second}' is not in HH-MM-SS format."
raise ValueError(msg) from ex
@property
def _datetime_obj(self) -> datetime.datetime:
# Formats are pre-validated by __post_init__.
return datetime.datetime.strptime(
f"{self.date_day} {self.date_second}", "%Y-%m-%d %H-%M-%S"
).replace(tzinfo=datetime.UTC)
@classmethod
def from_path(cls, file: Path) -> "LogFile":
date_day = file.parent.parent.name
func_name = file.parent.name
base_dir = file.parent.parent.parent
filename_stem = file.stem
parts = filename_stem.split("_", 1)
if len(parts) != 2:
msg = f"Log filename '{file.name}' in dir '{file.parent}' does not match 'HH-MM-SS_op_key.log' format."
raise ValueError(msg)
date_second_str = parts[0]
op_key_str = parts[1]
return LogFile(
op_key=op_key_str,
date_day=date_day,
date_second=date_second_str,
func_name=func_name,
_base_dir=base_dir,
)
def get_file_path(self) -> Path:
return (
self._base_dir
/ self.date_day
/ self.func_name
/ f"{self.date_second}_{self.op_key}.log"
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogFile):
return NotImplemented
# Compare all significant fields for equality
return (
self._datetime_obj == other._datetime_obj
and self.func_name == other.func_name
and self.op_key == other.op_key
and self._base_dir == other._base_dir
)
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogFile):
return NotImplemented
# Primary sort: datetime (newest first). self is "less than" other if self is newer.
if self._datetime_obj != other._datetime_obj:
return self._datetime_obj > other._datetime_obj
# Secondary sort: func_name (alphabetical ascending)
if self.func_name != other.func_name:
return self.func_name < other.func_name
# Tertiary sort: op_key (alphabetical ascending)
return self.op_key < other.op_key
@total_ordering
@dataclass(frozen=True)
class LogFuncDir:
date_day: str
func_name: str
_base_dir: Path
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
msg = f"LogFuncDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
@property
def _date_obj(self) -> datetime.date:
return (
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
.replace(tzinfo=datetime.UTC)
.date()
)
def get_dir_path(self) -> Path:
return self._base_dir / self.date_day / self.func_name
def get_log_files(self) -> list[LogFile]:
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
log_files_list: list[LogFile] = []
for file_path in dir_path.iterdir():
if file_path.is_file() and file_path.suffix == ".log":
try:
log_files_list.append(LogFile.from_path(file_path))
except ValueError:
log.warning(
f"Skipping malformed log file '{file_path.name}' in '{dir_path}'."
)
return sorted(log_files_list) # Sorts using LogFile.__lt__ (newest first)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogFuncDir):
return NotImplemented
return (
self.date_day == other.date_day
and self.func_name == other.func_name
and self._base_dir == other._base_dir
)
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogFuncDir):
return NotImplemented
# Primary sort: date (newest first)
if self._date_obj != other._date_obj:
return self._date_obj > other._date_obj
# Secondary sort: func_name (alphabetical ascending)
return self.func_name < other.func_name
@total_ordering
@dataclass(frozen=True)
class LogDayDir:
date_day: str
_base_dir: Path
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
msg = f"LogDayDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
@property
def _date_obj(self) -> datetime.date:
return (
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
.replace(tzinfo=datetime.UTC)
.date()
)
def get_dir_path(self) -> Path:
return self._base_dir / self.date_day
# Note: despite its name, this returns the LogFuncDir entries for this day.
def get_log_files(self) -> list[LogFuncDir]:
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
func_dirs_list: list[LogFuncDir] = []
for func_dir_path in dir_path.iterdir():
if func_dir_path.is_dir():
try:
func_dirs_list.append(
LogFuncDir(
date_day=self.date_day,
func_name=func_dir_path.name,
_base_dir=self._base_dir,
)
)
except ValueError:  # Should only trigger if self.date_day were somehow invalid
log.warning(
f"Warning: Skipping malformed function directory '{func_dir_path.name}' in '{dir_path}'."
)
# Sorts using LogFuncDir.__lt__ (newest date first, then by func_name).
# Since all LogFuncDir here share the same date_day, they'll be sorted by func_name.
return sorted(func_dirs_list)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogDayDir):
return NotImplemented
return self.date_day == other.date_day and self._base_dir == other._base_dir
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogDayDir):
return NotImplemented
# Primary sort: date (newest first)
return self._date_obj > other._date_obj
@dataclass(frozen=True)
class LogManager:
base_dir: Path
def create_log_file(self, func: Callable, op_key: str) -> LogFile:
now_utc = datetime.datetime.now(tz=datetime.UTC)
log_file = LogFile(
op_key=op_key,
date_day=now_utc.strftime("%Y-%m-%d"),
date_second=now_utc.strftime("%H-%M-%S"),
func_name=func.__name__,
_base_dir=self.base_dir,
)
log_path = log_file.get_file_path()
log_path.parent.mkdir(parents=True, exist_ok=True)
if log_path.exists():
msg = f"BUG! Log file {log_path} already exists."
raise FileExistsError(msg)
log_path.touch()
return log_file
def list_log_days(self) -> list[LogDayDir]:
if not self.base_dir.exists() or not self.base_dir.is_dir():
return []
log_day_dirs_list: list[LogDayDir] = []
for day_dir_candidate_path in self.base_dir.iterdir():
if day_dir_candidate_path.is_dir() and is_correct_day_format(
day_dir_candidate_path.name
):
try:
log_day_dirs_list.append(
LogDayDir(
date_day=day_dir_candidate_path.name,
_base_dir=self.base_dir,
)
)
except ValueError:
log.warning(
f"Skipping directory with invalid date format '{day_dir_candidate_path.name}'."
)
return sorted(log_day_dirs_list) # Sorts using LogDayDir.__lt__ (newest first)
def get_log_file(
self, op_key_to_find: str, specific_date_day: str | None = None
) -> LogFile | None:
days_to_search: list[LogDayDir]
if specific_date_day:
if not is_correct_day_format(specific_date_day):
# print(f"Warning: Provided specific_date_day '{specific_date_day}' is not in YYYY-MM-DD format.")
return None
try:
target_day_dir = LogDayDir(
date_day=specific_date_day, _base_dir=self.base_dir
)
if not target_day_dir.get_dir_path().exists():  # Check if dir exists on disk
return None
days_to_search = [target_day_dir]  # Search only this specific day
except ValueError:  # LogDayDir construction failed despite the format pre-check
return None
else:
days_to_search = self.list_log_days() # Already sorted, newest day first
for day_dir in days_to_search:  # Newest day first when coming from list_log_days()
# day_dir.get_log_files() returns List[LogFuncDir], sorted by func_name (date is same)
for func_dir in day_dir.get_log_files():
# func_dir.get_log_files() returns List[LogFile], sorted newest file first
for log_file in func_dir.get_log_files():
if log_file.op_key == op_key_to_find:
return log_file
return None
@API.register_abstract
def list_log_days() -> list[LogDayDir]:
"""List all log days."""
msg = "list_log_days() is not implemented"
raise NotImplementedError(msg)
@API.register_abstract
def list_log_funcs_at_day(day: str) -> list[LogFuncDir]:
"""List all log function directories for a specific day."""
msg = "list_log_funcs_at_day() is not implemented"
raise NotImplementedError(msg)
@API.register_abstract
def list_log_files(day: str, func_name: str) -> list[LogFile]:
"""List all log files for a specific function on a specific day."""
msg = "list_log_files() is not implemented"
raise NotImplementedError(msg)
@API.register_abstract
def get_log_file(id_key: str) -> str:
"""Get a specific log file by op_key, function name and day."""
msg = "get_log_file() is not implemented"
raise NotImplementedError(msg)
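
A short usage sketch of the module above; deploy_machine is a made-up stand-in for a registered API function. It shows the base_dir/YYYY-MM-DD/func_name/HH-MM-SS_op_key.log layout and the op_key lookup:

import tempfile
from pathlib import Path

from clan_lib.api.log_manager import LogManager

def deploy_machine() -> None:  # stand-in; only __name__ is used for the dir name
    pass

with tempfile.TemporaryDirectory() as tmp:
    manager = LogManager(base_dir=Path(tmp))
    log_file = manager.create_log_file(deploy_machine, op_key="op_123")
    print(log_file.get_file_path())  # e.g. <tmp>/2025-06-12/deploy_machine/14-35-15_op_123.log
    # get_log_file scans newest days first and matches on op_key.
    assert manager.get_log_file("op_123") == log_file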

View File

@@ -0,0 +1,15 @@
from clan_lib.api import API
@API.register_abstract
def cancel_task(task_id: str) -> None:
"""Cancel a task by its op_key."""
msg = "cancel_task() is not implemented"
raise NotImplementedError(msg)
@API.register_abstract
def list_tasks() -> list[str]:
"""List all tasks."""
msg = "list_tasks() is not implemented"
raise NotImplementedError(msg)
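
Note that registration is an import side effect: merely importing this module runs the @API.register_abstract decorators. That is why the app entrypoint above pulls it in under a throwaway alias even though the name is never used:

from clan_lib.api.tasks import list_tasks as dummy_list  # noqa: F401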

View File

@@ -0,0 +1,622 @@
# ruff: noqa: SLF001
import datetime
import logging
from pathlib import Path
from typing import Any
import pytest
# The classes under test live in log_manager.py in the same package,
# hence the relative import below.
from .log_manager import (
LogDayDir,
LogFile,
LogFuncDir,
LogManager,
is_correct_day_format,
)
# Dummy function for LogManager.create_log_file
def sample_func_one() -> None:
pass
def sample_func_two() -> None:
pass
# --- Fixtures ---
@pytest.fixture
def base_dir(tmp_path: Path) -> Path:
"""Provides a temporary base directory for logs."""
return tmp_path / "logs"
@pytest.fixture
def log_manager(base_dir: Path) -> LogManager:
"""Provides a LogManager instance initialized with the temporary base_dir."""
return LogManager(base_dir=base_dir)
@pytest.fixture
def populated_log_structure(
log_manager: LogManager, base_dir: Path, monkeypatch: pytest.MonkeyPatch
) -> tuple[LogManager, Path, dict[str, LogFile]]:
"""
Creates a predefined log structure for testing listing and retrieval.
Returns the log_manager, base_dir, and a dictionary of created LogFile objects.
"""
created_files: dict[str, LogFile] = {}
# Mock datetime.datetime.now for predictable file names
class MockDateTime(datetime.datetime):
_now_val = datetime.datetime(2023, 10, 26, 10, 0, 0, tzinfo=datetime.UTC)
_delta = datetime.timedelta(seconds=0)
@classmethod
def now(cls: Any, tz: Any = None) -> "MockDateTime":
current = cls._now_val + cls._delta
cls._delta += datetime.timedelta(
seconds=1, minutes=1
) # Increment for uniqueness
return current
monkeypatch.setattr(datetime, "datetime", MockDateTime)
# Day 1: 2023-10-26
# Func A
lf1 = log_manager.create_log_file(sample_func_one, "op_key_A1") # 10-00-00
created_files["lf1"] = lf1
lf2 = log_manager.create_log_file(sample_func_one, "op_key_A2") # 10-01-01
created_files["lf2"] = lf2
# Func B
lf3 = log_manager.create_log_file(sample_func_two, "op_key_B1") # 10-02-02
created_files["lf3"] = lf3
# Day 2: 2023-10-27 (by advancing mock time enough)
MockDateTime._now_val = datetime.datetime(
2023, 10, 27, 12, 0, 0, tzinfo=datetime.UTC
)
MockDateTime._delta = datetime.timedelta(seconds=0) # Reset delta for new day
lf4 = log_manager.create_log_file(sample_func_one, "op_key_A3_day2") # 12-00-00
created_files["lf4"] = lf4
# Create a malformed file and dir to test skipping
malformed_day_dir = base_dir / "2023-13-01" # Invalid date
malformed_day_dir.mkdir(parents=True, exist_ok=True)
(malformed_day_dir / "some_func").mkdir(exist_ok=True)
malformed_func_dir = base_dir / "2023-10-26" / "malformed_func_dir_name!"
malformed_func_dir.mkdir(parents=True, exist_ok=True)
malformed_log_file_dir = base_dir / "2023-10-26" / sample_func_one.__name__
(malformed_log_file_dir / "badname.log").touch()
(malformed_log_file_dir / "10-00-00_op_key.txt").touch() # Wrong suffix
return log_manager, base_dir, created_files
# --- Tests for is_correct_day_format ---
@pytest.mark.parametrize(
("date_str", "expected"),
[
("2023-10-26", True),
("2024-01-01", True),
("2023-10-26X", False),
("2023/10/26", False),
("23-10-26", False),
("2023-13-01", False), # Invalid month
("2023-02-30", False), # Invalid day
("random-string", False),
("", False),
],
)
def test_is_correct_day_format(date_str: str, expected: bool) -> None:
assert is_correct_day_format(date_str) == expected
# --- Tests for LogFile ---
class TestLogFile:
def test_creation_valid(self, tmp_path: Path) -> None:
lf = LogFile("op1", "2023-10-26", "my_func", tmp_path, "10-20-30")
assert lf.op_key == "op1"
assert lf.date_day == "2023-10-26"
assert lf.func_name == "my_func"
assert lf._base_dir == tmp_path
assert lf.date_second == "10-20-30"
def test_creation_invalid_date_day(self, tmp_path: Path) -> None:
with pytest.raises(ValueError, match="not in YYYY-MM-DD format"):
LogFile("op1", "2023/10/26", "my_func", tmp_path, "10-20-30")
def test_creation_invalid_date_second(self, tmp_path: Path) -> None:
with pytest.raises(ValueError, match="not in HH-MM-SS format"):
LogFile("op1", "2023-10-26", "my_func", tmp_path, "10:20:30")
def test_datetime_obj(self, tmp_path: Path) -> None:
lf = LogFile("op1", "2023-10-26", "my_func", tmp_path, "10-20-30")
expected_dt = datetime.datetime(2023, 10, 26, 10, 20, 30, tzinfo=datetime.UTC)
assert lf._datetime_obj == expected_dt
def test_from_path_valid(self, tmp_path: Path) -> None:
base = tmp_path / "logs"
file_path = base / "2023-10-26" / "my_func" / "10-20-30_op_key_123.log"
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch()
lf = LogFile.from_path(file_path)
assert lf.op_key == "op_key_123"
assert lf.date_day == "2023-10-26"
assert lf.func_name == "my_func"
assert lf._base_dir == base
assert lf.date_second == "10-20-30"
def test_from_path_invalid_filename_format(self, tmp_path: Path) -> None:
file_path = (
tmp_path / "logs" / "2023-10-26" / "my_func" / "10-20-30-op_key_123.log"
) # Extra dash
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch()
with pytest.raises(ValueError, match="is not in HH-MM-SS format."):
LogFile.from_path(file_path)
def test_from_path_filename_no_op_key(self, tmp_path: Path) -> None:
file_path = tmp_path / "logs" / "2023-10-26" / "my_func" / "10-20-30_.log"
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.touch()
# This will result in op_key being ""
lf = LogFile.from_path(file_path)
assert lf.op_key == ""
def test_get_file_path(self, tmp_path: Path) -> None:
lf = LogFile("op1", "2023-10-26", "my_func", tmp_path, "10-20-30")
expected_path = tmp_path / "2023-10-26" / "my_func" / "10-20-30_op1.log"
assert lf.get_file_path() == expected_path
def test_equality(self, tmp_path: Path) -> None:
lf1 = LogFile("op1", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf2 = LogFile("op1", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf3 = LogFile(
"op2", "2023-10-26", "func_a", tmp_path, "10-00-00"
) # Diff op_key
lf4 = LogFile("op1", "2023-10-26", "func_a", tmp_path, "10-00-01") # Diff time
assert lf1 == lf2
assert lf1 != lf3
assert lf1 != lf4
assert lf1 != "not a logfile"
def test_ordering(self, tmp_path: Path) -> None:
# Newest datetime first
lf_newest = LogFile("op", "2023-10-26", "f", tmp_path, "10-00-01")
lf_older = LogFile("op", "2023-10-26", "f", tmp_path, "10-00-00")
lf_oldest_d = LogFile("op", "2023-10-25", "f", tmp_path, "12-00-00")
# Same datetime, different func_name (alphabetical)
lf_func_a = LogFile("op", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf_func_b = LogFile("op", "2023-10-26", "func_b", tmp_path, "10-00-00")
# Same datetime, same func_name, different op_key (alphabetical)
lf_op_a = LogFile("op_a", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf_op_b = LogFile("op_b", "2023-10-26", "func_a", tmp_path, "10-00-00")
assert lf_newest < lf_older # lf_newest is "less than" because it's newer
assert lf_older < lf_oldest_d
assert lf_func_a < lf_func_b
assert not (lf_func_b < lf_func_a)
assert lf_op_a < lf_op_b
assert not (lf_op_b < lf_op_a)
# Test sorting: expected order is newest datetime first, then func_name,
# then op_key (both ascending). Redefine with unambiguous names to make
# the expected order explicit.
lf_fa_op = LogFile("op", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf_fa_opa = LogFile("op_a", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf_fa_opb = LogFile("op_b", "2023-10-26", "func_a", tmp_path, "10-00-00")
lf_fb_op = LogFile("op", "2023-10-26", "func_b", tmp_path, "10-00-00")
lf_ff_op1 = LogFile("op", "2023-10-26", "f", tmp_path, "10-00-01") # lf_newest
lf_ff_op0 = LogFile("op", "2023-10-26", "f", tmp_path, "10-00-00") # lf_older
lf_old_day = LogFile(
"op", "2023-10-25", "f", tmp_path, "12-00-00"
) # lf_oldest_d
files_redefined = [
lf_fa_op,
lf_fa_opa,
lf_fa_opb,
lf_fb_op,
lf_ff_op1,
lf_ff_op0,
lf_old_day,
]
sorted_redefined = sorted(files_redefined)
expected_redefined = [
lf_ff_op1, # Newest time
lf_ff_op0, # 2023-10-26 10:00:00, f, op
lf_fa_op, # 2023-10-26 10:00:00, func_a, op (func_a smallest)
lf_fa_opa, # 2023-10-26 10:00:00, func_a, op_a
lf_fa_opb, # 2023-10-26 10:00:00, func_a, op_b
lf_fb_op, # 2023-10-26 10:00:00, func_b, op
lf_old_day,
]
assert sorted_redefined == expected_redefined
# --- Tests for LogFuncDir ---
class TestLogFuncDir:
def test_creation_valid(self, tmp_path: Path) -> None:
lfd = LogFuncDir("2023-10-26", "my_func", tmp_path)
assert lfd.date_day == "2023-10-26"
assert lfd.func_name == "my_func"
assert lfd._base_dir == tmp_path
def test_creation_invalid_date_day(self, tmp_path: Path) -> None:
with pytest.raises(ValueError, match="not in YYYY-MM-DD format"):
LogFuncDir("2023/10/26", "my_func", tmp_path)
def test_date_obj(self, tmp_path: Path) -> None:
lfd = LogFuncDir("2023-10-26", "my_func", tmp_path)
assert lfd._date_obj == datetime.date(2023, 10, 26)
def test_get_dir_path(self, tmp_path: Path) -> None:
lfd = LogFuncDir("2023-10-26", "my_func", tmp_path)
expected = tmp_path / "2023-10-26" / "my_func"
assert lfd.get_dir_path() == expected
def test_get_log_files_empty_or_missing(self, tmp_path: Path) -> None:
lfd = LogFuncDir("2023-10-26", "non_existent_func", tmp_path)
assert lfd.get_log_files() == [] # Dir does not exist
dir_path = lfd.get_dir_path()
dir_path.mkdir(parents=True, exist_ok=True) # Dir exists but empty
assert lfd.get_log_files() == []
def test_get_log_files_populated(
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
base = tmp_path
lfd = LogFuncDir("2023-10-26", "my_func", base)
dir_path = lfd.get_dir_path()
dir_path.mkdir(parents=True, exist_ok=True)
# Create some log files
lf1_path = dir_path / "10-00-01_op1.log"
lf1_path.touch()
lf2_path = dir_path / "09-00-00_op2.log"  # Oldest; sorts last (newest first)
lf2_path.touch()
lf3_path = dir_path / "10-00-00_op0.log" # Same time as lf1, op0 < op1
lf3_path.touch()
# Create a non-log file and a malformed log file
(dir_path / "not_a_log.txt").touch()
(dir_path / "malformed.log").touch()  # Will cause ValueError in LogFile.from_path
with caplog.at_level(logging.WARNING):
log_files = lfd.get_log_files()
assert len(log_files) == 3
assert any(
"Skipping malformed log file 'malformed.log'" in record.message
for record in caplog.records
)
# Expected order: newest first (10-00-01_op1, then 10-00-00_op0, then 09-00-00_op2)
# Sorting by LogFile: newest datetime first, then func_name (same here), then op_key
expected_lf1 = LogFile.from_path(lf1_path)
expected_lf2 = LogFile.from_path(lf2_path)
expected_lf3 = LogFile.from_path(lf3_path)
assert log_files[0] == expected_lf1  # 10-00-01_op1 (newest)
assert log_files[1] == expected_lf3  # 10-00-00_op0
assert log_files[2] == expected_lf2  # 09-00-00_op2 (oldest)
def test_equality(self, tmp_path: Path) -> None:
lfd1 = LogFuncDir("2023-10-26", "func_a", tmp_path)
lfd2 = LogFuncDir("2023-10-26", "func_a", tmp_path)
lfd3 = LogFuncDir("2023-10-27", "func_a", tmp_path) # Diff date
lfd4 = LogFuncDir("2023-10-26", "func_b", tmp_path) # Diff func_name
assert lfd1 == lfd2
assert lfd1 != lfd3
assert lfd1 != lfd4
assert lfd1 != "not a logfuncdir"
def test_ordering(self, tmp_path: Path) -> None:
# Newest date first
lfd_new_date = LogFuncDir("2023-10-27", "func_a", tmp_path)
lfd_old_date = LogFuncDir("2023-10-26", "func_a", tmp_path)
# Same date, different func_name (alphabetical)
lfd_func_a = LogFuncDir("2023-10-26", "func_a", tmp_path)
lfd_func_b = LogFuncDir("2023-10-26", "func_b", tmp_path)
assert lfd_new_date < lfd_old_date  # "less than" because it's newer
assert lfd_func_a < lfd_func_b
# Expected sort: newest date first; entries sharing a date sort by func_name
# ascending. Redefine with clearer names to make the expected order explicit.
lfd1 = LogFuncDir("2023-10-27", "z_func", tmp_path) # Newest date
lfd2 = LogFuncDir(
"2023-10-26", "a_func", tmp_path
) # Older date, alpha first func
lfd3 = LogFuncDir(
"2023-10-26", "b_func", tmp_path
) # Older date, alpha second func
items_redefined = [lfd3, lfd1, lfd2]
sorted_items = sorted(items_redefined)
expected_sorted = [lfd1, lfd2, lfd3]
assert sorted_items == expected_sorted
# --- Tests for LogDayDir ---
class TestLogDayDir:
def test_creation_valid(self, tmp_path: Path) -> None:
ldd = LogDayDir("2023-10-26", tmp_path)
assert ldd.date_day == "2023-10-26"
assert ldd._base_dir == tmp_path
def test_creation_invalid_date_day(self, tmp_path: Path) -> None:
with pytest.raises(ValueError, match="not in YYYY-MM-DD format"):
LogDayDir("2023/10/26", tmp_path)
def test_date_obj(self, tmp_path: Path) -> None:
ldd = LogDayDir("2023-10-26", tmp_path)
assert ldd._date_obj == datetime.date(2023, 10, 26)
def test_get_dir_path(self, tmp_path: Path) -> None:
ldd = LogDayDir("2023-10-26", tmp_path)
expected = tmp_path / "2023-10-26"
assert ldd.get_dir_path() == expected
def test_get_log_files_empty_or_missing(self, tmp_path: Path) -> None:
ldd = LogDayDir("2023-10-26", tmp_path)
assert ldd.get_log_files() == [] # Dir does not exist
dir_path = ldd.get_dir_path()
dir_path.mkdir(parents=True, exist_ok=True) # Dir exists but empty
assert ldd.get_log_files() == []
def test_get_log_files_populated(
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
) -> None:
base = tmp_path
ldd = LogDayDir("2023-10-26", base)
day_dir_path = ldd.get_dir_path()
day_dir_path.mkdir(parents=True, exist_ok=True)
# Create func dirs
func_a_path = day_dir_path / "func_a"
func_a_path.mkdir()
func_b_path = day_dir_path / "func_b"
func_b_path.mkdir()
# Create a non-directory entry; it should simply be ignored.
(day_dir_path / "not_a_dir.txt").touch()
# LogDayDir.get_log_files only checks that entries are directories. Its
# ValueError warning path could only trigger if self.date_day were invalid,
# which LogDayDir.__post_init__ already rules out.
with caplog.at_level(logging.WARNING):
log_func_dirs = ldd.get_log_files()
assert len(log_func_dirs) == 2
# No warnings are expected from this setup for LogDayDir.get_log_files.
# Expected order: func_name alphabetical (since date_day is the same for all)
expected_lfd_a = LogFuncDir("2023-10-26", "func_a", base)
expected_lfd_b = LogFuncDir("2023-10-26", "func_b", base)
assert log_func_dirs[0] == expected_lfd_a
assert log_func_dirs[1] == expected_lfd_b
def test_equality(self, tmp_path: Path) -> None:
ldd1 = LogDayDir("2023-10-26", tmp_path)
ldd2 = LogDayDir("2023-10-26", tmp_path)
ldd3 = LogDayDir("2023-10-27", tmp_path) # Diff date
ldd4 = LogDayDir("2023-10-26", tmp_path / "other_base") # Diff base
assert ldd1 == ldd2
assert ldd1 != ldd3
assert ldd1 != ldd4
assert ldd1 != "not a logdaydir"
def test_ordering(self, tmp_path: Path) -> None:
ldd_new = LogDayDir("2023-10-27", tmp_path)
ldd_old = LogDayDir("2023-10-26", tmp_path)
ldd_ancient = LogDayDir("2023-01-01", tmp_path)
assert ldd_new < ldd_old # ldd_new is "less than" because it's newer
assert ldd_old < ldd_ancient
items = [ldd_ancient, ldd_new, ldd_old]
sorted_items = sorted(items)
expected_sorted = [ldd_new, ldd_old, ldd_ancient]
assert sorted_items == expected_sorted
# --- Tests for LogManager ---
class TestLogManager:
def test_create_log_file(
self, log_manager: LogManager, base_dir: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
op_key = "test_op_123"
log_file_obj = log_manager.create_log_file(sample_func_one, op_key)
now = datetime.datetime.now(tz=datetime.UTC)
expected_date_day = now.strftime("%Y-%m-%d")
expected_date_second = now.strftime("%H-%M-%S")  # Approximation, may be off by a second
assert log_file_obj.op_key == op_key
assert log_file_obj.func_name == sample_func_one.__name__
assert log_file_obj.date_day == expected_date_day
# Allow for slight time difference due to execution
created_dt = (
datetime.datetime.strptime(log_file_obj.date_second, "%H-%M-%S")
.replace(tzinfo=datetime.UTC)
.time()
)
expected_dt_approx = (
datetime.datetime.strptime(expected_date_second, "%H-%M-%S")
.replace(tzinfo=datetime.UTC)
.time()
)
time_diff = datetime.datetime.combine(
datetime.date.min, created_dt
) - datetime.datetime.combine(datetime.date.min, expected_dt_approx)
assert abs(time_diff.total_seconds()) <= 1 # Allow 1 second diff
expected_path = (
base_dir
/ expected_date_day
/ sample_func_one.__name__
/ f"{log_file_obj.date_second}_{op_key}.log" # Use actual created second
)
assert expected_path.exists()
assert expected_path.is_file()
# Test creating it again (should fail)
# Need to mock datetime.now to ensure same filename for collision test
class MockDateTimeExact(datetime.datetime):
_val = datetime.datetime.strptime(
f"{log_file_obj.date_day} {log_file_obj.date_second}",
"%Y-%m-%d %H-%M-%S",
).replace(tzinfo=datetime.UTC)
@classmethod
def now(cls: Any, tz: Any = None) -> "MockDateTimeExact":
return cls._val
monkeypatch.setattr(datetime, "datetime", MockDateTimeExact)
with pytest.raises(FileExistsError, match="BUG! Log file .* already exists"):
log_manager.create_log_file(sample_func_one, op_key)
def test_list_log_days_empty(self, log_manager: LogManager) -> None:
assert log_manager.list_log_days() == []
log_manager.base_dir.mkdir() # Create base_dir but keep it empty
assert log_manager.list_log_days() == []
def test_list_log_days_populated(
self,
populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]],
caplog: pytest.LogCaptureFixture,
) -> None:
log_manager, base_dir, _ = populated_log_structure
with caplog.at_level(logging.WARNING):
day_dirs: list[LogDayDir] = log_manager.list_log_days()
assert len(day_dirs) == 2 # 2023-10-27 and 2023-10-26
assert day_dirs[0].date_day == "2023-10-27" # Newest first
assert day_dirs[1].date_day == "2023-10-26"
# Add a non-dir file to base_dir
(base_dir / "some_file.txt").touch()
day_dirs_after_file: list[LogDayDir] = log_manager.list_log_days()
assert len(day_dirs_after_file) == 2 # Should not affect count
def test_get_log_file_not_found(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, _ = populated_log_structure
assert log_manager.get_log_file("non_existent_op_key") is None
def test_get_log_file_found_no_specific_date(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, created_files = populated_log_structure
found_log_file = log_manager.get_log_file("op_key_A1")
assert found_log_file is not None
assert found_log_file == created_files["lf1"]
found_log_file_newest = log_manager.get_log_file("op_key_A3_day2")
assert found_log_file_newest is not None
assert found_log_file_newest == created_files["lf4"]
def test_get_log_file_found_with_specific_date(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, created_files = populated_log_structure
found_log_file = log_manager.get_log_file(
"op_key_A1", specific_date_day="2023-10-26"
)
assert found_log_file is not None
assert found_log_file == created_files["lf1"]
assert (
log_manager.get_log_file("op_key_A1", specific_date_day="2023-10-27")
is None
)
def test_get_log_file_specific_date_not_exists(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, _ = populated_log_structure
assert (
log_manager.get_log_file("any_op_key", specific_date_day="1999-01-01")
is None
)
def test_get_log_file_specific_date_invalid_format(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, _ = populated_log_structure
assert (
log_manager.get_log_file("any_op_key", specific_date_day="2023/01/01")
is None
)

View File

@@ -2,7 +2,9 @@ import inspect
 import logging
 import os
 import sys
+from dataclasses import dataclass
 from pathlib import Path
+from typing import IO
 
 from clan_lib.colors import AnsiColor, RgbColor, color_by_tuple
@@ -147,13 +149,24 @@ def print_trace(msg: str, logger: logging.Logger, prefix: str | None) -> None:
logger.debug(f"{msg} \n{callers_str}", extra={"command_prefix": prefix}) logger.debug(f"{msg} \n{callers_str}", extra={"command_prefix": prefix})
def setup_logging(level: int) -> None: @dataclass
class RegisteredHandler:
root_logger: logging.Logger
new_handler: logging.StreamHandler
def setup_logging(level: int, log_file: IO[str] | None = None) -> RegisteredHandler:
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.setLevel(level) root_logger.setLevel(level)
# Set our formatter handler # Set our formatter handler
default_handler = logging.StreamHandler() default_handler = logging.StreamHandler(log_file)
default_handler.setLevel(level) default_handler.setLevel(level)
trace_prints = bool(int(os.environ.get("TRACE_PRINT", "0"))) trace_prints = bool(int(os.environ.get("TRACE_PRINT", "0")))
default_handler.setFormatter(PrefixFormatter(trace_prints)) default_handler.setFormatter(PrefixFormatter(trace_prints))
root_logger.addHandler(default_handler) root_logger.addHandler(default_handler)
return RegisteredHandler(
root_logger=root_logger,
new_handler=default_handler,
)
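
A brief sketch of how a caller can use the new return value to install and later detach a per-call handler, mirroring what the webview thread earlier in this diff does (the file path is illustrative):

import logging

from clan_lib.custom_logger import setup_logging

with open("/tmp/example.log", "w", encoding="utf-8") as stream:
    registered = setup_logging(logging.DEBUG, log_file=stream)
    logging.getLogger(__name__).info("captured in /tmp/example.log")
    # Detach before the stream closes so later records don't hit a closed file.
    registered.root_logger.removeHandler(registered.new_handler)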