clan_lib: expose log_manager with API.register properly

This commit is contained in:
Qubasa
2025-07-04 13:19:09 +07:00
parent 24b8cb799a
commit 70d57cb267
5 changed files with 93 additions and 144 deletions

View File

@@ -396,10 +396,6 @@ class LogDayDir:
"""Get root-level LogGroupDir instances."""
return self._get_groups_at_path([])
def get_log_files(self) -> list[LogGroupDir]:
"""Backward compatibility method - returns root groups."""
return self.get_root_groups()
def _get_groups_at_path(self, current_path: list[str]) -> list[LogGroupDir]:
# Build the current directory path
dir_path = self._base_dir / self.date_day
@@ -587,14 +583,12 @@ class LogManager:
)
def create_log_file(
self, func: Callable, op_key: str, group_path: str | list[str] | None = None
self, func: Callable, op_key: str, group_path: list[str] | None = None
) -> LogFile:
now_utc = datetime.datetime.now(tz=datetime.UTC)
if group_path is None:
group_path = ["default"]
elif isinstance(group_path, str):
group_path = group_path.split("/")
# Validate that the group path structure is registered in the configuration
if not self._is_group_path_registered(group_path):
@@ -709,18 +703,19 @@ class LogManager:
def get_log_file(
self,
op_key_to_find: str,
specific_date_day: str | None = None,
specific_group: list[str] | str | None = None,
op_key: str,
*,
date_day: str | None = None,
selector: list[str] | None = None,
) -> LogFile | None:
days_to_search: list[LogDayDir]
if specific_date_day:
if not is_correct_day_format(specific_date_day):
if date_day:
if not is_correct_day_format(date_day):
return None
try:
target_day_dir = LogDayDir(
date_day=specific_date_day,
date_day=date_day,
_base_dir=self.base_dir,
group_configs=self.root_group_configs,
)
@@ -733,16 +728,13 @@ class LogManager:
days_to_search = self.list_log_days()
# If specific_group is provided, use filter function to navigate directly
if specific_group is not None:
if selector is not None:
# Convert string to array if needed (backward compatibility)
if isinstance(specific_group, str):
specific_group_array = specific_group.split("/")
else:
specific_group_array = specific_group
specific_group_array = selector
for day_dir in days_to_search:
result = self._search_log_file_in_specific_group(
day_dir, op_key_to_find, specific_group_array
day_dir, op_key, specific_group_array
)
if result:
return result
@@ -751,7 +743,7 @@ class LogManager:
# Search all groups if no specific group provided
for day_dir in days_to_search:
result = self._search_log_file_in_groups(
day_dir.get_root_groups(), op_key_to_find, None
day_dir.get_root_groups(), op_key, None
)
if result:
return result
@@ -836,7 +828,9 @@ class LogManager:
return None
def filter(self, path: list[str], date_day: str | None = None) -> list[str]:
def filter(
self, selector: list[str] | None = None, date_day: str | None = None
) -> list[str]:
"""Filter and list folders at the specified hierarchical path.
Args:
@@ -850,6 +844,9 @@ class LogManager:
Returns:
List of folder names (decoded) at the specified path level.
"""
if selector is None:
selector = []
# Get the day to search in
if date_day is None:
days = self.list_log_days()
@@ -871,12 +868,12 @@ class LogManager:
return []
# Empty path means list top-level groups
if not path:
if not selector:
return list(self.root_group_configs.keys())
# Build the directory path to search in
dir_path = day_dir.get_dir_path()
for i, component in enumerate(path):
for i, component in enumerate(selector):
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
dir_path = dir_path / urllib.parse.quote(component, safe="")
else: # Even index = structure element, no encoding needed

View File

