Merge pull request 'clan-app: Implement dynamic groups and array based filtering of logs and groups' (#4190) from Qubasa/clan-core:add_clan_group into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4190
This commit is contained in:
Luis Hebendanz
2025-07-04 11:06:50 +00:00
18 changed files with 1750 additions and 1193 deletions

View File

@@ -8,14 +8,10 @@ from dataclasses import dataclass
from pathlib import Path
import clan_lib.machines.actions # noqa: F401
from clan_lib.api import API, tasks
# TODO: We have to manually import python files to make the API.register be triggered.
# We NEED to fix this, as this is super unintuitive and error-prone.
from clan_lib.api.tasks import list_tasks as dummy_list # noqa: F401
from clan_lib.api import API, load_in_all_api_functions, tasks
from clan_lib.custom_logger import setup_logging
from clan_lib.dirs import user_data_dir
from clan_lib.log_manager import LogManager
from clan_lib.log_manager import LogGroupConfig, LogManager
from clan_lib.log_manager import api as log_manager_api
from clan_app.api.file_gtk import open_file
@@ -45,16 +41,22 @@ def app_run(app_opts: ClanAppOptions) -> int:
webview = Webview(debug=app_opts.debug)
webview.title = "Clan App"
# This seems to call the gtk api correctly but and gtk also seems to our icon, but somehow the icon is not loaded.
# Init LogManager global in log_manager_api module
log_manager_api.LOG_MANAGER_INSTANCE = LogManager(
base_dir=user_data_dir() / "clan-app" / "logs"
# Add a log group ["clans", <dynamic_name>, "machines", <dynamic_name>]
log_manager = LogManager(base_dir=user_data_dir() / "clan-app" / "logs")
clan_log_group = LogGroupConfig("clans", "Clans").add_child(
LogGroupConfig("machines", "Machines")
)
log_manager = log_manager.add_root_group_config(clan_log_group)
# Init LogManager global in log_manager_api module
log_manager_api.LOG_MANAGER_INSTANCE = log_manager
# Init BAKEND_THREADS in tasks module
# Init BAKEND_THREADS global in tasks module
tasks.BAKEND_THREADS = webview.threads
# Populate the API global with all functions
load_in_all_api_functions()
API.overwrite_fn(open_file)
webview.bind_jsonschema_api(API, log_manager=log_manager_api.LOG_MANAGER_INSTANCE)
webview.size = Size(1280, 1024, SizeHint.NONE)

View File

@@ -1,3 +1,4 @@
# ruff: noqa: TRY301
import functools
import io
import json
@@ -66,15 +67,24 @@ class Webview:
) -> None:
op_key = op_key_bytes.decode()
args = json.loads(request_data.decode())
log.debug(f"Calling {method_name}({args})")
log.debug(f"Calling {method_name}({json.dumps(args, indent=4)})")
header: dict[str, Any]
try:
# Initialize dataclasses from the payload
reconciled_arguments = {}
if len(args) > 1:
header = args[1]
for k, v in args[0].items():
if len(args) == 1:
request = args[0]
header = request.get("header", {})
msg = f"Expected header to be a dict, got {type(header)}"
if not isinstance(header, dict):
raise TypeError(msg)
body = request.get("body", {})
msg = f"Expected body to be a dict, got {type(body)}"
if not isinstance(body, dict):
raise TypeError(msg)
for k, v in body.items():
# Some functions expect to be called with dataclass instances
# But the js api returns dictionaries.
# Introspect the function and create the expected dataclass from dict dynamically
@@ -84,8 +94,11 @@ class Webview:
# TODO: rename from_dict into something like construct_checked_value
# from_dict really takes Anything and returns an instance of the type/class
reconciled_arguments[k] = from_dict(arg_class, v)
elif len(args) == 1:
header = args[0]
elif len(args) > 1:
msg = (
"Expected a single argument, got multiple arguments to api_wrapper"
)
raise ValueError(msg)
reconciled_arguments["op_key"] = op_key
except Exception as e:
@@ -110,17 +123,39 @@ class Webview:
def thread_task(stop_event: threading.Event) -> None:
ctx: AsyncContext = get_async_ctx()
ctx.should_cancel = lambda: stop_event.is_set()
# If the API call has set log_group in metadata,
# create the log file under that group.
log_group = header.get("logging", {}).get("group", None)
if log_group is not None:
log.warning(
f"Using log group {log_group} for {method_name} with op_key {op_key}"
)
log_file = log_manager.create_log_file(
wrap_method, op_key=op_key, group=log_group
).get_file_path()
try:
# If the API call has set log_group in metadata,
# create the log file under that group.
log_group: list[str] = header.get("logging", {}).get("group_path", None)
if log_group is not None:
if not isinstance(log_group, list):
msg = f"Expected log_group to be a list, got {type(log_group)}"
raise TypeError(msg)
log.warning(
f"Using log group {log_group} for {method_name} with op_key {op_key}"
)
log_file = log_manager.create_log_file(
wrap_method, op_key=op_key, group_path=log_group
).get_file_path()
except Exception as e:
log.exception(f"Error while handling request header of {method_name}")
result = ErrorDataClass(
op_key=op_key,
status="error",
errors=[
ApiError(
message="An internal error occured",
description=str(e),
location=["header_middleware", method_name],
)
],
)
serialized = json.dumps(
dataclass_to_dict(result), indent=4, ensure_ascii=False
)
self.return_(op_key, FuncStatus.SUCCESS, serialized)
with log_file.open("ab") as log_f:
# Redirect all cmd.run logs to this file.

View File

@@ -23,42 +23,25 @@ export type SuccessQuery<T extends OperationNames> = Extract<
>;
export type SuccessData<T extends OperationNames> = SuccessQuery<T>["data"];
function isMachine(obj: unknown): obj is Machine {
return (
!!obj &&
typeof obj === "object" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).name === "string" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).flake === "object" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).flake.identifier === "string"
);
}
// Machine type with flake for API calls
interface Machine {
name: string;
flake: {
identifier: string;
};
}
interface BackendOpts {
logging?: { group: string | Machine };
interface SendHeaderType {
logging?: { group_path: string[] };
}
interface BackendSendType<K extends OperationNames> {
body: OperationArgs<K>;
header?: SendHeaderType;
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface ReceiveHeaderType {}
interface BackendReturnType<K extends OperationNames> {
body: OperationResponse<K>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
header: Record<string, any>;
header: ReceiveHeaderType;
}
const _callApi = <K extends OperationNames>(
method: K,
args: OperationArgs<K>,
backendOpts?: BackendOpts,
backendOpts?: SendHeaderType,
): { promise: Promise<BackendReturnType<K>>; op_key: string } => {
// if window[method] does not exist, throw an error
if (!(method in window)) {
@@ -82,26 +65,19 @@ const _callApi = <K extends OperationNames>(
};
}
let header: BackendOpts = {};
if (backendOpts != undefined) {
header = { ...backendOpts };
const group = backendOpts?.logging?.group;
if (group != undefined && isMachine(group)) {
header = {
logging: { group: group.flake.identifier + "#" + group.name },
};
}
}
const message: BackendSendType<OperationNames> = {
body: args,
header: backendOpts,
};
const promise = (
window as unknown as Record<
OperationNames,
(
args: OperationArgs<OperationNames>,
metadata: BackendOpts,
args: BackendSendType<OperationNames>,
) => Promise<BackendReturnType<OperationNames>>
>
)[method](args, header) as Promise<BackendReturnType<K>>;
)[method](message) as Promise<BackendReturnType<K>>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const op_key = (promise as any)._webviewMessageId as string;
@@ -153,7 +129,7 @@ const handleCancel = async <K extends OperationNames>(
export const callApi = <K extends OperationNames>(
method: K,
args: OperationArgs<K>,
backendOpts?: BackendOpts,
backendOpts?: SendHeaderType,
): { promise: Promise<OperationResponse<K>>; op_key: string } => {
console.log("Calling API", method, args, backendOpts);

View File

@@ -186,6 +186,7 @@ export function RemoteForm(props: RemoteFormProps) {
props.queryFn,
props.machine?.name,
props.machine?.flake,
props.machine?.flake.identifier,
props.field || "targetHost",
],
queryFn: async () => {
@@ -209,7 +210,12 @@ export function RemoteForm(props: RemoteFormProps) {
},
{
logging: {
group: { name: props.machine.name, flake: props.machine.flake },
group_path: [
"clans",
props.machine.flake.identifier,
"machines",
props.machine.name,
],
},
},
).promise;

View File

@@ -54,7 +54,9 @@ export const MachineListItem = (props: MachineListItemProps) => {
flake: { identifier: active_clan },
name: name,
},
{ logging: { group: { name, flake: { identifier: active_clan } } } },
{
logging: { group_path: ["clans", active_clan, "machines", name] },
},
).promise;
if (target_host.status == "error") {
@@ -115,7 +117,9 @@ export const MachineListItem = (props: MachineListItemProps) => {
name: name,
},
{
logging: { group: { name, flake: { identifier: active_clan } } },
logging: {
group_path: ["clans", active_clan, "machines", name],
},
},
).promise;
@@ -141,7 +145,11 @@ export const MachineListItem = (props: MachineListItemProps) => {
flake: { identifier: active_clan },
name: name,
},
{ logging: { group: { name, flake: { identifier: active_clan } } } },
{
logging: {
group_path: ["clans", active_clan, "machines", name],
},
},
).promise;
if (build_host.status == "error") {
@@ -166,7 +174,11 @@ export const MachineListItem = (props: MachineListItemProps) => {
target_host: target_host.data!.data,
build_host: build_host.data?.data || null,
},
{ logging: { group: { name, flake: { identifier: active_clan } } } },
{
logging: {
group_path: ["clans", active_clan, "machines", name],
},
},
).promise;
setUpdating(false);

View File

@@ -85,7 +85,7 @@ export function MachineForm(props: MachineFormProps) {
},
{
logging: {
group: { name: machine_name, flake: { identifier: base_dir } },
group_path: ["clans", base_dir, "machines", machine_name],
},
},
).promise;
@@ -130,7 +130,9 @@ export function MachineForm(props: MachineFormProps) {
},
},
{
logging: { group: { name: machine, flake: { identifier: curr_uri } } },
logging: {
group_path: ["clans", curr_uri, "machines", machine],
},
},
).promise;
@@ -161,7 +163,9 @@ export function MachineForm(props: MachineFormProps) {
build_host: null,
},
{
logging: { group: { name: machine, flake: { identifier: curr_uri } } },
logging: {
group_path: ["clans", curr_uri, "machines", machine],
},
},
).promise.finally(() => {
setIsUpdating(false);

View File

@@ -158,7 +158,7 @@ export const VarsStep = (props: VarsStepProps) => {
},
{
logging: {
group: { name: props.machine_id, flake: { identifier: props.dir } },
group_path: ["clans", props.dir, "machines", props.machine_id],
},
},
).promise;

View File

@@ -16,14 +16,22 @@ export const MachineInstall = () => {
queryFn: async () => {
const curr = activeClanURI();
if (curr) {
const result = await callApi("get_machine_details", {
machine: {
flake: {
identifier: curr,
const result = await callApi(
"get_machine_details",
{
machine: {
flake: {
identifier: curr,
},
name: params.id,
},
name: params.id,
},
}).promise;
{
logging: {
group_path: ["clans", curr, "machines", params.id],
},
},
).promise;
if (result.status === "error") throw new Error("Failed to fetch data");
return result.data;
}

View File

@@ -8,7 +8,6 @@ import Icon from "@/src/components/icon";
import { Header } from "@/src/layout/header";
import { makePersisted } from "@solid-primitives/storage";
import { useClanContext } from "@/src/contexts/clan";
import { debug } from "console";
type MachinesModel = Extract<
OperationResponse<"list_machines">,

View File

@@ -23,42 +23,25 @@ export type SuccessQuery<T extends OperationNames> = Extract<
>;
export type SuccessData<T extends OperationNames> = SuccessQuery<T>["data"];
function isMachine(obj: unknown): obj is Machine {
return (
!!obj &&
typeof obj === "object" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).name === "string" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).flake === "object" &&
// eslint-disable-next-line @typescript-eslint/no-explicit-any
typeof (obj as any).flake.identifier === "string"
);
}
// Machine type with flake for API calls
interface Machine {
name: string;
flake: {
identifier: string;
};
}
interface BackendOpts {
logging?: { group: string | Machine };
interface SendHeaderType {
logging?: { group_path: string[] };
}
interface BackendSendType<K extends OperationNames> {
body: OperationArgs<K>;
header?: SendHeaderType;
}
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface ReceiveHeaderType {}
interface BackendReturnType<K extends OperationNames> {
body: OperationResponse<K>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
header: Record<string, any>;
header: ReceiveHeaderType;
}
const _callApi = <K extends OperationNames>(
method: K,
args: OperationArgs<K>,
backendOpts?: BackendOpts,
backendOpts?: SendHeaderType,
): { promise: Promise<BackendReturnType<K>>; op_key: string } => {
// if window[method] does not exist, throw an error
if (!(method in window)) {
@@ -82,26 +65,19 @@ const _callApi = <K extends OperationNames>(
};
}
let header: BackendOpts = {};
if (backendOpts != undefined) {
header = { ...backendOpts };
const group = backendOpts?.logging?.group;
if (group != undefined && isMachine(group)) {
header = {
logging: { group: group.flake.identifier + "#" + group.name },
};
}
}
const message: BackendSendType<OperationNames> = {
body: args,
header: backendOpts,
};
const promise = (
window as unknown as Record<
OperationNames,
(
args: OperationArgs<OperationNames>,
metadata: BackendOpts,
args: BackendSendType<OperationNames>,
) => Promise<BackendReturnType<OperationNames>>
>
)[method](args, header) as Promise<BackendReturnType<K>>;
)[method](message) as Promise<BackendReturnType<K>>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const op_key = (promise as any)._webviewMessageId as string;
@@ -153,7 +129,7 @@ const handleCancel = async <K extends OperationNames>(
export const callApi = <K extends OperationNames>(
method: K,
args: OperationArgs<K>,
backendOpts?: BackendOpts,
backendOpts?: SendHeaderType,
): { promise: Promise<OperationResponse<K>>; op_key: string } => {
console.log("Calling API", method, args, backendOpts);

View File

@@ -1,34 +1,12 @@
#!/usr/bin/env python3
import importlib
import json
import pkgutil
from types import ModuleType
def import_all_modules_from_package(pkg: ModuleType) -> None:
for _loader, module_name, _is_pkg in pkgutil.walk_packages(
pkg.__path__, prefix=f"{pkg.__name__}."
):
base_name = module_name.split(".")[-1]
# Skip test modules
if (
base_name.startswith("test_")
or base_name.endswith("_test")
or base_name == "conftest"
):
continue
importlib.import_module(module_name)
from clan_lib.api import load_in_all_api_functions
def main() -> None:
import clan_cli
import clan_lib
import_all_modules_from_package(clan_cli)
import_all_modules_from_package(clan_lib)
load_in_all_api_functions()
from clan_lib.api import API

View File

@@ -1,8 +1,11 @@
import importlib
import logging
import pkgutil
from collections.abc import Callable
from dataclasses import dataclass
from functools import wraps
from inspect import Parameter, Signature, signature
from types import ModuleType
from typing import (
Annotated,
Any,
@@ -12,6 +15,8 @@ from typing import (
get_type_hints,
)
from clan_lib.api.util import JSchemaTypeError
log = logging.getLogger(__name__)
from .serde import dataclass_to_dict, from_dict, sanitize_string
@@ -217,12 +222,16 @@ API.register(open_file)
for name, func in self._registry.items():
hints = get_type_hints(func)
serialized_hints = {
key: type_to_dict(
value, scope=name + " argument" if key != "return" else "return"
)
for key, value in hints.items()
}
try:
serialized_hints = {
key: type_to_dict(
value, scope=name + " argument" if key != "return" else "return"
)
for key, value in hints.items()
}
except JSchemaTypeError as e:
msg = f"Error serializing type hints for function '{name}': {e}"
raise JSchemaTypeError(msg) from e
return_type = serialized_hints.pop("return")
@@ -283,4 +292,35 @@ API.register(open_file)
return None
def import_all_modules_from_package(pkg: ModuleType) -> None:
for _loader, module_name, _is_pkg in pkgutil.walk_packages(
pkg.__path__, prefix=f"{pkg.__name__}."
):
base_name = module_name.split(".")[-1]
# Skip test modules
if (
base_name.startswith("test_")
or base_name.endswith("_test")
or base_name == "conftest"
):
continue
importlib.import_module(module_name)
def load_in_all_api_functions() -> None:
"""
For the global API object, to have all functions available.
We have to make sure python loads every wrapped function at least once.
This is done by importing all modules from the clan_lib and clan_cli packages.
"""
import clan_cli
import clan_lib
import_all_modules_from_package(clan_lib)
import_all_modules_from_package(clan_cli)
API = MethodRegistry()

View File

@@ -2,16 +2,67 @@ import datetime
import logging
import urllib.parse
from collections.abc import Callable # Union for str | None
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import total_ordering
from pathlib import Path
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class LogGroupConfig:
"""Configuration for a hierarchical log group with nickname support."""
name: str # The name of this group level (single directory name)
nickname: str | None = None # Optional display name for easier visibility
children: dict[str, "LogGroupConfig"] = field(
default_factory=dict
) # Nested child groups
def get_display_name(self) -> str:
"""Get the display name for this log group.
Returns:
The nickname if available, otherwise the group name.
"""
return self.nickname if self.nickname else self.name
def add_child(self, child: "LogGroupConfig") -> "LogGroupConfig":
"""Add a child group configuration and return a new LogGroupConfig instance.
Args:
child: The child LogGroupConfig to add.
Returns:
A new LogGroupConfig instance with the child added.
"""
new_children = {**self.children, child.name: child}
return LogGroupConfig(
name=self.name, nickname=self.nickname, children=new_children
)
def get_child(self, name: str) -> "LogGroupConfig | None":
"""Get a child group configuration by name.
Args:
name: The name of the child group to retrieve.
Returns:
The child LogGroupConfig if found, None otherwise.
"""
return self.children.get(name)
# Global helper function for format checking (used by LogManager and internally by classes)
def is_correct_day_format(date_day: str) -> bool:
"""Check if the date_day is in the correct format YYYY-MM-DD."""
"""Check if the date_day string is in the correct format YYYY-MM-DD.
Args:
date_day: The date string to validate.
Returns:
True if the date_day matches YYYY-MM-DD format, False otherwise.
"""
try:
datetime.datetime.strptime(date_day, "%Y-%m-%d").replace(tzinfo=datetime.UTC)
except ValueError:
@@ -30,6 +81,11 @@ class LogFile:
date_second: str # HH-MM-SS
def __post_init__(self) -> None:
"""Validate date and time formats after initialization.
Raises:
ValueError: If date_day or date_second are not in the correct format.
"""
# Validate formats upon initialization.
if not is_correct_day_format(self.date_day):
msg = f"LogFile.date_day '{self.date_day}' is not in YYYY-MM-DD format."
@@ -44,46 +100,42 @@ class LogFile:
@property
def _datetime_obj(self) -> datetime.datetime:
"""Get the datetime object for this log file.
Returns:
A datetime object constructed from date_day and date_second.
"""
# Formats are pre-validated by __post_init__.
return datetime.datetime.strptime(
f"{self.date_day} {self.date_second}", "%Y-%m-%d %H-%M-%S"
).replace(tzinfo=datetime.UTC)
@classmethod
def from_path(cls, file: Path) -> "LogFile":
date_day = file.parent.parent.parent.name
group = urllib.parse.unquote(file.parent.parent.name)
func_name = file.parent.name
base_dir = file.parent.parent.parent.parent
filename_stem = file.stem
parts = filename_stem.split("_", 1)
if len(parts) != 2:
msg = f"Log filename '{file.name}' in dir '{file.parent}' does not match 'HH-MM-SS_op_key.log' format."
raise ValueError(msg)
date_second_str = parts[0]
op_key_str = parts[1]
return LogFile(
op_key=op_key_str,
date_day=date_day,
group=group,
date_second=date_second_str,
func_name=func_name,
_base_dir=base_dir,
)
def get_file_path(self) -> Path:
return (
self._base_dir
/ self.date_day
/ urllib.parse.quote(self.group, safe="")
/ self.func_name
/ f"{self.date_second}_{self.op_key}.log"
)
"""Get the full file path for this log file.
Returns:
The complete Path object for this log file including nested directory structure.
"""
# Create nested directory structure for hierarchical groups
path = self._base_dir / self.date_day
# Split group by slash and create nested directories
# Dynamic elements are already URL encoded at LogFile creation time
group_components = self.group.split("/")
for component in group_components:
path = path / component
return path / self.func_name / f"{self.date_second}_{self.op_key}.log"
def __eq__(self, other: object) -> bool:
"""Check equality with another LogFile instance.
Args:
other: The object to compare with.
Returns:
True if all significant fields are equal, False otherwise.
"""
if not isinstance(other, LogFile):
return NotImplemented
# Compare all significant fields for equality
@@ -96,6 +148,16 @@ class LogFile:
)
def __lt__(self, other: object) -> bool:
"""Compare LogFile instances for sorting.
Sorting order: datetime (newest first), then group, func_name, op_key (all ascending).
Args:
other: The object to compare with.
Returns:
True if this instance should be sorted before the other.
"""
if not isinstance(other, LogFile):
return NotImplemented
# Primary sort: datetime (newest first). self is "less than" other if self is newer.
@@ -111,154 +173,31 @@ class LogFile:
return self.op_key < other.op_key
@total_ordering
@dataclass(frozen=True)
class LogFuncDir:
date_day: str
group: str
func_name: str
_base_dir: Path
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
msg = f"LogFuncDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
@property
def _date_obj(self) -> datetime.date:
return (
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
.replace(tzinfo=datetime.UTC)
.date()
)
def get_dir_path(self) -> Path:
return (
self._base_dir
/ self.date_day
/ urllib.parse.quote(self.group, safe="")
/ self.func_name
)
def get_log_files(self) -> list[LogFile]:
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
log_files_list: list[LogFile] = []
for file_path in dir_path.iterdir():
if file_path.is_file() and file_path.suffix == ".log":
try:
log_files_list.append(LogFile.from_path(file_path))
except ValueError:
log.warning(
f"Skipping malformed log file '{file_path.name}' in '{dir_path}'."
)
return sorted(log_files_list) # Sorts using LogFile.__lt__ (newest first)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogFuncDir):
return NotImplemented
return (
self.date_day == other.date_day
and self.group == other.group
and self.func_name == other.func_name
and self._base_dir == other._base_dir
)
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogFuncDir):
return NotImplemented
# Primary sort: date (newest first)
if self._date_obj != other._date_obj:
return self._date_obj > other._date_obj
# Secondary sort: group (alphabetical ascending)
if self.group != other.group:
return self.group < other.group
# Tertiary sort: func_name (alphabetical ascending)
return self.func_name < other.func_name
@total_ordering
@dataclass(frozen=True)
class LogGroupDir:
date_day: str
group: str
_base_dir: Path
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
msg = f"LogGroupDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
@property
def _date_obj(self) -> datetime.date:
return (
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
.replace(tzinfo=datetime.UTC)
.date()
)
def get_dir_path(self) -> Path:
return self._base_dir / self.date_day / urllib.parse.quote(self.group, safe="")
def get_log_files(self) -> list[LogFuncDir]:
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
func_dirs_list: list[LogFuncDir] = []
for func_dir_path in dir_path.iterdir():
if func_dir_path.is_dir():
try:
func_dirs_list.append(
LogFuncDir(
date_day=self.date_day,
group=self.group,
func_name=func_dir_path.name,
_base_dir=self._base_dir,
)
)
except ValueError:
log.warning(
f"Skipping malformed function directory '{func_dir_path.name}' in '{dir_path}'."
)
return sorted(func_dirs_list)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogGroupDir):
return NotImplemented
return (
self.date_day == other.date_day
and self.group == other.group
and self._base_dir == other._base_dir
)
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogGroupDir):
return NotImplemented
# Primary sort: date (newest first)
if self._date_obj != other._date_obj:
return self._date_obj > other._date_obj
# Secondary sort: group (alphabetical ascending)
return self.group < other.group
@total_ordering
@dataclass(frozen=True)
class LogDayDir:
"""Represents a single day's log directory."""
date_day: str
_base_dir: Path
def __post_init__(self) -> None:
"""Validate date format after initialization.
Raises:
ValueError: If date_day is not in YYYY-MM-DD format.
"""
if not is_correct_day_format(self.date_day):
msg = f"LogDayDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
raise ValueError(msg)
@property
def _date_obj(self) -> datetime.date:
"""Get the date object for this log day directory.
Returns:
A date object constructed from date_day.
"""
return (
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
.replace(tzinfo=datetime.UTC)
@@ -266,39 +205,37 @@ class LogDayDir:
)
def get_dir_path(self) -> Path:
"""Get the directory path for this log day.
Returns:
The Path object for this day's log directory.
"""
return self._base_dir / self.date_day
def get_log_files(self) -> list[LogGroupDir]:
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
group_dirs_list: list[LogGroupDir] = []
# First level: group directories
for group_dir_path in dir_path.iterdir():
if group_dir_path.is_dir():
group_name = urllib.parse.unquote(group_dir_path.name)
try:
group_dirs_list.append(
LogGroupDir(
date_day=self.date_day,
group=group_name,
_base_dir=self._base_dir,
)
)
except ValueError:
log.warning(
f"Warning: Skipping malformed group directory '{group_dir_path.name}' in '{dir_path}'."
)
return sorted(group_dirs_list)
def __eq__(self, other: object) -> bool:
"""Check equality with another LogDayDir instance.
Args:
other: The object to compare with.
Returns:
True if date_day and base_dir are equal, False otherwise.
"""
if not isinstance(other, LogDayDir):
return NotImplemented
return self.date_day == other.date_day and self._base_dir == other._base_dir
def __lt__(self, other: object) -> bool:
"""Compare LogDayDir instances for sorting.
Sorting order: date (newest first).
Args:
other: The object to compare with.
Returns:
True if this instance should be sorted before the other.
"""
if not isinstance(other, LogDayDir):
return NotImplemented
# Primary sort: date (newest first)
@@ -307,20 +244,105 @@ class LogDayDir:
@dataclass(frozen=True)
class LogManager:
"""Manages hierarchical log files with group configurations and filtering capabilities.
Provides functionality to create, search, and organize log files in a hierarchical
directory structure with support for dynamic group names and nicknames.
Attributes:
base_dir: The base directory where all log files are stored.
root_group_configs: Dictionary of root-level group configurations.
"""
base_dir: Path
root_group_configs: dict[str, LogGroupConfig] = field(default_factory=dict)
def add_root_group_config(self, group_config: LogGroupConfig) -> "LogManager":
"""Return a new LogManager with the added root-level group configuration.
Args:
group_config: The root-level group configuration to add.
Returns:
A new LogManager instance with the group configuration added.
"""
new_configs = {**self.root_group_configs, group_config.name: group_config}
return LogManager(base_dir=self.base_dir, root_group_configs=new_configs)
def find_group_config(self, group_path: list[str]) -> LogGroupConfig | None:
"""Find group configuration by traversing the hierarchical path.
Only looks at structure elements (even indices), ignoring dynamic names (odd indices).
Args:
group_path: The group path components to search for.
Returns:
The LogGroupConfig if found, None otherwise.
"""
if not group_path:
return None
current_config = self.root_group_configs.get(group_path[0])
if not current_config:
return None
# If only root group, return it
if len(group_path) == 1:
return current_config
# Traverse down the hierarchy, only looking at structure elements (even indices)
for i in range(2, len(group_path), 2):
structure_name = group_path[i]
current_config = current_config.get_child(structure_name)
if not current_config:
return None
return current_config
def create_log_file(
self, func: Callable, op_key: str, group: str | None = None
self, func: Callable, op_key: str, group_path: list[str] | None = None
) -> LogFile:
"""Create a new log file for the given function and operation.
Args:
func: The function to create a log file for.
op_key: The operation key identifier.
group_path: Optional group path components. Defaults to ["default"].
Returns:
A new LogFile instance with the log file created on disk.
Raises:
ValueError: If the group structure is not registered.
FileExistsError: If the log file already exists.
"""
now_utc = datetime.datetime.now(tz=datetime.UTC)
if group is None:
group = "default"
if group_path is None:
group_path = ["default"]
# Validate that the group path structure is registered in the configuration
if not self._is_group_path_registered(group_path):
group_str = "/".join(group_path)
msg = f"Group structure '{group_str}' is not valid. Root group '{group_path[0]}' or structure elements at even indices are not registered."
raise ValueError(msg)
# URL encode dynamic elements (odd indices) before creating group string
encoded_group_path = []
for i, component in enumerate(group_path):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
encoded_group_path.append(urllib.parse.quote(component, safe=""))
else: # Even index = structure element, no encoding needed
encoded_group_path.append(component)
# Convert encoded path to string for LogFile
group_str = "/".join(encoded_group_path)
log_file = LogFile(
op_key=op_key,
date_day=now_utc.strftime("%Y-%m-%d"),
group=group,
group=group_str,
date_second=now_utc.strftime("%H-%M-%S"), # Corrected original's %H-$M-%S
func_name=func.__name__,
_base_dir=self.base_dir,
@@ -336,7 +358,75 @@ class LogManager:
log_path.touch()
return log_file
def _is_group_path_registered(self, group_path: list[str]) -> bool:
"""Check if the given group path structure is registered in the configuration.
This validates the group structure (e.g., clans/<name>/machines) but allows
dynamic names (e.g., <name> can be any value).
Args:
group_path: The group path components to validate.
Returns:
True if the group structure is registered, False otherwise.
"""
# Special case: allow "default" group without registration
if group_path == ["default"]:
return True
# For dynamic group validation, we need to check if the structure exists
# by matching the pattern, not the exact path
return self._validate_group_structure(group_path)
def _validate_group_structure(self, group_path: list[str]) -> bool:
"""Validate that the group structure exists, allowing dynamic names.
Pattern alternates: structure -> dynamic -> structure -> dynamic -> ...
- Even indices (0, 2, 4, ...): must be registered group names (structure elements)
- Odd indices (1, 3, 5, ...): can be any dynamic names (will be URL encoded)
Examples:
- ["clans", "repo-name", "default"] -> clans(structure) -> repo-name(dynamic) -> default(structure)
- ["clans", "repo-name", "machines", "machine-name"] -> clans(struct) -> repo-name(dyn) -> machines(struct) -> machine-name(dyn)
Args:
group_path: The group path components to validate.
Returns:
True if the group structure is valid, False otherwise.
"""
if not group_path:
return False
# Check if root group exists (index 0 - always structure)
root_group = group_path[0]
if root_group not in self.root_group_configs:
return False
if len(group_path) == 1:
return True
# For longer paths, traverse the structure elements only
current_config = self.root_group_configs[root_group]
# Check all structure elements (even indices starting from 2)
for i in range(2, len(group_path), 2):
structure_name = group_path[i]
# Look for this structure in current config's children
if structure_name not in current_config.children:
return False
current_config = current_config.children[structure_name]
return True
def list_log_days(self) -> list[LogDayDir]:
"""List all available log days in the base directory.
Returns:
A sorted list of LogDayDir instances (newest first). Returns empty list if base directory doesn't exist.
"""
if not self.base_dir.exists() or not self.base_dir.is_dir():
return []
@@ -361,43 +451,211 @@ class LogManager:
def get_log_file(
self,
op_key_to_find: str,
specific_date_day: str | None = None,
specific_group: str | None = None,
op_key: str,
*,
date_day: str | None = None,
selector: list[str] | None = None,
) -> LogFile | None:
"""Get a specific log file by operation key.
Args:
op_key: The operation key to search for.
date_day: Optional specific date to search in (YYYY-MM-DD format).
selector: Optional group path to search in. If None, searches all groups.
Returns:
The LogFile if found, None otherwise.
"""
days_to_search: list[LogDayDir]
if specific_date_day:
if not is_correct_day_format(specific_date_day):
# print(f"Warning: Provided specific_date_day '{specific_date_day}' is not in YYYY-MM-DD format.")
if date_day:
if not is_correct_day_format(date_day):
return None
try:
target_day_dir = LogDayDir(
date_day=specific_date_day, _base_dir=self.base_dir
date_day=date_day,
_base_dir=self.base_dir,
)
if (
not target_day_dir.get_dir_path().exists()
): # Check if dir exists on disk
if not target_day_dir.get_dir_path().exists():
return None
days_to_search = [target_day_dir] # Search only this specific day
except ValueError: # If LogDayDir construction fails (e.g. date_day format despite is_correct_day_format)
days_to_search = [target_day_dir]
except ValueError:
return None
else:
days_to_search = self.list_log_days() # Already sorted, newest day first
days_to_search = self.list_log_days()
for day_dir in (
days_to_search
): # Iterates newest day first if days_to_search came from list_log_days()
# day_dir.get_log_files() returns List[LogGroupDir], sorted by group name
for group_dir in day_dir.get_log_files():
# Skip this group if specific_group is provided and doesn't match
if specific_group is not None and group_dir.group != specific_group:
# Search for the log file directly using filesystem traversal
for day_dir in days_to_search:
result = self._find_log_file_in_day(day_dir, op_key, selector)
if result:
return result
return None
def _find_log_file_in_day(
self, day_dir: LogDayDir, op_key: str, selector: list[str] | None = None
) -> LogFile | None:
"""Find a log file in a specific day directory.
Args:
day_dir: The LogDayDir to search in.
op_key: The operation key to search for.
selector: Optional group path to search in. If None, searches all groups.
Returns:
The LogFile if found, None otherwise.
"""
base_path = day_dir.get_dir_path()
if selector is not None:
# Search in specific group path
search_path = base_path
for i, component in enumerate(selector):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
search_path = search_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
search_path = search_path / component
if search_path.exists() and search_path.is_dir():
return self._search_in_path(search_path, op_key, selector)
else:
# Search all groups in this day
if base_path.exists() and base_path.is_dir():
return self._search_in_path(base_path, op_key, None)
return None
def _search_in_path(
self, search_path: Path, op_key: str, group_path: list[str] | None
) -> LogFile | None:
"""Search for log files in a given path.
Args:
search_path: The path to search in.
op_key: The operation key to search for.
group_path: The group path used to construct the LogFile.
Returns:
The LogFile if found, None otherwise.
"""
log_files: list[LogFile] = []
# Recursively search for log files
for log_file_path in search_path.rglob("*.log"):
if log_file_path.is_file():
try:
# Parse filename to get op_key and time
filename_stem = log_file_path.stem
parts = filename_stem.split("_", 1)
if len(parts) == 2:
date_second_str, file_op_key = parts
if file_op_key == op_key:
# Find the base directory (contains date directories)
base_dir = self.base_dir
# Get path relative to base directory
try:
relative_to_base = log_file_path.relative_to(base_dir)
path_parts = relative_to_base.parts
if len(path_parts) >= 3: # date/[groups...]/func/file
date_day = path_parts[0]
func_name = path_parts[
-2
] # Second to last is function name
group_parts = path_parts[
1:-2
] # Between date and function
# Create group string (already URL encoded in filesystem)
group_str = (
"/".join(group_parts)
if group_parts
else "default"
)
if is_correct_day_format(date_day):
log_file = LogFile(
op_key=file_op_key,
date_day=date_day,
group=group_str,
func_name=func_name,
_base_dir=self.base_dir,
date_second=date_second_str,
)
log_files.append(log_file)
except ValueError:
# Skip files that can't be made relative to base_dir
continue
except (ValueError, IndexError):
# Skip malformed files
continue
# group_dir.get_log_files() returns List[LogFuncDir], sorted by func_name
for func_dir in group_dir.get_log_files():
# func_dir.get_log_files() returns List[LogFile], sorted newest file first
for log_file in func_dir.get_log_files():
if log_file.op_key == op_key_to_find:
return log_file
# Return the newest log file if any found
if log_files:
return sorted(log_files)[0] # LogFile.__lt__ sorts newest first
return None
def filter(
self, selector: list[str] | None = None, date_day: str | None = None
) -> list[str]:
"""Filter and list folders at the specified hierarchical path.
Args:
selector: List of path components to navigate to. Empty list returns top-level groups.
For alternating structure/dynamic pattern:
- ["clans"] lists all dynamic names under clans
- ["clans", <name>, "machines"] lists all dynamic names under machines
- [] lists all top-level groups
date_day: Optional date to filter by (YYYY-MM-DD format). If None, uses most recent day.
Returns:
List of folder names (decoded) at the specified path level.
"""
if selector is None:
selector = []
# Get the day to search in
if date_day is None:
days = self.list_log_days()
if not days:
return []
day_dir = days[0] # Most recent day
else:
if not is_correct_day_format(date_day):
return []
try:
day_dir = LogDayDir(
date_day=date_day,
_base_dir=self.base_dir,
)
if not day_dir.get_dir_path().exists():
return []
except ValueError:
return []
# Empty path means list top-level groups
if not selector:
return list(self.root_group_configs.keys())
# Build the directory path to search in
dir_path = day_dir.get_dir_path()
for i, component in enumerate(selector):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
dir_path = dir_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
dir_path = dir_path / component
if not dir_path.exists() or not dir_path.is_dir():
return []
# List directories and decode their names
folder_names = []
for subdir_path in dir_path.iterdir():
if subdir_path.is_dir():
# Decode the directory name
decoded_name = urllib.parse.unquote(subdir_path.name)
folder_names.append(decoded_name)
return sorted(folder_names)

View File

@@ -1,53 +1,69 @@
from clan_lib.api import API
from clan_lib.errors import ClanError
from clan_lib.log_manager import LogDayDir, LogFile, LogFuncDir, LogGroupDir, LogManager
from clan_lib.log_manager import LogManager
LOG_MANAGER_INSTANCE: LogManager | None = None
@API.register
def list_log_days() -> list[LogDayDir]:
"""List all logs."""
def list_log_days() -> list[str]:
"""List all available log days.
Returns:
A list of date strings in YYYY-MM-DD format representing all available log days.
Raises:
AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
"""
assert LOG_MANAGER_INSTANCE is not None
return LOG_MANAGER_INSTANCE.list_log_days()
return [day.date_day for day in LOG_MANAGER_INSTANCE.list_log_days()]
@API.register
def list_log_groups(date_day: str) -> list[LogGroupDir]:
"""List all log groups."""
def list_log_groups(
selector: list[str] | None, date_day: str | None = None
) -> list[str]:
"""List all log groups at the specified hierarchical path.
Args:
selector: List of path components to navigate to. Empty list returns top-level groups.
date_day: Optional date to filter by (YYYY-MM-DD format). If None, uses most recent day.
Returns:
A list of folder names (decoded) at the specified path level.
Raises:
AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
"""
assert LOG_MANAGER_INSTANCE is not None
day_dir = LogDayDir(date_day, LOG_MANAGER_INSTANCE.base_dir)
return day_dir.get_log_files()
return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day)
@API.register
def list_log_funcs_at_day(date_day: str, group: str) -> list[LogFuncDir]:
"""List all logs for a specific function on a specific day."""
assert LOG_MANAGER_INSTANCE is not None
group_dir = LogGroupDir(date_day, group, LOG_MANAGER_INSTANCE.base_dir)
return group_dir.get_log_files()
def get_log_file(
id_key: str, selector: list[str] | None = None, date_day: str | None = None
) -> str:
"""Get the contents of a specific log file by operation key.
Args:
id_key: The operation key to search for.
selector: Optional group path to search in. If None, searches all groups.
date_day: Optional specific date to search in (YYYY-MM-DD format). If None, searches all days.
@API.register
def list_log_files(date_day: str, group: str, func_name: str) -> list[LogFile]:
"""List all log files for a specific function on a specific day."""
assert LOG_MANAGER_INSTANCE is not None
func_dir = LogFuncDir(date_day, group, func_name, LOG_MANAGER_INSTANCE.base_dir)
return func_dir.get_log_files()
Returns:
The contents of the log file as a string.
@API.register
def get_log_file(id_key: str, group: str | None = None) -> str:
"""Get a specific log file by op_key, function name and day."""
Raises:
ClanError: If the log file is not found.
AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
"""
assert LOG_MANAGER_INSTANCE is not None
log_file = LOG_MANAGER_INSTANCE.get_log_file(id_key, specific_group=group)
log_file = LOG_MANAGER_INSTANCE.get_log_file(
op_key=id_key, selector=selector, date_day=date_day
)
if log_file is None:
return ""
file_path = log_file.get_file_path()
if not file_path.exists():
msg = f"Log file {file_path} does not exist."
msg = f"Log file with op_key '{id_key}' not found in selector '{selector}' and date_day '{date_day}'."
raise ClanError(msg)
return file_path.read_text()
return log_file.get_file_path().read_text()

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
Simple LogManager example with filter function.
This demonstrates:
- Dynamic group names with URL encoding
- Hierarchical structure navigation using the filter function
- Pattern: clans -> <dynamic_name> -> machines -> <dynamic_name>
"""
from pathlib import Path
from clan_lib.log_manager import LogGroupConfig, LogManager
def example_function() -> None:
"""Example function for creating logs."""
def deploy_machine() -> None:
"""Function for deploying machines."""
def main() -> None:
"""Simple LogManager demonstration with filter function."""
# Setup
log_manager = LogManager(base_dir=Path("/tmp/clan_logs"))
# Configure structure: clans -> <dynamic> -> machines -> <dynamic>
clans_config = LogGroupConfig("clans", "Clans")
machines_config = LogGroupConfig("machines", "Machines")
clans_config = clans_config.add_child(machines_config)
log_manager = log_manager.add_root_group_config(clans_config)
print("=== LogManager Filter Function Example ===\n")
# Create some example logs
repos = ["/home/user/Projects/qubasas_clan", "https://github.com/qubasa/myclan"]
machines = ["wintux", "demo", "gchq-local"]
for repo in repos:
for machine in machines:
log_manager.create_log_file(
deploy_machine,
f"deploy_{machine}",
["clans", repo, "machines", machine],
)
print("Created log files for multiple repos and machines\n")
# Demonstrate filter function
print("=== Using the filter() function ===")
# 1. List top-level groups
top_level = log_manager.filter([])
print(f"1. Top-level groups: {top_level}")
# 2. List all repositories under 'clans'
clans_repos = log_manager.filter(["clans"])
print(f"2. Repositories under clans: {clans_repos}")
# 3. List machines under first repository
if clans_repos:
first_repo = clans_repos[0]
repo_machines = log_manager.filter(["clans", first_repo, "machines"])
print(f"3. Machines under '{first_repo}': {repo_machines}")
# 4. List machines under second repository
if len(clans_repos) > 1:
second_repo = clans_repos[1]
repo_machines = log_manager.filter(["clans", second_repo, "machines"])
print(f"4. Machines under '{second_repo}': {repo_machines}")
print("\n=== Using get_log_file with arrays ===")
# Demonstrate the new array-based get_log_file functionality
if clans_repos and len(clans_repos) > 0:
specific_log = log_manager.get_log_file(
"deploy_wintux",
selector=["clans", clans_repos[0], "machines", "wintux"],
)
if specific_log:
print(
f"5. Found specific log: {specific_log.op_key} in {specific_log.func_name}"
)
else:
print("5. Specific log not found")
print("\n=== Key Features ===")
print("✓ Dynamic names with special chars (/, spaces, etc.) work")
print("✓ Names are URL encoded in filesystem but returned decoded")
print("✓ Filter function navigates hierarchy with simple arrays")
print("✓ get_log_file now accepts specific_group as array")
print("✓ Empty array [] lists top-level groups")
print("✓ Odd indices are dynamic, even indices are structure")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,187 @@
# Test file specifically for URL encoding functionality
import urllib.parse
from pathlib import Path
from clan_lib.log_manager import LogGroupConfig, LogManager
def sample_function() -> None:
"""Sample function for testing."""
class TestURLEncoding:
"""Test URL encoding for dynamic group names."""
def test_dynamic_name_url_encoding_forward_slash(self, tmp_path: Path) -> None:
"""Test that dynamic names with forward slashes get URL encoded."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use a dynamic name with forward slashes
dynamic_name = "/home/user/Projects/qubasas_clan"
group_path = ["clans", dynamic_name, "default"]
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
# Check that the LogFile uses encoded path for file system operations
file_path = log_file.get_file_path()
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
# Verify the encoded name appears in the file path
assert expected_encoded in str(file_path)
assert file_path.exists()
# Verify that no intermediate directories were created from the forward slashes
# The encoded name should be a single directory
day_dir = tmp_path / log_file.date_day / "clans"
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
assert len(direct_children) == 1
assert direct_children[0] == expected_encoded
def test_dynamic_name_url_encoding_special_characters(self, tmp_path: Path) -> None:
"""Test URL encoding of dynamic names with various special characters."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
machines_config = LogGroupConfig("machines", "Machines")
clans_config = clans_config.add_child(machines_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Test various special characters
test_cases = [
"repo with spaces",
"repo&with&ampersands",
"repo!with!exclamations",
"repo%with%percent",
"repo@with@symbols",
"repo#with#hash",
"repo+with+plus",
]
for dynamic_name in test_cases:
group_path = ["clans", dynamic_name, "machines", f"machine-{dynamic_name}"]
log_file = log_manager.create_log_file(
sample_function, f"test_{dynamic_name}", group_path
)
# Check that the file was created and encoded names appear in path
file_path = log_file.get_file_path()
assert file_path.exists()
# Verify encoding for both dynamic elements (indices 1 and 3)
expected_encoded_repo = urllib.parse.quote(dynamic_name, safe="")
expected_encoded_machine = urllib.parse.quote(
f"machine-{dynamic_name}", safe=""
)
assert expected_encoded_repo in str(file_path)
assert expected_encoded_machine in str(file_path)
def test_structure_elements_not_encoded(self, tmp_path: Path) -> None:
"""Test that structure elements (even indices) are NOT URL encoded."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements with special characters in their names
# (though this is not typical, testing to ensure they're not encoded)
test_config = LogGroupConfig("test-group", "Test Group")
sub_config = LogGroupConfig("sub-group", "Sub Group")
test_config = test_config.add_child(sub_config)
log_manager = log_manager.add_root_group_config(test_config)
# Use structure names that contain hyphens (common case)
group_path = ["test-group", "dynamic-name", "sub-group", "another-dynamic"]
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
file_path = log_file.get_file_path()
# Structure elements should NOT be encoded
assert "test-group" in str(file_path) # Structure element, not encoded
assert "sub-group" in str(file_path) # Structure element, not encoded
# Dynamic elements should be encoded
expected_dynamic1 = urllib.parse.quote("dynamic-name", safe="")
expected_dynamic2 = urllib.parse.quote("another-dynamic", safe="")
assert expected_dynamic1 in str(file_path)
assert expected_dynamic2 in str(file_path)
def test_url_encoding_with_unicode_characters(self, tmp_path: Path) -> None:
"""Test URL encoding with Unicode characters in dynamic names."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use Unicode characters in dynamic name
dynamic_name = "项目/中文/测试" # Chinese characters with slashes
group_path = ["clans", dynamic_name, "default"]
log_file = log_manager.create_log_file(
sample_function, "unicode_test", group_path
)
file_path = log_file.get_file_path()
# Check that file was created and Unicode was properly encoded
assert file_path.exists()
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
assert expected_encoded in str(file_path)
# Verify no intermediate directories from slashes in Unicode string
day_dir = tmp_path / log_file.date_day / "clans"
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
assert len(direct_children) == 1
assert direct_children[0] == expected_encoded
def test_backward_compatibility_single_element_paths(self, tmp_path: Path) -> None:
"""Test that single-element paths (no dynamic names) still work."""
log_manager = LogManager(base_dir=tmp_path)
# Register simple structure
default_config = LogGroupConfig("default", "Default")
log_manager = log_manager.add_root_group_config(default_config)
# Use simple single-element path (no dynamic names to encode)
group_path = ["default"]
log_file = log_manager.create_log_file(
sample_function, "simple_test", group_path
)
file_path = log_file.get_file_path()
# Should work exactly as before
assert file_path.exists()
assert "default" in str(file_path)
# No encoding should have occurred
assert urllib.parse.quote("default", safe="") == "default" # No special chars
def test_empty_dynamic_name_encoding(self, tmp_path: Path) -> None:
"""Test URL encoding with empty string as dynamic name."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use empty string as dynamic name
group_path = ["clans", "", "default"]
log_file = log_manager.create_log_file(
sample_function, "empty_test", group_path
)
file_path = log_file.get_file_path()
# Should work - empty string gets encoded as empty string
assert file_path.exists()
expected_encoded = urllib.parse.quote("", safe="")
assert expected_encoded == "" # Empty string encodes to empty string

View File

@@ -16,6 +16,7 @@ mkShell {
with ps;
[
mypy
pytest-cov
]
++ (clan-cli.devshellPyDeps ps)
))