clan-app: Implement dynamic groups and array based filtering of logs and groups

This commit is contained in:
Qubasa
2025-07-03 17:49:34 +07:00
parent 903331a789
commit 85670d1dda
6 changed files with 2094 additions and 111 deletions

View File

@@ -2,13 +2,43 @@ import datetime
import logging
import urllib.parse
from collections.abc import Callable # Union for str | None
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import total_ordering
from pathlib import Path
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class LogGroupConfig:
"""Configuration for a hierarchical log group with nickname support."""
name: str # The name of this group level (single directory name)
nickname: str | None = None # Optional display name for easier visibility
children: dict[str, "LogGroupConfig"] = field(
default_factory=dict
) # Nested child groups
def get_display_name(self) -> str:
"""Get the display name (nickname if available, otherwise the name)."""
return self.nickname if self.nickname else self.name
def add_child(self, child: "LogGroupConfig") -> "LogGroupConfig":
"""Add a child group configuration and return a new LogGroupConfig instance."""
new_children = {**self.children, child.name: child}
return LogGroupConfig(
name=self.name, nickname=self.nickname, children=new_children
)
def get_child(self, name: str) -> "LogGroupConfig | None":
"""Get a child group by name."""
return self.children.get(name)
def get_path_components(self) -> list[str]:
"""Get the path components for this group (just the name as a single component)."""
return [self.name]
# Global helper function for format checking (used by LogManager and internally by classes)
def is_correct_day_format(date_day: str) -> bool:
"""Check if the date_day is in the correct format YYYY-MM-DD."""
@@ -51,11 +81,33 @@ class LogFile:
@classmethod
def from_path(cls, file: Path) -> "LogFile":
date_day = file.parent.parent.parent.name
group = urllib.parse.unquote(file.parent.parent.name)
# Work backwards from the file path to reconstruct the hierarchical group structure
func_name = file.parent.name
# Traverse up from func_dir to find the date_day directory
current_path = file.parent.parent # Start from group level
group_components: list[str] = []
while (
current_path.parent.name != current_path.parent.parent.name
): # Until we reach base_dir
parent_name = current_path.name
# Check if this looks like a date directory (YYYY-MM-DD format)
if is_correct_day_format(parent_name):
date_day = parent_name
base_dir = current_path.parent
break
# This is a group component, URL decode it
group_components.insert(0, urllib.parse.unquote(parent_name))
current_path = current_path.parent
else:
# Fallback: assume single-level structure
date_day = file.parent.parent.parent.name
group_components = [urllib.parse.unquote(file.parent.parent.name)]
base_dir = file.parent.parent.parent.parent
group = "/".join(group_components)
filename_stem = file.stem
parts = filename_stem.split("_", 1)
if len(parts) != 2:
@@ -75,13 +127,16 @@ class LogFile:
)
def get_file_path(self) -> Path:
return (
self._base_dir
/ self.date_day
/ urllib.parse.quote(self.group, safe="")
/ self.func_name
/ f"{self.date_second}_{self.op_key}.log"
)
# Create nested directory structure for hierarchical groups
path = self._base_dir / self.date_day
# Split group by slash and create nested directories
# Dynamic elements are already URL encoded at LogFile creation time
group_components = self.group.split("/")
for component in group_components:
path = path / component
return path / self.func_name / f"{self.date_second}_{self.op_key}.log"
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogFile):
@@ -133,12 +188,16 @@ class LogFuncDir:
)
def get_dir_path(self) -> Path:
return (
self._base_dir
/ self.date_day
/ urllib.parse.quote(self.group, safe="")
/ self.func_name
)
# Create nested directory structure for hierarchical groups
path = self._base_dir / self.date_day
# Split group by slash and create nested directories
# Dynamic elements are already URL encoded at LogFile creation time
group_components = self.group.split("/")
for component in group_components:
path = path / component
return path / self.func_name
def get_log_files(self) -> list[LogFile]:
dir_path = self.get_dir_path()
@@ -184,8 +243,21 @@ class LogFuncDir:
@dataclass(frozen=True)
class LogGroupDir:
date_day: str
group: str
group_path: list[
str
] # Path components for nested groups, e.g., ["flakes", "flake1", "machines"]
_base_dir: Path
nickname: str | None = None
@property
def group_name(self) -> str:
"""Get the name of this group level (last component of path)."""
return self.group_path[-1] if self.group_path else ""
@property
def full_group_path(self) -> str:
"""Get the full group path as a slash-separated string."""
return "/".join(self.group_path)
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
@@ -201,7 +273,51 @@ class LogGroupDir:
)
def get_dir_path(self) -> Path:
return self._base_dir / self.date_day / urllib.parse.quote(self.group, safe="")
"""Get the directory path for this nested group."""
path = self._base_dir / self.date_day
for i, component in enumerate(self.group_path):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
path = path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
path = path / component
return path
def get_display_name(self) -> str:
"""Get the display name (nickname if available, otherwise group name)."""
return self.nickname if self.nickname else self.group_name
def get_nested_groups(self) -> list["LogGroupDir"]:
"""Get nested LogGroupDir instances within this group."""
dir_path = self.get_dir_path()
if not dir_path.exists() or not dir_path.is_dir():
return []
nested_groups: list[LogGroupDir] = []
for subdir_path in dir_path.iterdir():
if subdir_path.is_dir():
# Check if this is a group directory (contains other groups) or a function directory
# Function directories should contain .log files, group directories should contain other directories
contains_log_files = any(
f.suffix == ".log"
for f in subdir_path.rglob("*.log")
if f.parent == subdir_path
)
contains_subdirs = any(p.is_dir() for p in subdir_path.iterdir())
# If it contains subdirectories but no direct log files, it's likely a nested group
if contains_subdirs and not contains_log_files:
group_name = urllib.parse.unquote(subdir_path.name)
nested_path = [*self.group_path, group_name]
nested_groups.append(
LogGroupDir(
date_day=self.date_day,
group_path=nested_path,
_base_dir=self._base_dir,
nickname=None, # Will be populated by LogManager if configured
)
)
return sorted(nested_groups)
def get_log_files(self) -> list[LogFuncDir]:
dir_path = self.get_dir_path()
@@ -211,11 +327,17 @@ class LogGroupDir:
func_dirs_list: list[LogFuncDir] = []
for func_dir_path in dir_path.iterdir():
if func_dir_path.is_dir():
# Only include directories that actually contain log files (function directories)
# Skip directories that contain other directories (nested groups)
contains_log_files = any(
f.suffix == ".log" for f in func_dir_path.iterdir() if f.is_file()
)
if contains_log_files:
try:
func_dirs_list.append(
LogFuncDir(
date_day=self.date_day,
group=self.group,
group=self.full_group_path,
func_name=func_dir_path.name,
_base_dir=self._base_dir,
)
@@ -232,8 +354,9 @@ class LogGroupDir:
return NotImplemented
return (
self.date_day == other.date_day
and self.group == other.group
and self.group_path == other.group_path
and self._base_dir == other._base_dir
and self.nickname == other.nickname
)
def __lt__(self, other: object) -> bool:
@@ -242,8 +365,8 @@ class LogGroupDir:
# Primary sort: date (newest first)
if self._date_obj != other._date_obj:
return self._date_obj > other._date_obj
# Secondary sort: group (alphabetical ascending)
return self.group < other.group
# Secondary sort: group path (alphabetical ascending)
return self.group_path < other.group_path
@total_ordering
@@ -251,6 +374,7 @@ class LogGroupDir:
class LogDayDir:
date_day: str
_base_dir: Path
group_configs: dict[str, LogGroupConfig] = field(default_factory=dict)
def __post_init__(self) -> None:
if not is_correct_day_format(self.date_day):
@@ -268,35 +392,117 @@ class LogDayDir:
def get_dir_path(self) -> Path:
return self._base_dir / self.date_day
def get_root_groups(self) -> list[LogGroupDir]:
"""Get root-level LogGroupDir instances."""
return self._get_groups_at_path([])
def get_log_files(self) -> list[LogGroupDir]:
dir_path = self.get_dir_path()
"""Backward compatibility method - returns root groups."""
return self.get_root_groups()
def _get_groups_at_path(self, current_path: list[str]) -> list[LogGroupDir]:
# Build the current directory path
dir_path = self._base_dir / self.date_day
for i, component in enumerate(current_path):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
dir_path = dir_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
dir_path = dir_path / component
if not dir_path.exists() or not dir_path.is_dir():
return []
group_dirs_list: list[LogGroupDir] = []
# First level: group directories
for group_dir_path in dir_path.iterdir():
if group_dir_path.is_dir():
group_name = urllib.parse.unquote(group_dir_path.name)
# Look for group directories at this level
for subdir_path in dir_path.iterdir():
if subdir_path.is_dir():
group_name = urllib.parse.unquote(subdir_path.name)
group_path = [*current_path, group_name]
# A directory is a group directory if:
# 1. It contains function directories (directories with .log files), OR
# 2. It contains other group directories (nested structure)
# 3. It's NOT itself a function directory (doesn't contain .log files directly)
is_function_dir = self._is_function_directory(subdir_path)
if not is_function_dir: # Not a function directory
contains_functions = self._contains_function_directories(
subdir_path
)
contains_groups = self._contains_group_directories(subdir_path)
# If it contains either functions or groups, it's a valid group directory
if contains_functions or contains_groups:
try:
# Find nickname from configuration
nickname = None
config = self._find_config_for_path(group_path)
if config:
nickname = config.nickname
group_dirs_list.append(
LogGroupDir(
date_day=self.date_day,
group=group_name,
group_path=group_path,
_base_dir=self._base_dir,
nickname=nickname,
)
)
except ValueError:
log.warning(
f"Warning: Skipping malformed group directory '{group_dir_path.name}' in '{dir_path}'."
f"Warning: Skipping malformed group directory '{subdir_path.name}' in '{dir_path}'."
)
return sorted(group_dirs_list)
def _contains_function_directories(self, dir_path: Path) -> bool:
"""Check if directory contains function directories (directories with .log files)."""
for subdir in dir_path.iterdir():
if subdir.is_dir():
# Check if this subdirectory contains .log files directly
if any(f.suffix == ".log" for f in subdir.iterdir() if f.is_file()):
return True
return False
def _is_function_directory(self, dir_path: Path) -> bool:
"""Check if a directory is a function directory (contains .log files directly)."""
return any(f.suffix == ".log" for f in dir_path.iterdir() if f.is_file())
def _contains_group_directories(self, dir_path: Path) -> bool:
"""Check if directory contains nested group directories."""
for subdir in dir_path.iterdir():
if subdir.is_dir() and not self._is_function_directory(subdir):
# If subdir is not a function directory, it might be a group directory
return True
return False
def _find_config_for_path(self, group_path: list[str]) -> LogGroupConfig | None:
"""Find the configuration for a given group path."""
if not group_path:
return None
current_config = self.group_configs.get(group_path[0])
if not current_config:
return None
# Traverse down the hierarchy
for component in group_path[1:]:
current_config = current_config.get_child(component)
if not current_config:
return None
return current_config
def __eq__(self, other: object) -> bool:
if not isinstance(other, LogDayDir):
return NotImplemented
return self.date_day == other.date_day and self._base_dir == other._base_dir
return (
self.date_day == other.date_day
and self._base_dir == other._base_dir
and self.group_configs == other.group_configs
)
def __lt__(self, other: object) -> bool:
if not isinstance(other, LogDayDir):
@@ -308,19 +514,109 @@ class LogDayDir:
@dataclass(frozen=True)
class LogManager:
base_dir: Path
root_group_configs: dict[str, LogGroupConfig] = field(default_factory=dict)
def add_root_group_config(self, group_config: LogGroupConfig) -> "LogManager":
"""Return a new LogManager with the added root-level group configuration."""
new_configs = {**self.root_group_configs, group_config.name: group_config}
return LogManager(base_dir=self.base_dir, root_group_configs=new_configs)
def find_group_config(self, group_path: list[str]) -> LogGroupConfig | None:
"""Find group configuration by traversing the hierarchical path.
Only looks at structure elements (even indices), ignoring dynamic names (odd indices).
"""
if not group_path:
return None
current_config = self.root_group_configs.get(group_path[0])
if not current_config:
return None
# If only root group, return it
if len(group_path) == 1:
return current_config
# Traverse down the hierarchy, only looking at structure elements (even indices)
for i in range(2, len(group_path), 2):
structure_name = group_path[i]
current_config = current_config.get_child(structure_name)
if not current_config:
return None
return current_config
def get_group_display_name(self, group_path: list[str] | str) -> str:
"""Get the display name for a group (nickname if configured, otherwise group name).
For alternating structure/dynamic pattern:
- Structure elements (even indices): use configured nickname
- Dynamic elements (odd indices): use actual name
"""
if isinstance(group_path, str):
group_path = group_path.split("/")
if not group_path:
return ""
# Check if the last element is a structure element (even index) or dynamic element (odd index)
last_index = len(group_path) - 1
if last_index % 2 == 0:
# Even index = structure element, try to find config
config = self.find_group_config(group_path)
if config:
return config.get_display_name()
# Fallback to the structure name itself
return group_path[-1]
# Odd index = dynamic element, return the actual name
return group_path[-1]
def create_nested_log_group_dir(
self, date_day: str, group_path: list[str]
) -> LogGroupDir:
"""Create a LogGroupDir with nickname support if configured."""
config = self.find_group_config(group_path)
nickname = config.nickname if config else None
return LogGroupDir(
date_day=date_day,
group_path=group_path,
_base_dir=self.base_dir,
nickname=nickname,
)
def create_log_file(
self, func: Callable, op_key: str, group: str | None = None
self, func: Callable, op_key: str, group_path: str | list[str] | None = None
) -> LogFile:
now_utc = datetime.datetime.now(tz=datetime.UTC)
if group is None:
group = "default"
if group_path is None:
group_path = ["default"]
elif isinstance(group_path, str):
group_path = group_path.split("/")
# Validate that the group path structure is registered in the configuration
if not self._is_group_path_registered(group_path):
group_str = "/".join(group_path)
msg = f"Group structure '{group_str}' is not valid. Root group '{group_path[0]}' or structure elements at even indices are not registered."
raise ValueError(msg)
# URL encode dynamic elements (odd indices) before creating group string
encoded_group_path = []
for i, component in enumerate(group_path):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
encoded_group_path.append(urllib.parse.quote(component, safe=""))
else: # Even index = structure element, no encoding needed
encoded_group_path.append(component)
# Convert encoded path to string for LogFile
group_str = "/".join(encoded_group_path)
log_file = LogFile(
op_key=op_key,
date_day=now_utc.strftime("%Y-%m-%d"),
group=group,
group=group_str,
date_second=now_utc.strftime("%H-%M-%S"), # Corrected original's %H-$M-%S
func_name=func.__name__,
_base_dir=self.base_dir,
@@ -336,6 +632,57 @@ class LogManager:
log_path.touch()
return log_file
def _is_group_path_registered(self, group_path: list[str]) -> bool:
"""Check if the given group path structure is registered in the configuration.
This validates the group structure (e.g., clans/<name>/machines) but allows
dynamic names (e.g., <name> can be any value).
"""
# Special case: allow "default" group without registration
if group_path == ["default"]:
return True
# For dynamic group validation, we need to check if the structure exists
# by matching the pattern, not the exact path
return self._validate_group_structure(group_path)
def _validate_group_structure(self, group_path: list[str]) -> bool:
"""Validate that the group structure exists, allowing dynamic names.
Pattern alternates: structure -> dynamic -> structure -> dynamic -> ...
- Even indices (0, 2, 4, ...): must be registered group names (structure elements)
- Odd indices (1, 3, 5, ...): can be any dynamic names (will be URL encoded)
Examples:
- ["clans", "repo-name", "default"] -> clans(structure) -> repo-name(dynamic) -> default(structure)
- ["clans", "repo-name", "machines", "machine-name"] -> clans(struct) -> repo-name(dyn) -> machines(struct) -> machine-name(dyn)
"""
if not group_path:
return False
# Check if root group exists (index 0 - always structure)
root_group = group_path[0]
if root_group not in self.root_group_configs:
return False
if len(group_path) == 1:
return True
# For longer paths, traverse the structure elements only
current_config = self.root_group_configs[root_group]
# Check all structure elements (even indices starting from 2)
for i in range(2, len(group_path), 2):
structure_name = group_path[i]
# Look for this structure in current config's children
if structure_name not in current_config.children:
return False
current_config = current_config.children[structure_name]
return True
def list_log_days(self) -> list[LogDayDir]:
if not self.base_dir.exists() or not self.base_dir.is_dir():
return []
@@ -350,6 +697,7 @@ class LogManager:
LogDayDir(
date_day=day_dir_candidate_path.name,
_base_dir=self.base_dir,
group_configs=self.root_group_configs,
)
)
except ValueError:
@@ -363,41 +711,186 @@ class LogManager:
self,
op_key_to_find: str,
specific_date_day: str | None = None,
specific_group: str | None = None,
specific_group: list[str] | str | None = None,
) -> LogFile | None:
days_to_search: list[LogDayDir]
if specific_date_day:
if not is_correct_day_format(specific_date_day):
# print(f"Warning: Provided specific_date_day '{specific_date_day}' is not in YYYY-MM-DD format.")
return None
try:
target_day_dir = LogDayDir(
date_day=specific_date_day, _base_dir=self.base_dir
date_day=specific_date_day,
_base_dir=self.base_dir,
group_configs=self.root_group_configs,
)
if (
not target_day_dir.get_dir_path().exists()
): # Check if dir exists on disk
if not target_day_dir.get_dir_path().exists():
return None
days_to_search = [target_day_dir] # Search only this specific day
except ValueError: # If LogDayDir construction fails (e.g. date_day format despite is_correct_day_format)
days_to_search = [target_day_dir]
except ValueError:
return None
else:
days_to_search = self.list_log_days() # Already sorted, newest day first
days_to_search = self.list_log_days()
for day_dir in (
days_to_search
): # Iterates newest day first if days_to_search came from list_log_days()
# day_dir.get_log_files() returns List[LogGroupDir], sorted by group name
for group_dir in day_dir.get_log_files():
# Skip this group if specific_group is provided and doesn't match
if specific_group is not None and group_dir.group != specific_group:
# If specific_group is provided, use filter function to navigate directly
if specific_group is not None:
# Convert string to array if needed (backward compatibility)
if isinstance(specific_group, str):
specific_group_array = specific_group.split("/")
else:
specific_group_array = specific_group
for day_dir in days_to_search:
result = self._search_log_file_in_specific_group(
day_dir, op_key_to_find, specific_group_array
)
if result:
return result
return None
# Search all groups if no specific group provided
for day_dir in days_to_search:
result = self._search_log_file_in_groups(
day_dir.get_root_groups(), op_key_to_find, None
)
if result:
return result
return None
def _search_log_file_in_specific_group(
self, day_dir: LogDayDir, op_key_to_find: str, specific_group: list[str]
) -> LogFile | None:
"""Search for a log file in a specific group using the filter function."""
# Build the directory path using the same logic as filter function
dir_path = day_dir.get_dir_path()
for i, component in enumerate(specific_group):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
dir_path = dir_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
dir_path = dir_path / component
if not dir_path.exists() or not dir_path.is_dir():
return None
# Search for function directories in this specific group
for func_dir_path in dir_path.iterdir():
if func_dir_path.is_dir():
# Check if this is a function directory (contains .log files)
contains_log_files = any(
f.suffix == ".log" for f in func_dir_path.iterdir() if f.is_file()
)
if contains_log_files:
try:
# Create LogFuncDir and search for the log file
# Need to create the group string that matches what create_log_file creates
# Encode dynamic elements (odd indices) to match the stored LogFile.group
encoded_group_path = []
for i, component in enumerate(specific_group):
if (
i % 2 == 1
): # Odd index = dynamic element, needs URL encoding
encoded_group_path.append(
urllib.parse.quote(component, safe="")
)
else: # Even index = structure element, no encoding needed
encoded_group_path.append(component)
func_dir = LogFuncDir(
date_day=day_dir.date_day,
group="/".join(encoded_group_path),
func_name=func_dir_path.name,
_base_dir=self.base_dir,
)
# Search through log files in this function directory
for log_file in func_dir.get_log_files():
if log_file.op_key == op_key_to_find:
return log_file
except ValueError:
# Skip malformed function directories
continue
# group_dir.get_log_files() returns List[LogFuncDir], sorted by func_name
return None
def _search_log_file_in_groups(
self,
group_dirs: list[LogGroupDir],
op_key_to_find: str,
specific_group: str | None = None,
) -> LogFile | None:
"""Recursively search for a log file in group directories."""
for group_dir in group_dirs:
# Search in function directories of this group
for func_dir in group_dir.get_log_files():
# func_dir.get_log_files() returns List[LogFile], sorted newest file first
for log_file in func_dir.get_log_files():
if log_file.op_key == op_key_to_find:
return log_file
# Recursively search in nested groups
nested_groups = group_dir.get_nested_groups()
result = self._search_log_file_in_groups(
nested_groups, op_key_to_find, specific_group
)
if result:
return result
return None
def filter(self, path: list[str], date_day: str | None = None) -> list[str]:
"""Filter and list folders at the specified hierarchical path.
Args:
path: List of path components to navigate to. Empty list returns top-level groups.
For alternating structure/dynamic pattern:
- ["clans"] lists all dynamic names under clans
- ["clans", <name>, "machines"] lists all dynamic names under machines
- [] lists all top-level groups
date_day: Optional date to filter by. If None, uses most recent day.
Returns:
List of folder names (decoded) at the specified path level.
"""
# Get the day to search in
if date_day is None:
days = self.list_log_days()
if not days:
return []
day_dir = days[0] # Most recent day
else:
if not is_correct_day_format(date_day):
return []
try:
day_dir = LogDayDir(
date_day=date_day,
_base_dir=self.base_dir,
group_configs=self.root_group_configs,
)
if not day_dir.get_dir_path().exists():
return []
except ValueError:
return []
# Empty path means list top-level groups
if not path:
return list(self.root_group_configs.keys())
# Build the directory path to search in
dir_path = day_dir.get_dir_path()
for i, component in enumerate(path):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
dir_path = dir_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed
dir_path = dir_path / component
if not dir_path.exists() or not dir_path.is_dir():
return []
# List directories and decode their names
folder_names = []
for subdir_path in dir_path.iterdir():
if subdir_path.is_dir():
# Decode the directory name
decoded_name = urllib.parse.unquote(subdir_path.name)
folder_names.append(decoded_name)
return sorted(folder_names)