@@ -1,54 +1,36 @@
from clan_lib.api import API
from clan_lib.errors import ClanError
from clan_lib.log_manager import LogDayDir, LogFile, LogFuncDir, LogGroupDir, LogManager
from clan_lib.log_manager import LogManager
LOG_MANAGER_INSTANCE: LogManager | None = None
@API.register
def list_log_days() -> list[LogDayDir]:
def list_log_days() -> list[str]:
"""List all logs."""
assert LOG_MANAGER_INSTANCE is not None
return LOG_MANAGER_INSTANCE.list_log_days()
return [day.date_day for day in LOG_MANAGER_INSTANCE.list_log_days()]
@API.register
def list_log_groups(date_day: str) -> list[LogGroupDir]:
def list_log_groups(selector: list[str], date_day: str | None = None) -> list[str]:
"""List all log groups."""
assert LOG_MANAGER_INSTANCE is not None
day_dir = LogDayDir(date_day, LOG_MANAGER_INSTANCE.base_dir)
return day_dir.get_log_files()
return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day)
@API.register
def list_log_funcs_at_day(date_day: str, group: str) -> list[LogFuncDir]:
"""List all logs for a specific function on a specific day."""
assert LOG_MANAGER_INSTANCE is not None
group_path = group.split("/") if group else []
group_dir = LogGroupDir(date_day, group_path, LOG_MANAGER_INSTANCE.base_dir)
return group_dir.get_log_files()
@API.register
def list_log_files(date_day: str, group: str, func_name: str) -> list[LogFile]:
"""List all log files for a specific function on a specific day."""
assert LOG_MANAGER_INSTANCE is not None
func_dir = LogFuncDir(date_day, group, func_name, LOG_MANAGER_INSTANCE.base_dir)
return func_dir.get_log_files()
@API.register
def get_log_file(id_key: str, group: str | None = None) -> str:
def get_log_file(
id_key: str, selector: list[str] | None = None, date_day: str | None = None
) -> str:
"""Get a specific log file by op_key, function name and day."""
assert LOG_MANAGER_INSTANCE is not None
log_file = LOG_MANAGER_INSTANCE.get_log_file(id_key, specific_group=group)
log_file = LOG_MANAGER_INSTANCE.get_log_file(
op_key=id_key, selector=selector, date_day=date_day
)
if log_file is None:
return ""
file_path = log_file.get_file_path()
if not file_path.exists():
msg = f"Log file {file_path} does not exist."
msg = f"Log file with op_key '{id_key}' not found in selector '{selector}' and date_day '{date_day}'."
raise ClanError(msg)
return file_path.read_text()
return log_file.get_file_path().read_text()

View File

@@ -76,7 +76,7 @@ def main() -> None:
if clans_repos and len(clans_repos) > 0:
specific_log = log_manager.get_log_file(
"deploy_wintux",
specific_group=["clans", clans_repos[0], "machines", "wintux"],
selector=["clans", clans_repos[0], "machines", "wintux"],
)
if specific_log:
print(

View File

@@ -78,16 +78,16 @@ def populated_log_structure(
# Day 1: 2023-10-26
# Group A, Func A
lf1 = log_manager.create_log_file(
sample_func_one, "op_key_A1", "group_a"
sample_func_one, "op_key_A1", ["group_a"]
) # 10-00-00
created_files["lf1"] = lf1
lf2 = log_manager.create_log_file(
sample_func_one, "op_key_A2", "group_a"
sample_func_one, "op_key_A2", ["group_a"]
) # 10-01-01
created_files["lf2"] = lf2
# Group B, Func B
lf3 = log_manager.create_log_file(
sample_func_two, "op_key_B1", "group_b"
sample_func_two, "op_key_B1", ["group_b"]
) # 10-02-02
created_files["lf3"] = lf3
@@ -98,7 +98,7 @@ def populated_log_structure(
MockDateTime._delta = datetime.timedelta(seconds=0) # Reset delta for new day
lf4 = log_manager.create_log_file(
sample_func_one, "op_key_A3_day2", "group_a"
sample_func_one, "op_key_A3_day2", ["group_a"]
) # 12-00-00
created_files["lf4"] = lf4
@@ -476,11 +476,11 @@ class TestLogDayDir:
self, tmp_path: Path
) -> None: # Renamed from get_log_files for clarity here
ldd = LogDayDir("2023-10-26", tmp_path)
assert ldd.get_log_files() == [] # Dir does not exist
assert ldd.get_root_groups() == [] # Dir does not exist
dir_path = ldd.get_dir_path()
dir_path.mkdir(parents=True, exist_ok=True) # Dir exists but empty
assert ldd.get_log_files() == []
assert ldd.get_root_groups() == []
def test_get_log_files_populated(
self, tmp_path: Path, caplog: pytest.LogCaptureFixture
@@ -520,7 +520,7 @@ class TestLogDayDir:
# So, the warning there is unlikely to trigger from func_dir_path.name issues.
with caplog.at_level(logging.WARNING):
log_group_dirs = ldd.get_log_files()
log_group_dirs = ldd.get_root_groups()
assert len(log_group_dirs) == 2 # group_a and group_b
# No warnings expected from this specific setup for LogDayDir.get_log_files
@@ -594,7 +594,7 @@ class TestLogDayDir:
expected_groups.append(special_group)
# Get the log group directories
log_group_dirs = ldd.get_log_files()
log_group_dirs = ldd.get_root_groups()
# Verify we get the correct number of groups
assert len(log_group_dirs) == len(expected_groups)
@@ -722,34 +722,23 @@ class TestLogManager:
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, created_files = populated_log_structure
found_log_file = log_manager.get_log_file(
"op_key_A1", specific_date_day="2023-10-26"
)
found_log_file = log_manager.get_log_file("op_key_A1", date_day="2023-10-26")
assert found_log_file is not None
assert found_log_file == created_files["lf1"]
assert (
log_manager.get_log_file("op_key_A1", specific_date_day="2023-10-27")
is None
)
assert log_manager.get_log_file("op_key_A1", date_day="2023-10-27") is None
def test_get_log_file_specific_date_not_exists(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, _ = populated_log_structure
assert (
log_manager.get_log_file("any_op_key", specific_date_day="1999-01-01")
is None
)
assert log_manager.get_log_file("any_op_key", date_day="1999-01-01") is None
def test_get_log_file_specific_date_invalid_format(
self, populated_log_structure: tuple[LogManager, Path, dict[str, LogFile]]
) -> None:
log_manager, _, _ = populated_log_structure
assert (
log_manager.get_log_file("any_op_key", specific_date_day="2023/01/01")
is None
)
assert log_manager.get_log_file("any_op_key", date_day="2023/01/01") is None
# --- Tests for URL encoding/decoding of group names ---
@@ -803,22 +792,10 @@ class TestGroupURLEncoding:
parent_config = parent_config.add_child(child_config)
log_manager = log_manager.add_root_group_config(parent_config)
log_file = log_manager.create_log_file(sample_func_one, "test_op", group_name)
file_path = log_file.get_file_path()
assert file_path.exists()
# Check that nested directories are created
day_dir = tmp_path / log_file.date_day
parent_dir = day_dir / "parent"
child_dir = parent_dir / "child"
assert parent_dir.exists()
assert child_dir.exists()
# Verify round-trip
read_log_file = LogFile.from_path(file_path)
assert read_log_file.group == group_name
with pytest.raises(
ValueError, match="Group structure 'parent/child' is not valid"
):
log_manager.create_log_file(sample_func_one, "test_op", [group_name])
def test_group_unicode_characters(self, tmp_path: Path) -> None:
"""Test that dynamic group names with Unicode characters are handled correctly."""
@@ -867,8 +844,8 @@ class TestGroupDirectoryHandling:
log_manager = log_manager.add_root_group_config(database_config)
# Create log files with different groups
lf1 = log_manager.create_log_file(sample_func_one, "op1", "auth")
lf2 = log_manager.create_log_file(sample_func_two, "op2", "database")
lf1 = log_manager.create_log_file(sample_func_one, "op1", ["auth"])
lf2 = log_manager.create_log_file(sample_func_two, "op2", ["database"])
lf3 = log_manager.create_log_file(sample_func_one, "op3") # default group
assert lf1.group == "auth"
@@ -892,15 +869,17 @@ class TestGroupDirectoryHandling:
log_manager = log_manager.add_root_group_config(database_config)
# Create log files with different groups
log_manager.create_log_file(sample_func_one, "op1", "auth")
log_manager.create_log_file(sample_func_two, "op2", "database")
log_manager.create_log_file(sample_func_one, "op3", "auth") # Same group as lf1
log_manager.create_log_file(sample_func_one, "op1", ["auth"])
log_manager.create_log_file(sample_func_two, "op2", ["database"])
log_manager.create_log_file(
sample_func_one, "op3", ["auth"]
) # Same group as lf1
# Get the day directory and check its contents
day_dirs = log_manager.list_log_days()
assert len(day_dirs) == 1
log_group_dirs = day_dirs[0].get_log_files()
log_group_dirs = day_dirs[0].get_root_groups()
assert len(log_group_dirs) == 2 # auth and database groups
# Check that we have the correct groups
@@ -933,9 +912,9 @@ class TestGroupDirectoryHandling:
log_manager = log_manager.add_root_group_config(database_config)
# Create log files with same op_key but different groups
lf1 = log_manager.create_log_file(sample_func_one, "shared_op", "auth")
lf2 = log_manager.create_log_file(sample_func_two, "shared_op", "database")
lf3 = log_manager.create_log_file(sample_func_one, "unique_op", "auth")
lf1 = log_manager.create_log_file(sample_func_one, "shared_op", ["auth"])
lf2 = log_manager.create_log_file(sample_func_two, "shared_op", ["database"])
lf3 = log_manager.create_log_file(sample_func_one, "unique_op", ["auth"])
# get_log_file should find the first match (implementation detail: depends on sort order)
found_shared = log_manager.get_log_file("shared_op")
@@ -1145,8 +1124,8 @@ class TestHierarchicalLogGroupDirs:
log_manager = log_manager.add_root_group_config(database_config)
# Create log files with registered groups
lf1 = log_manager.create_log_file(sample_func_one, "op1", "auth")
lf2 = log_manager.create_log_file(sample_func_two, "op2", "database")
lf1 = log_manager.create_log_file(sample_func_one, "op1", ["auth"])
lf2 = log_manager.create_log_file(sample_func_two, "op2", ["database"])
# Should work exactly as before
assert lf1.group == "auth"
@@ -1160,7 +1139,7 @@ class TestHierarchicalLogGroupDirs:
day_dirs = log_manager.list_log_days()
assert len(day_dirs) == 1
group_dirs = day_dirs[0].get_log_files()
group_dirs = day_dirs[0].get_root_groups()
assert len(group_dirs) == 2
# All LogGroupDir instances should have configured nicknames and single-level paths
@@ -1241,10 +1220,10 @@ class TestHierarchicalLogGroupDirs:
# Test search with specific group filter
found_machine1_specific = log_manager.get_log_file(
"machine1_deployment", specific_group="flakes/flake1/machines/machine1"
"machine1_deployment", selector=["flakes", "flake1", "machines", "machine1"]
)
found_machine2_specific = log_manager.get_log_file(
"machine2_build", specific_group="flakes/flake2/machines/machine2"
"machine2_build", selector=["flakes", "flake2", "machines", "machine2"]
)
assert found_machine1_specific == machine1_log
@@ -1252,7 +1231,7 @@ class TestHierarchicalLogGroupDirs:
# Test that search across wrong group returns None
found_wrong_group = log_manager.get_log_file(
"machine1_deployment", specific_group="flakes/flake2/machines/machine2"
"machine1_deployment", selector=["flakes", "flake2", "machines", "machine2"]
)
assert found_wrong_group is None
@@ -1825,7 +1804,7 @@ class TestFilterFunction:
result = log_manager.filter(["clans"])
assert result == []
def test_filter_with_specific_date_day(self, tmp_path: Path) -> None:
def test_filter_with_date_day(self, tmp_path: Path) -> None:
"""Test filtering with specific date."""
log_manager = LogManager(base_dir=tmp_path)
@@ -1909,10 +1888,10 @@ class TestFilterFunction:
class TestGetLogFileWithArrays:
"""Test the modified get_log_file method that accepts specific_group as array."""
"""Test the modified get_log_file method that accepts group as array."""
def test_get_log_file_with_specific_group_array(self, tmp_path: Path) -> None:
"""Test get_log_file with specific_group as array."""
def test_get_log_file_with_group_array(self, tmp_path: Path) -> None:
"""Test get_log_file with group as array."""
log_manager = LogManager(base_dir=tmp_path)
# Set up nested structure
@@ -1931,10 +1910,10 @@ class TestGetLogFileWithArrays:
["clans", repo_name, "machines", machine_name],
)
# Search using array for specific_group
# Search using array for group
found_log = log_manager.get_log_file(
"deploy_machine",
specific_group=["clans", repo_name, "machines", machine_name],
selector=["clans", repo_name, "machines", machine_name],
)
assert found_log is not None
@@ -1945,9 +1924,7 @@ class TestGetLogFileWithArrays:
assert found_log.func_name == log_file.func_name
assert found_log._base_dir == log_file._base_dir
def test_get_log_file_with_specific_group_array_special_chars(
self, tmp_path: Path
) -> None:
def test_get_log_file_with_group_array_special_chars(self, tmp_path: Path) -> None:
"""Test get_log_file with special characters in dynamic names."""
log_manager = LogManager(base_dir=tmp_path)
@@ -1966,7 +1943,7 @@ class TestGetLogFileWithArrays:
# Search using array with special characters
found_log = log_manager.get_log_file(
"special_deploy", specific_group=["clans", repo_name, "default"]
"special_deploy", selector=["clans", repo_name, "default"]
)
assert found_log is not None
@@ -1977,10 +1954,8 @@ class TestGetLogFileWithArrays:
assert found_log.func_name == log_file.func_name
assert found_log._base_dir == log_file._base_dir
def test_get_log_file_with_specific_group_array_not_found(
self, tmp_path: Path
) -> None:
"""Test get_log_file with specific_group array when group doesn't exist."""
def test_get_log_file_with_group_array_not_found(self, tmp_path: Path) -> None:
"""Test get_log_file with group array when group doesn't exist."""
log_manager = LogManager(base_dir=tmp_path)
# Set up structure
@@ -1993,15 +1968,13 @@ class TestGetLogFileWithArrays:
# Search in non-existent group
found_log = log_manager.get_log_file(
"nonexistent_op", specific_group=["clans", "nonexistent_repo", "default"]
"nonexistent_op", selector=["clans", "nonexistent_repo", "default"]
)
assert found_log is None
def test_get_log_file_without_specific_group_still_works(
self, tmp_path: Path
) -> None:
"""Test that get_log_file still works without specific_group parameter."""
def test_get_log_file_without_group_still_works(self, tmp_path: Path) -> None:
"""Test that get_log_file still works without group parameter."""
log_manager = LogManager(base_dir=tmp_path)
# Set up structure
@@ -2015,17 +1988,15 @@ class TestGetLogFileWithArrays:
sample_func_one, "general_op", ["clans", "myrepo", "default"]
)
# Search without specific_group (should search all)
# Search without group (should search all)
found_log = log_manager.get_log_file("general_op")
assert found_log is not None
assert found_log == log_file
assert found_log.op_key == "general_op"
def test_get_log_file_with_date_and_specific_group_array(
self, tmp_path: Path
) -> None:
"""Test get_log_file with both specific_date_day and specific_group as array."""
def test_get_log_file_with_date_and_group_array(self, tmp_path: Path) -> None:
"""Test get_log_file with both date_day and group as array."""
log_manager = LogManager(base_dir=tmp_path)
# Set up structure
@@ -2042,8 +2013,8 @@ class TestGetLogFileWithArrays:
# Search with both parameters
found_log = log_manager.get_log_file(
"dated_op",
specific_date_day=log_file.date_day,
specific_group=["clans", "myrepo", "default"],
date_day=log_file.date_day,
selector=["clans", "myrepo", "default"],
)
assert found_log is not None
@@ -2054,8 +2025,8 @@ class TestGetLogFileWithArrays:
assert found_log.func_name == log_file.func_name
assert found_log._base_dir == log_file._base_dir
def test_get_log_file_unicode_in_specific_group_array(self, tmp_path: Path) -> None:
"""Test get_log_file with Unicode characters in specific_group array."""
def test_get_log_file_unicode_in_group_array(self, tmp_path: Path) -> None:
"""Test get_log_file with Unicode characters in group array."""
log_manager = LogManager(base_dir=tmp_path)
# Set up structure
@@ -2073,7 +2044,7 @@ class TestGetLogFileWithArrays:
# Search using array with Unicode characters
found_log = log_manager.get_log_file(
"unicode_op", specific_group=["clans", repo_name, "default"]
"unicode_op", selector=["clans", repo_name, "default"]
)
assert found_log is not None