View File

@@ -24,7 +24,8 @@ def list_log_groups(date_day: str) -> list[LogGroupDir]:
def list_log_funcs_at_day(date_day: str, group: str) -> list[LogFuncDir]:
"""List all logs for a specific function on a specific day."""
assert LOG_MANAGER_INSTANCE is not None
group_dir = LogGroupDir(date_day, group, LOG_MANAGER_INSTANCE.base_dir)
group_path = group.split("/") if group else []
group_dir = LogGroupDir(date_day, group_path, LOG_MANAGER_INSTANCE.base_dir)
return group_dir.get_log_files()

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
"""
Simple LogManager example with filter function.
This demonstrates:
- Dynamic group names with URL encoding
- Hierarchical structure navigation using the filter function
- Pattern: clans -> <dynamic_name> -> machines -> <dynamic_name>
"""
from pathlib import Path
from clan_lib.log_manager import LogGroupConfig, LogManager
def example_function() -> None:
"""Example function for creating logs."""
def deploy_machine() -> None:
"""Function for deploying machines."""
def main() -> None:
"""Simple LogManager demonstration with filter function."""
# Setup
log_manager = LogManager(base_dir=Path("/tmp/clan_logs"))
# Configure structure: clans -> <dynamic> -> machines -> <dynamic>
clans_config = LogGroupConfig("clans", "Clans")
machines_config = LogGroupConfig("machines", "Machines")
clans_config = clans_config.add_child(machines_config)
log_manager = log_manager.add_root_group_config(clans_config)
print("=== LogManager Filter Function Example ===\n")
# Create some example logs
repos = ["/home/user/Projects/qubasas_clan", "https://github.com/qubasa/myclan"]
machines = ["wintux", "demo", "gchq-local"]
for repo in repos:
for machine in machines:
log_manager.create_log_file(
deploy_machine,
f"deploy_{machine}",
["clans", repo, "machines", machine],
)
print("Created log files for multiple repos and machines\n")
# Demonstrate filter function
print("=== Using the filter() function ===")
# 1. List top-level groups
top_level = log_manager.filter([])
print(f"1. Top-level groups: {top_level}")
# 2. List all repositories under 'clans'
clans_repos = log_manager.filter(["clans"])
print(f"2. Repositories under clans: {clans_repos}")
# 3. List machines under first repository
if clans_repos:
first_repo = clans_repos[0]
repo_machines = log_manager.filter(["clans", first_repo, "machines"])
print(f"3. Machines under '{first_repo}': {repo_machines}")
# 4. List machines under second repository
if len(clans_repos) > 1:
second_repo = clans_repos[1]
repo_machines = log_manager.filter(["clans", second_repo, "machines"])
print(f"4. Machines under '{second_repo}': {repo_machines}")
print("\n=== Using get_log_file with arrays ===")
# Demonstrate the new array-based get_log_file functionality
if clans_repos and len(clans_repos) > 0:
specific_log = log_manager.get_log_file(
"deploy_wintux",
specific_group=["clans", clans_repos[0], "machines", "wintux"],
)
if specific_log:
print(
f"5. Found specific log: {specific_log.op_key} in {specific_log.func_name}"
)
else:
print("5. Specific log not found")
print("\n=== Key Features ===")
print("✓ Dynamic names with special chars (/, spaces, etc.) work")
print("✓ Names are URL encoded in filesystem but returned decoded")
print("✓ Filter function navigates hierarchy with simple arrays")
print("✓ get_log_file now accepts specific_group as array")
print("✓ Empty array [] lists top-level groups")
print("✓ Odd indices are dynamic, even indices are structure")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,187 @@
# Test file specifically for URL encoding functionality
import urllib.parse
from pathlib import Path
from clan_lib.log_manager import LogGroupConfig, LogManager
def sample_function() -> None:
"""Sample function for testing."""
class TestURLEncoding:
"""Test URL encoding for dynamic group names."""
def test_dynamic_name_url_encoding_forward_slash(self, tmp_path: Path) -> None:
"""Test that dynamic names with forward slashes get URL encoded."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use a dynamic name with forward slashes
dynamic_name = "/home/user/Projects/qubasas_clan"
group_path = ["clans", dynamic_name, "default"]
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
# Check that the LogFile uses encoded path for file system operations
file_path = log_file.get_file_path()
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
# Verify the encoded name appears in the file path
assert expected_encoded in str(file_path)
assert file_path.exists()
# Verify that no intermediate directories were created from the forward slashes
# The encoded name should be a single directory
day_dir = tmp_path / log_file.date_day / "clans"
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
assert len(direct_children) == 1
assert direct_children[0] == expected_encoded
def test_dynamic_name_url_encoding_special_characters(self, tmp_path: Path) -> None:
"""Test URL encoding of dynamic names with various special characters."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
machines_config = LogGroupConfig("machines", "Machines")
clans_config = clans_config.add_child(machines_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Test various special characters
test_cases = [
"repo with spaces",
"repo&with&ampersands",
"repo!with!exclamations",
"repo%with%percent",
"repo@with@symbols",
"repo#with#hash",
"repo+with+plus",
]
for dynamic_name in test_cases:
group_path = ["clans", dynamic_name, "machines", f"machine-{dynamic_name}"]
log_file = log_manager.create_log_file(
sample_function, f"test_{dynamic_name}", group_path
)
# Check that the file was created and encoded names appear in path
file_path = log_file.get_file_path()
assert file_path.exists()
# Verify encoding for both dynamic elements (indices 1 and 3)
expected_encoded_repo = urllib.parse.quote(dynamic_name, safe="")
expected_encoded_machine = urllib.parse.quote(
f"machine-{dynamic_name}", safe=""
)
assert expected_encoded_repo in str(file_path)
assert expected_encoded_machine in str(file_path)
def test_structure_elements_not_encoded(self, tmp_path: Path) -> None:
"""Test that structure elements (even indices) are NOT URL encoded."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements with special characters in their names
# (though this is not typical, testing to ensure they're not encoded)
test_config = LogGroupConfig("test-group", "Test Group")
sub_config = LogGroupConfig("sub-group", "Sub Group")
test_config = test_config.add_child(sub_config)
log_manager = log_manager.add_root_group_config(test_config)
# Use structure names that contain hyphens (common case)
group_path = ["test-group", "dynamic-name", "sub-group", "another-dynamic"]
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
file_path = log_file.get_file_path()
# Structure elements should NOT be encoded
assert "test-group" in str(file_path) # Structure element, not encoded
assert "sub-group" in str(file_path) # Structure element, not encoded
# Dynamic elements should be encoded
expected_dynamic1 = urllib.parse.quote("dynamic-name", safe="")
expected_dynamic2 = urllib.parse.quote("another-dynamic", safe="")
assert expected_dynamic1 in str(file_path)
assert expected_dynamic2 in str(file_path)
def test_url_encoding_with_unicode_characters(self, tmp_path: Path) -> None:
"""Test URL encoding with Unicode characters in dynamic names."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use Unicode characters in dynamic name
dynamic_name = "项目/中文/测试" # Chinese characters with slashes
group_path = ["clans", dynamic_name, "default"]
log_file = log_manager.create_log_file(
sample_function, "unicode_test", group_path
)
file_path = log_file.get_file_path()
# Check that file was created and Unicode was properly encoded
assert file_path.exists()
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
assert expected_encoded in str(file_path)
# Verify no intermediate directories from slashes in Unicode string
day_dir = tmp_path / log_file.date_day / "clans"
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
assert len(direct_children) == 1
assert direct_children[0] == expected_encoded
def test_backward_compatibility_single_element_paths(self, tmp_path: Path) -> None:
"""Test that single-element paths (no dynamic names) still work."""
log_manager = LogManager(base_dir=tmp_path)
# Register simple structure
default_config = LogGroupConfig("default", "Default")
log_manager = log_manager.add_root_group_config(default_config)
# Use simple single-element path (no dynamic names to encode)
group_path = ["default"]
log_file = log_manager.create_log_file(
sample_function, "simple_test", group_path
)
file_path = log_file.get_file_path()
# Should work exactly as before
assert file_path.exists()
assert "default" in str(file_path)
# No encoding should have occurred
assert urllib.parse.quote("default", safe="") == "default" # No special chars
def test_empty_dynamic_name_encoding(self, tmp_path: Path) -> None:
"""Test URL encoding with empty string as dynamic name."""
log_manager = LogManager(base_dir=tmp_path)
# Register structure elements
clans_config = LogGroupConfig("clans", "Clans")
default_config = LogGroupConfig("default", "Default")
clans_config = clans_config.add_child(default_config)
log_manager = log_manager.add_root_group_config(clans_config)
# Use empty string as dynamic name
group_path = ["clans", "", "default"]
log_file = log_manager.create_log_file(
sample_function, "empty_test", group_path
)
file_path = log_file.get_file_path()
# Should work - empty string gets encoded as empty string
assert file_path.exists()
expected_encoded = urllib.parse.quote("", safe="")
assert expected_encoded == "" # Empty string encodes to empty string

View File

@@ -16,6 +16,7 @@ mkShell {
with ps;
[
mypy
pytest-cov
]
++ (clan-cli.devshellPyDeps ps)
))