Merge branch 'main' into state-version-option-again
This commit is contained in:
@@ -1,34 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import importlib
|
||||
import json
|
||||
import pkgutil
|
||||
from types import ModuleType
|
||||
|
||||
|
||||
def import_all_modules_from_package(pkg: ModuleType) -> None:
|
||||
for _loader, module_name, _is_pkg in pkgutil.walk_packages(
|
||||
pkg.__path__, prefix=f"{pkg.__name__}."
|
||||
):
|
||||
base_name = module_name.split(".")[-1]
|
||||
|
||||
# Skip test modules
|
||||
if (
|
||||
base_name.startswith("test_")
|
||||
or base_name.endswith("_test")
|
||||
or base_name == "conftest"
|
||||
):
|
||||
continue
|
||||
|
||||
importlib.import_module(module_name)
|
||||
from clan_lib.api import load_in_all_api_functions
|
||||
|
||||
|
||||
def main() -> None:
|
||||
import clan_cli
|
||||
import clan_lib
|
||||
|
||||
import_all_modules_from_package(clan_cli)
|
||||
import_all_modules_from_package(clan_lib)
|
||||
load_in_all_api_functions()
|
||||
|
||||
from clan_lib.api import API
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ from tempfile import TemporaryDirectory
|
||||
from clan_lib.cmd import RunOpts, run
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.git import commit_files
|
||||
from clan_lib.machines.list import list_full_machines
|
||||
from clan_lib.machines.machines import Machine
|
||||
from clan_lib.nix import nix_shell
|
||||
|
||||
@@ -18,7 +19,6 @@ from clan_cli.completions import (
|
||||
complete_machines,
|
||||
complete_services_for_machine,
|
||||
)
|
||||
from clan_cli.machines.list import list_full_machines
|
||||
|
||||
from .check import check_secrets
|
||||
from .public_modules import FactStoreBase
|
||||
|
||||
@@ -2,7 +2,7 @@ import argparse
|
||||
import logging
|
||||
|
||||
from clan_lib.flake import Flake
|
||||
from clan_lib.machines.list import list_full_machines, query_machines_by_tags
|
||||
from clan_lib.machines.actions import list_machines
|
||||
|
||||
from clan_cli.completions import add_dynamic_completer, complete_tags
|
||||
|
||||
@@ -12,12 +12,8 @@ log = logging.getLogger(__name__)
|
||||
def list_command(args: argparse.Namespace) -> None:
|
||||
flake: Flake = args.flake
|
||||
|
||||
if args.tags:
|
||||
for name in query_machines_by_tags(flake, args.tags):
|
||||
print(name)
|
||||
else:
|
||||
for name in list_full_machines(flake):
|
||||
print(name)
|
||||
for name in list_machines(flake, opts={"filter": {"tags": args.tags}}):
|
||||
print(name)
|
||||
|
||||
|
||||
def register_list_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
||||
@@ -4,6 +4,7 @@ import sys
|
||||
|
||||
from clan_lib.async_run import AsyncContext, AsyncOpts, AsyncRuntime
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.machines.list import list_full_machines, query_machines_by_tags
|
||||
from clan_lib.machines.machines import Machine
|
||||
from clan_lib.machines.suggestions import validate_machine_names
|
||||
from clan_lib.machines.update import deploy_machine
|
||||
@@ -15,7 +16,6 @@ from clan_cli.completions import (
|
||||
complete_machines,
|
||||
complete_tags,
|
||||
)
|
||||
from clan_cli.machines.list import list_full_machines, query_machines_by_tags
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ from clan_cli.completions import (
|
||||
complete_machines,
|
||||
complete_services_for_machine,
|
||||
)
|
||||
from clan_cli.machines.list import list_full_machines
|
||||
from clan_cli.vars._types import StoreBase
|
||||
from clan_cli.vars.migration import check_can_migrate, migrate_files
|
||||
from clan_lib.api import API
|
||||
@@ -22,6 +21,7 @@ from clan_lib.cmd import RunOpts, run
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.flake import Flake
|
||||
from clan_lib.git import commit_files
|
||||
from clan_lib.machines.list import list_full_machines
|
||||
from clan_lib.nix import nix_config, nix_shell, nix_test_store
|
||||
|
||||
from .check import check_vars
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import importlib
|
||||
import logging
|
||||
import pkgutil
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from functools import wraps
|
||||
from inspect import Parameter, Signature, signature
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Annotated,
|
||||
Any,
|
||||
@@ -12,6 +15,8 @@ from typing import (
|
||||
get_type_hints,
|
||||
)
|
||||
|
||||
from clan_lib.api.util import JSchemaTypeError
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
from .serde import dataclass_to_dict, from_dict, sanitize_string
|
||||
@@ -217,12 +222,16 @@ API.register(open_file)
|
||||
for name, func in self._registry.items():
|
||||
hints = get_type_hints(func)
|
||||
|
||||
serialized_hints = {
|
||||
key: type_to_dict(
|
||||
value, scope=name + " argument" if key != "return" else "return"
|
||||
)
|
||||
for key, value in hints.items()
|
||||
}
|
||||
try:
|
||||
serialized_hints = {
|
||||
key: type_to_dict(
|
||||
value, scope=name + " argument" if key != "return" else "return"
|
||||
)
|
||||
for key, value in hints.items()
|
||||
}
|
||||
except JSchemaTypeError as e:
|
||||
msg = f"Error serializing type hints for function '{name}': {e}"
|
||||
raise JSchemaTypeError(msg) from e
|
||||
|
||||
return_type = serialized_hints.pop("return")
|
||||
|
||||
@@ -283,4 +292,35 @@ API.register(open_file)
|
||||
return None
|
||||
|
||||
|
||||
def import_all_modules_from_package(pkg: ModuleType) -> None:
|
||||
for _loader, module_name, _is_pkg in pkgutil.walk_packages(
|
||||
pkg.__path__, prefix=f"{pkg.__name__}."
|
||||
):
|
||||
base_name = module_name.split(".")[-1]
|
||||
|
||||
# Skip test modules
|
||||
if (
|
||||
base_name.startswith("test_")
|
||||
or base_name.endswith("_test")
|
||||
or base_name == "conftest"
|
||||
):
|
||||
continue
|
||||
|
||||
importlib.import_module(module_name)
|
||||
|
||||
|
||||
def load_in_all_api_functions() -> None:
|
||||
"""
|
||||
For the global API object, to have all functions available.
|
||||
We have to make sure python loads every wrapped function at least once.
|
||||
This is done by importing all modules from the clan_lib and clan_cli packages.
|
||||
"""
|
||||
import clan_cli
|
||||
|
||||
import clan_lib
|
||||
|
||||
import_all_modules_from_package(clan_lib)
|
||||
import_all_modules_from_package(clan_cli)
|
||||
|
||||
|
||||
API = MethodRegistry()
|
||||
|
||||
@@ -2,16 +2,67 @@ import datetime
|
||||
import logging
|
||||
import urllib.parse
|
||||
from collections.abc import Callable # Union for str | None
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from functools import total_ordering
|
||||
from pathlib import Path
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LogGroupConfig:
|
||||
"""Configuration for a hierarchical log group with nickname support."""
|
||||
|
||||
name: str # The name of this group level (single directory name)
|
||||
nickname: str | None = None # Optional display name for easier visibility
|
||||
children: dict[str, "LogGroupConfig"] = field(
|
||||
default_factory=dict
|
||||
) # Nested child groups
|
||||
|
||||
def get_display_name(self) -> str:
|
||||
"""Get the display name for this log group.
|
||||
|
||||
Returns:
|
||||
The nickname if available, otherwise the group name.
|
||||
"""
|
||||
return self.nickname if self.nickname else self.name
|
||||
|
||||
def add_child(self, child: "LogGroupConfig") -> "LogGroupConfig":
|
||||
"""Add a child group configuration and return a new LogGroupConfig instance.
|
||||
|
||||
Args:
|
||||
child: The child LogGroupConfig to add.
|
||||
|
||||
Returns:
|
||||
A new LogGroupConfig instance with the child added.
|
||||
"""
|
||||
new_children = {**self.children, child.name: child}
|
||||
return LogGroupConfig(
|
||||
name=self.name, nickname=self.nickname, children=new_children
|
||||
)
|
||||
|
||||
def get_child(self, name: str) -> "LogGroupConfig | None":
|
||||
"""Get a child group configuration by name.
|
||||
|
||||
Args:
|
||||
name: The name of the child group to retrieve.
|
||||
|
||||
Returns:
|
||||
The child LogGroupConfig if found, None otherwise.
|
||||
"""
|
||||
return self.children.get(name)
|
||||
|
||||
|
||||
# Global helper function for format checking (used by LogManager and internally by classes)
|
||||
def is_correct_day_format(date_day: str) -> bool:
|
||||
"""Check if the date_day is in the correct format YYYY-MM-DD."""
|
||||
"""Check if the date_day string is in the correct format YYYY-MM-DD.
|
||||
|
||||
Args:
|
||||
date_day: The date string to validate.
|
||||
|
||||
Returns:
|
||||
True if the date_day matches YYYY-MM-DD format, False otherwise.
|
||||
"""
|
||||
try:
|
||||
datetime.datetime.strptime(date_day, "%Y-%m-%d").replace(tzinfo=datetime.UTC)
|
||||
except ValueError:
|
||||
@@ -30,6 +81,11 @@ class LogFile:
|
||||
date_second: str # HH-MM-SS
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Validate date and time formats after initialization.
|
||||
|
||||
Raises:
|
||||
ValueError: If date_day or date_second are not in the correct format.
|
||||
"""
|
||||
# Validate formats upon initialization.
|
||||
if not is_correct_day_format(self.date_day):
|
||||
msg = f"LogFile.date_day '{self.date_day}' is not in YYYY-MM-DD format."
|
||||
@@ -44,46 +100,42 @@ class LogFile:
|
||||
|
||||
@property
|
||||
def _datetime_obj(self) -> datetime.datetime:
|
||||
"""Get the datetime object for this log file.
|
||||
|
||||
Returns:
|
||||
A datetime object constructed from date_day and date_second.
|
||||
"""
|
||||
# Formats are pre-validated by __post_init__.
|
||||
return datetime.datetime.strptime(
|
||||
f"{self.date_day} {self.date_second}", "%Y-%m-%d %H-%M-%S"
|
||||
).replace(tzinfo=datetime.UTC)
|
||||
|
||||
@classmethod
|
||||
def from_path(cls, file: Path) -> "LogFile":
|
||||
date_day = file.parent.parent.parent.name
|
||||
group = urllib.parse.unquote(file.parent.parent.name)
|
||||
func_name = file.parent.name
|
||||
base_dir = file.parent.parent.parent.parent
|
||||
|
||||
filename_stem = file.stem
|
||||
parts = filename_stem.split("_", 1)
|
||||
if len(parts) != 2:
|
||||
msg = f"Log filename '{file.name}' in dir '{file.parent}' does not match 'HH-MM-SS_op_key.log' format."
|
||||
raise ValueError(msg)
|
||||
|
||||
date_second_str = parts[0]
|
||||
op_key_str = parts[1]
|
||||
|
||||
return LogFile(
|
||||
op_key=op_key_str,
|
||||
date_day=date_day,
|
||||
group=group,
|
||||
date_second=date_second_str,
|
||||
func_name=func_name,
|
||||
_base_dir=base_dir,
|
||||
)
|
||||
|
||||
def get_file_path(self) -> Path:
|
||||
return (
|
||||
self._base_dir
|
||||
/ self.date_day
|
||||
/ urllib.parse.quote(self.group, safe="")
|
||||
/ self.func_name
|
||||
/ f"{self.date_second}_{self.op_key}.log"
|
||||
)
|
||||
"""Get the full file path for this log file.
|
||||
|
||||
Returns:
|
||||
The complete Path object for this log file including nested directory structure.
|
||||
"""
|
||||
# Create nested directory structure for hierarchical groups
|
||||
path = self._base_dir / self.date_day
|
||||
|
||||
# Split group by slash and create nested directories
|
||||
# Dynamic elements are already URL encoded at LogFile creation time
|
||||
group_components = self.group.split("/")
|
||||
for component in group_components:
|
||||
path = path / component
|
||||
|
||||
return path / self.func_name / f"{self.date_second}_{self.op_key}.log"
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
"""Check equality with another LogFile instance.
|
||||
|
||||
Args:
|
||||
other: The object to compare with.
|
||||
|
||||
Returns:
|
||||
True if all significant fields are equal, False otherwise.
|
||||
"""
|
||||
if not isinstance(other, LogFile):
|
||||
return NotImplemented
|
||||
# Compare all significant fields for equality
|
||||
@@ -96,6 +148,16 @@ class LogFile:
|
||||
)
|
||||
|
||||
def __lt__(self, other: object) -> bool:
|
||||
"""Compare LogFile instances for sorting.
|
||||
|
||||
Sorting order: datetime (newest first), then group, func_name, op_key (all ascending).
|
||||
|
||||
Args:
|
||||
other: The object to compare with.
|
||||
|
||||
Returns:
|
||||
True if this instance should be sorted before the other.
|
||||
"""
|
||||
if not isinstance(other, LogFile):
|
||||
return NotImplemented
|
||||
# Primary sort: datetime (newest first). self is "less than" other if self is newer.
|
||||
@@ -111,154 +173,31 @@ class LogFile:
|
||||
return self.op_key < other.op_key
|
||||
|
||||
|
||||
@total_ordering
|
||||
@dataclass(frozen=True)
|
||||
class LogFuncDir:
|
||||
date_day: str
|
||||
group: str
|
||||
func_name: str
|
||||
_base_dir: Path
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not is_correct_day_format(self.date_day):
|
||||
msg = f"LogFuncDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
|
||||
raise ValueError(msg)
|
||||
|
||||
@property
|
||||
def _date_obj(self) -> datetime.date:
|
||||
return (
|
||||
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
|
||||
.replace(tzinfo=datetime.UTC)
|
||||
.date()
|
||||
)
|
||||
|
||||
def get_dir_path(self) -> Path:
|
||||
return (
|
||||
self._base_dir
|
||||
/ self.date_day
|
||||
/ urllib.parse.quote(self.group, safe="")
|
||||
/ self.func_name
|
||||
)
|
||||
|
||||
def get_log_files(self) -> list[LogFile]:
|
||||
dir_path = self.get_dir_path()
|
||||
if not dir_path.exists() or not dir_path.is_dir():
|
||||
return []
|
||||
|
||||
log_files_list: list[LogFile] = []
|
||||
for file_path in dir_path.iterdir():
|
||||
if file_path.is_file() and file_path.suffix == ".log":
|
||||
try:
|
||||
log_files_list.append(LogFile.from_path(file_path))
|
||||
except ValueError:
|
||||
log.warning(
|
||||
f"Skipping malformed log file '{file_path.name}' in '{dir_path}'."
|
||||
)
|
||||
|
||||
return sorted(log_files_list) # Sorts using LogFile.__lt__ (newest first)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, LogFuncDir):
|
||||
return NotImplemented
|
||||
return (
|
||||
self.date_day == other.date_day
|
||||
and self.group == other.group
|
||||
and self.func_name == other.func_name
|
||||
and self._base_dir == other._base_dir
|
||||
)
|
||||
|
||||
def __lt__(self, other: object) -> bool:
|
||||
if not isinstance(other, LogFuncDir):
|
||||
return NotImplemented
|
||||
# Primary sort: date (newest first)
|
||||
if self._date_obj != other._date_obj:
|
||||
return self._date_obj > other._date_obj
|
||||
# Secondary sort: group (alphabetical ascending)
|
||||
if self.group != other.group:
|
||||
return self.group < other.group
|
||||
# Tertiary sort: func_name (alphabetical ascending)
|
||||
return self.func_name < other.func_name
|
||||
|
||||
|
||||
@total_ordering
|
||||
@dataclass(frozen=True)
|
||||
class LogGroupDir:
|
||||
date_day: str
|
||||
group: str
|
||||
_base_dir: Path
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not is_correct_day_format(self.date_day):
|
||||
msg = f"LogGroupDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
|
||||
raise ValueError(msg)
|
||||
|
||||
@property
|
||||
def _date_obj(self) -> datetime.date:
|
||||
return (
|
||||
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
|
||||
.replace(tzinfo=datetime.UTC)
|
||||
.date()
|
||||
)
|
||||
|
||||
def get_dir_path(self) -> Path:
|
||||
return self._base_dir / self.date_day / urllib.parse.quote(self.group, safe="")
|
||||
|
||||
def get_log_files(self) -> list[LogFuncDir]:
|
||||
dir_path = self.get_dir_path()
|
||||
if not dir_path.exists() or not dir_path.is_dir():
|
||||
return []
|
||||
|
||||
func_dirs_list: list[LogFuncDir] = []
|
||||
for func_dir_path in dir_path.iterdir():
|
||||
if func_dir_path.is_dir():
|
||||
try:
|
||||
func_dirs_list.append(
|
||||
LogFuncDir(
|
||||
date_day=self.date_day,
|
||||
group=self.group,
|
||||
func_name=func_dir_path.name,
|
||||
_base_dir=self._base_dir,
|
||||
)
|
||||
)
|
||||
except ValueError:
|
||||
log.warning(
|
||||
f"Skipping malformed function directory '{func_dir_path.name}' in '{dir_path}'."
|
||||
)
|
||||
|
||||
return sorted(func_dirs_list)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, LogGroupDir):
|
||||
return NotImplemented
|
||||
return (
|
||||
self.date_day == other.date_day
|
||||
and self.group == other.group
|
||||
and self._base_dir == other._base_dir
|
||||
)
|
||||
|
||||
def __lt__(self, other: object) -> bool:
|
||||
if not isinstance(other, LogGroupDir):
|
||||
return NotImplemented
|
||||
# Primary sort: date (newest first)
|
||||
if self._date_obj != other._date_obj:
|
||||
return self._date_obj > other._date_obj
|
||||
# Secondary sort: group (alphabetical ascending)
|
||||
return self.group < other.group
|
||||
|
||||
|
||||
@total_ordering
|
||||
@dataclass(frozen=True)
|
||||
class LogDayDir:
|
||||
"""Represents a single day's log directory."""
|
||||
|
||||
date_day: str
|
||||
_base_dir: Path
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Validate date format after initialization.
|
||||
|
||||
Raises:
|
||||
ValueError: If date_day is not in YYYY-MM-DD format.
|
||||
"""
|
||||
if not is_correct_day_format(self.date_day):
|
||||
msg = f"LogDayDir.date_day '{self.date_day}' is not in YYYY-MM-DD format."
|
||||
raise ValueError(msg)
|
||||
|
||||
@property
|
||||
def _date_obj(self) -> datetime.date:
|
||||
"""Get the date object for this log day directory.
|
||||
|
||||
Returns:
|
||||
A date object constructed from date_day.
|
||||
"""
|
||||
return (
|
||||
datetime.datetime.strptime(self.date_day, "%Y-%m-%d")
|
||||
.replace(tzinfo=datetime.UTC)
|
||||
@@ -266,39 +205,37 @@ class LogDayDir:
|
||||
)
|
||||
|
||||
def get_dir_path(self) -> Path:
|
||||
"""Get the directory path for this log day.
|
||||
|
||||
Returns:
|
||||
The Path object for this day's log directory.
|
||||
"""
|
||||
return self._base_dir / self.date_day
|
||||
|
||||
def get_log_files(self) -> list[LogGroupDir]:
|
||||
dir_path = self.get_dir_path()
|
||||
if not dir_path.exists() or not dir_path.is_dir():
|
||||
return []
|
||||
|
||||
group_dirs_list: list[LogGroupDir] = []
|
||||
|
||||
# First level: group directories
|
||||
for group_dir_path in dir_path.iterdir():
|
||||
if group_dir_path.is_dir():
|
||||
group_name = urllib.parse.unquote(group_dir_path.name)
|
||||
try:
|
||||
group_dirs_list.append(
|
||||
LogGroupDir(
|
||||
date_day=self.date_day,
|
||||
group=group_name,
|
||||
_base_dir=self._base_dir,
|
||||
)
|
||||
)
|
||||
except ValueError:
|
||||
log.warning(
|
||||
f"Warning: Skipping malformed group directory '{group_dir_path.name}' in '{dir_path}'."
|
||||
)
|
||||
return sorted(group_dirs_list)
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
"""Check equality with another LogDayDir instance.
|
||||
|
||||
Args:
|
||||
other: The object to compare with.
|
||||
|
||||
Returns:
|
||||
True if date_day and base_dir are equal, False otherwise.
|
||||
"""
|
||||
if not isinstance(other, LogDayDir):
|
||||
return NotImplemented
|
||||
return self.date_day == other.date_day and self._base_dir == other._base_dir
|
||||
|
||||
def __lt__(self, other: object) -> bool:
|
||||
"""Compare LogDayDir instances for sorting.
|
||||
|
||||
Sorting order: date (newest first).
|
||||
|
||||
Args:
|
||||
other: The object to compare with.
|
||||
|
||||
Returns:
|
||||
True if this instance should be sorted before the other.
|
||||
"""
|
||||
if not isinstance(other, LogDayDir):
|
||||
return NotImplemented
|
||||
# Primary sort: date (newest first)
|
||||
@@ -307,20 +244,105 @@ class LogDayDir:
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LogManager:
|
||||
"""Manages hierarchical log files with group configurations and filtering capabilities.
|
||||
|
||||
Provides functionality to create, search, and organize log files in a hierarchical
|
||||
directory structure with support for dynamic group names and nicknames.
|
||||
|
||||
Attributes:
|
||||
base_dir: The base directory where all log files are stored.
|
||||
root_group_configs: Dictionary of root-level group configurations.
|
||||
"""
|
||||
|
||||
base_dir: Path
|
||||
root_group_configs: dict[str, LogGroupConfig] = field(default_factory=dict)
|
||||
|
||||
def add_root_group_config(self, group_config: LogGroupConfig) -> "LogManager":
|
||||
"""Return a new LogManager with the added root-level group configuration.
|
||||
|
||||
Args:
|
||||
group_config: The root-level group configuration to add.
|
||||
|
||||
Returns:
|
||||
A new LogManager instance with the group configuration added.
|
||||
"""
|
||||
new_configs = {**self.root_group_configs, group_config.name: group_config}
|
||||
return LogManager(base_dir=self.base_dir, root_group_configs=new_configs)
|
||||
|
||||
def find_group_config(self, group_path: list[str]) -> LogGroupConfig | None:
|
||||
"""Find group configuration by traversing the hierarchical path.
|
||||
|
||||
Only looks at structure elements (even indices), ignoring dynamic names (odd indices).
|
||||
|
||||
Args:
|
||||
group_path: The group path components to search for.
|
||||
|
||||
Returns:
|
||||
The LogGroupConfig if found, None otherwise.
|
||||
"""
|
||||
if not group_path:
|
||||
return None
|
||||
|
||||
current_config = self.root_group_configs.get(group_path[0])
|
||||
if not current_config:
|
||||
return None
|
||||
|
||||
# If only root group, return it
|
||||
if len(group_path) == 1:
|
||||
return current_config
|
||||
|
||||
# Traverse down the hierarchy, only looking at structure elements (even indices)
|
||||
for i in range(2, len(group_path), 2):
|
||||
structure_name = group_path[i]
|
||||
current_config = current_config.get_child(structure_name)
|
||||
if not current_config:
|
||||
return None
|
||||
|
||||
return current_config
|
||||
|
||||
def create_log_file(
|
||||
self, func: Callable, op_key: str, group: str | None = None
|
||||
self, func: Callable, op_key: str, group_path: list[str] | None = None
|
||||
) -> LogFile:
|
||||
"""Create a new log file for the given function and operation.
|
||||
|
||||
Args:
|
||||
func: The function to create a log file for.
|
||||
op_key: The operation key identifier.
|
||||
group_path: Optional group path components. Defaults to ["default"].
|
||||
|
||||
Returns:
|
||||
A new LogFile instance with the log file created on disk.
|
||||
|
||||
Raises:
|
||||
ValueError: If the group structure is not registered.
|
||||
FileExistsError: If the log file already exists.
|
||||
"""
|
||||
now_utc = datetime.datetime.now(tz=datetime.UTC)
|
||||
|
||||
if group is None:
|
||||
group = "default"
|
||||
if group_path is None:
|
||||
group_path = ["default"]
|
||||
|
||||
# Validate that the group path structure is registered in the configuration
|
||||
if not self._is_group_path_registered(group_path):
|
||||
group_str = "/".join(group_path)
|
||||
msg = f"Group structure '{group_str}' is not valid. Root group '{group_path[0]}' or structure elements at even indices are not registered."
|
||||
raise ValueError(msg)
|
||||
|
||||
# URL encode dynamic elements (odd indices) before creating group string
|
||||
encoded_group_path = []
|
||||
for i, component in enumerate(group_path):
|
||||
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
|
||||
encoded_group_path.append(urllib.parse.quote(component, safe=""))
|
||||
else: # Even index = structure element, no encoding needed
|
||||
encoded_group_path.append(component)
|
||||
|
||||
# Convert encoded path to string for LogFile
|
||||
group_str = "/".join(encoded_group_path)
|
||||
|
||||
log_file = LogFile(
|
||||
op_key=op_key,
|
||||
date_day=now_utc.strftime("%Y-%m-%d"),
|
||||
group=group,
|
||||
group=group_str,
|
||||
date_second=now_utc.strftime("%H-%M-%S"), # Corrected original's %H-$M-%S
|
||||
func_name=func.__name__,
|
||||
_base_dir=self.base_dir,
|
||||
@@ -336,7 +358,75 @@ class LogManager:
|
||||
log_path.touch()
|
||||
return log_file
|
||||
|
||||
def _is_group_path_registered(self, group_path: list[str]) -> bool:
|
||||
"""Check if the given group path structure is registered in the configuration.
|
||||
|
||||
This validates the group structure (e.g., clans/<name>/machines) but allows
|
||||
dynamic names (e.g., <name> can be any value).
|
||||
|
||||
Args:
|
||||
group_path: The group path components to validate.
|
||||
|
||||
Returns:
|
||||
True if the group structure is registered, False otherwise.
|
||||
"""
|
||||
# Special case: allow "default" group without registration
|
||||
if group_path == ["default"]:
|
||||
return True
|
||||
|
||||
# For dynamic group validation, we need to check if the structure exists
|
||||
# by matching the pattern, not the exact path
|
||||
return self._validate_group_structure(group_path)
|
||||
|
||||
def _validate_group_structure(self, group_path: list[str]) -> bool:
|
||||
"""Validate that the group structure exists, allowing dynamic names.
|
||||
|
||||
Pattern alternates: structure -> dynamic -> structure -> dynamic -> ...
|
||||
- Even indices (0, 2, 4, ...): must be registered group names (structure elements)
|
||||
- Odd indices (1, 3, 5, ...): can be any dynamic names (will be URL encoded)
|
||||
|
||||
Examples:
|
||||
- ["clans", "repo-name", "default"] -> clans(structure) -> repo-name(dynamic) -> default(structure)
|
||||
- ["clans", "repo-name", "machines", "machine-name"] -> clans(struct) -> repo-name(dyn) -> machines(struct) -> machine-name(dyn)
|
||||
|
||||
Args:
|
||||
group_path: The group path components to validate.
|
||||
|
||||
Returns:
|
||||
True if the group structure is valid, False otherwise.
|
||||
"""
|
||||
if not group_path:
|
||||
return False
|
||||
|
||||
# Check if root group exists (index 0 - always structure)
|
||||
root_group = group_path[0]
|
||||
if root_group not in self.root_group_configs:
|
||||
return False
|
||||
|
||||
if len(group_path) == 1:
|
||||
return True
|
||||
|
||||
# For longer paths, traverse the structure elements only
|
||||
current_config = self.root_group_configs[root_group]
|
||||
|
||||
# Check all structure elements (even indices starting from 2)
|
||||
for i in range(2, len(group_path), 2):
|
||||
structure_name = group_path[i]
|
||||
|
||||
# Look for this structure in current config's children
|
||||
if structure_name not in current_config.children:
|
||||
return False
|
||||
|
||||
current_config = current_config.children[structure_name]
|
||||
|
||||
return True
|
||||
|
||||
def list_log_days(self) -> list[LogDayDir]:
|
||||
"""List all available log days in the base directory.
|
||||
|
||||
Returns:
|
||||
A sorted list of LogDayDir instances (newest first). Returns empty list if base directory doesn't exist.
|
||||
"""
|
||||
if not self.base_dir.exists() or not self.base_dir.is_dir():
|
||||
return []
|
||||
|
||||
@@ -361,43 +451,211 @@ class LogManager:
|
||||
|
||||
def get_log_file(
|
||||
self,
|
||||
op_key_to_find: str,
|
||||
specific_date_day: str | None = None,
|
||||
specific_group: str | None = None,
|
||||
op_key: str,
|
||||
*,
|
||||
date_day: str | None = None,
|
||||
selector: list[str] | None = None,
|
||||
) -> LogFile | None:
|
||||
"""Get a specific log file by operation key.
|
||||
|
||||
Args:
|
||||
op_key: The operation key to search for.
|
||||
date_day: Optional specific date to search in (YYYY-MM-DD format).
|
||||
selector: Optional group path to search in. If None, searches all groups.
|
||||
|
||||
Returns:
|
||||
The LogFile if found, None otherwise.
|
||||
"""
|
||||
days_to_search: list[LogDayDir]
|
||||
|
||||
if specific_date_day:
|
||||
if not is_correct_day_format(specific_date_day):
|
||||
# print(f"Warning: Provided specific_date_day '{specific_date_day}' is not in YYYY-MM-DD format.")
|
||||
if date_day:
|
||||
if not is_correct_day_format(date_day):
|
||||
return None
|
||||
try:
|
||||
target_day_dir = LogDayDir(
|
||||
date_day=specific_date_day, _base_dir=self.base_dir
|
||||
date_day=date_day,
|
||||
_base_dir=self.base_dir,
|
||||
)
|
||||
if (
|
||||
not target_day_dir.get_dir_path().exists()
|
||||
): # Check if dir exists on disk
|
||||
if not target_day_dir.get_dir_path().exists():
|
||||
return None
|
||||
days_to_search = [target_day_dir] # Search only this specific day
|
||||
except ValueError: # If LogDayDir construction fails (e.g. date_day format despite is_correct_day_format)
|
||||
days_to_search = [target_day_dir]
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
days_to_search = self.list_log_days() # Already sorted, newest day first
|
||||
days_to_search = self.list_log_days()
|
||||
|
||||
for day_dir in (
|
||||
days_to_search
|
||||
): # Iterates newest day first if days_to_search came from list_log_days()
|
||||
# day_dir.get_log_files() returns List[LogGroupDir], sorted by group name
|
||||
for group_dir in day_dir.get_log_files():
|
||||
# Skip this group if specific_group is provided and doesn't match
|
||||
if specific_group is not None and group_dir.group != specific_group:
|
||||
# Search for the log file directly using filesystem traversal
|
||||
for day_dir in days_to_search:
|
||||
result = self._find_log_file_in_day(day_dir, op_key, selector)
|
||||
if result:
|
||||
return result
|
||||
return None
|
||||
|
||||
def _find_log_file_in_day(
|
||||
self, day_dir: LogDayDir, op_key: str, selector: list[str] | None = None
|
||||
) -> LogFile | None:
|
||||
"""Find a log file in a specific day directory.
|
||||
|
||||
Args:
|
||||
day_dir: The LogDayDir to search in.
|
||||
op_key: The operation key to search for.
|
||||
selector: Optional group path to search in. If None, searches all groups.
|
||||
|
||||
Returns:
|
||||
The LogFile if found, None otherwise.
|
||||
"""
|
||||
base_path = day_dir.get_dir_path()
|
||||
|
||||
if selector is not None:
|
||||
# Search in specific group path
|
||||
search_path = base_path
|
||||
for i, component in enumerate(selector):
|
||||
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
|
||||
search_path = search_path / urllib.parse.quote(component, safe="")
|
||||
else: # Even index = structure element, no encoding needed
|
||||
search_path = search_path / component
|
||||
|
||||
if search_path.exists() and search_path.is_dir():
|
||||
return self._search_in_path(search_path, op_key, selector)
|
||||
else:
|
||||
# Search all groups in this day
|
||||
if base_path.exists() and base_path.is_dir():
|
||||
return self._search_in_path(base_path, op_key, None)
|
||||
|
||||
return None
|
||||
|
||||
def _search_in_path(
|
||||
self, search_path: Path, op_key: str, group_path: list[str] | None
|
||||
) -> LogFile | None:
|
||||
"""Search for log files in a given path.
|
||||
|
||||
Args:
|
||||
search_path: The path to search in.
|
||||
op_key: The operation key to search for.
|
||||
group_path: The group path used to construct the LogFile.
|
||||
|
||||
Returns:
|
||||
The LogFile if found, None otherwise.
|
||||
"""
|
||||
log_files: list[LogFile] = []
|
||||
|
||||
# Recursively search for log files
|
||||
for log_file_path in search_path.rglob("*.log"):
|
||||
if log_file_path.is_file():
|
||||
try:
|
||||
# Parse filename to get op_key and time
|
||||
filename_stem = log_file_path.stem
|
||||
parts = filename_stem.split("_", 1)
|
||||
if len(parts) == 2:
|
||||
date_second_str, file_op_key = parts
|
||||
|
||||
if file_op_key == op_key:
|
||||
# Find the base directory (contains date directories)
|
||||
base_dir = self.base_dir
|
||||
|
||||
# Get path relative to base directory
|
||||
try:
|
||||
relative_to_base = log_file_path.relative_to(base_dir)
|
||||
path_parts = relative_to_base.parts
|
||||
|
||||
if len(path_parts) >= 3: # date/[groups...]/func/file
|
||||
date_day = path_parts[0]
|
||||
func_name = path_parts[
|
||||
-2
|
||||
] # Second to last is function name
|
||||
group_parts = path_parts[
|
||||
1:-2
|
||||
] # Between date and function
|
||||
|
||||
# Create group string (already URL encoded in filesystem)
|
||||
group_str = (
|
||||
"/".join(group_parts)
|
||||
if group_parts
|
||||
else "default"
|
||||
)
|
||||
|
||||
if is_correct_day_format(date_day):
|
||||
log_file = LogFile(
|
||||
op_key=file_op_key,
|
||||
date_day=date_day,
|
||||
group=group_str,
|
||||
func_name=func_name,
|
||||
_base_dir=self.base_dir,
|
||||
date_second=date_second_str,
|
||||
)
|
||||
log_files.append(log_file)
|
||||
except ValueError:
|
||||
# Skip files that can't be made relative to base_dir
|
||||
continue
|
||||
except (ValueError, IndexError):
|
||||
# Skip malformed files
|
||||
continue
|
||||
|
||||
# group_dir.get_log_files() returns List[LogFuncDir], sorted by func_name
|
||||
for func_dir in group_dir.get_log_files():
|
||||
# func_dir.get_log_files() returns List[LogFile], sorted newest file first
|
||||
for log_file in func_dir.get_log_files():
|
||||
if log_file.op_key == op_key_to_find:
|
||||
return log_file
|
||||
# Return the newest log file if any found
|
||||
if log_files:
|
||||
return sorted(log_files)[0] # LogFile.__lt__ sorts newest first
|
||||
|
||||
return None
|
||||
|
||||
def filter(
|
||||
self, selector: list[str] | None = None, date_day: str | None = None
|
||||
) -> list[str]:
|
||||
"""Filter and list folders at the specified hierarchical path.
|
||||
|
||||
Args:
|
||||
selector: List of path components to navigate to. Empty list returns top-level groups.
|
||||
For alternating structure/dynamic pattern:
|
||||
- ["clans"] lists all dynamic names under clans
|
||||
- ["clans", <name>, "machines"] lists all dynamic names under machines
|
||||
- [] lists all top-level groups
|
||||
date_day: Optional date to filter by (YYYY-MM-DD format). If None, uses most recent day.
|
||||
|
||||
Returns:
|
||||
List of folder names (decoded) at the specified path level.
|
||||
"""
|
||||
if selector is None:
|
||||
selector = []
|
||||
|
||||
# Get the day to search in
|
||||
if date_day is None:
|
||||
days = self.list_log_days()
|
||||
if not days:
|
||||
return []
|
||||
day_dir = days[0] # Most recent day
|
||||
else:
|
||||
if not is_correct_day_format(date_day):
|
||||
return []
|
||||
try:
|
||||
day_dir = LogDayDir(
|
||||
date_day=date_day,
|
||||
_base_dir=self.base_dir,
|
||||
)
|
||||
if not day_dir.get_dir_path().exists():
|
||||
return []
|
||||
except ValueError:
|
||||
return []
|
||||
|
||||
# Empty path means list top-level groups
|
||||
if not selector:
|
||||
return list(self.root_group_configs.keys())
|
||||
|
||||
# Build the directory path to search in
|
||||
dir_path = day_dir.get_dir_path()
|
||||
for i, component in enumerate(selector):
|
||||
if i % 2 == 1: # Odd index = dynamic element, needs URL encoding
|
||||
dir_path = dir_path / urllib.parse.quote(component, safe="")
|
||||
else: # Even index = structure element, no encoding needed
|
||||
dir_path = dir_path / component
|
||||
|
||||
if not dir_path.exists() or not dir_path.is_dir():
|
||||
return []
|
||||
|
||||
# List directories and decode their names
|
||||
folder_names = []
|
||||
for subdir_path in dir_path.iterdir():
|
||||
if subdir_path.is_dir():
|
||||
# Decode the directory name
|
||||
decoded_name = urllib.parse.unquote(subdir_path.name)
|
||||
folder_names.append(decoded_name)
|
||||
|
||||
return sorted(folder_names)
|
||||
|
||||
@@ -1,53 +1,69 @@
|
||||
from clan_lib.api import API
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.log_manager import LogDayDir, LogFile, LogFuncDir, LogGroupDir, LogManager
|
||||
from clan_lib.log_manager import LogManager
|
||||
|
||||
LOG_MANAGER_INSTANCE: LogManager | None = None
|
||||
|
||||
|
||||
@API.register
def list_log_days() -> list[str]:
    """List all available log days.

    Returns:
        A list of date strings in YYYY-MM-DD format representing all available log days.

    Raises:
        AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
    """
    assert LOG_MANAGER_INSTANCE is not None
    # Expose only the plain date strings over the API, not LogDayDir objects.
    day_dirs = LOG_MANAGER_INSTANCE.list_log_days()
    return [day_dir.date_day for day_dir in day_dirs]
|
||||
|
||||
|
||||
@API.register
def list_log_groups(
    selector: list[str] | None, date_day: str | None = None
) -> list[str]:
    """List all log groups at the specified hierarchical path.

    Args:
        selector: List of path components to navigate to. Empty list returns top-level groups.
        date_day: Optional date to filter by (YYYY-MM-DD format). If None, uses most recent day.

    Returns:
        A list of folder names (decoded) at the specified path level.

    Raises:
        AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
    """
    assert LOG_MANAGER_INSTANCE is not None
    # Delegates to LogManager.filter, which resolves the day directory and
    # URL-decodes the folder names it reads from disk.
    return LOG_MANAGER_INSTANCE.filter(selector, date_day=date_day)
|
||||
|
||||
|
||||
@API.register
|
||||
def list_log_funcs_at_day(date_day: str, group: str) -> list[LogFuncDir]:
|
||||
"""List all logs for a specific function on a specific day."""
|
||||
assert LOG_MANAGER_INSTANCE is not None
|
||||
group_dir = LogGroupDir(date_day, group, LOG_MANAGER_INSTANCE.base_dir)
|
||||
return group_dir.get_log_files()
|
||||
def get_log_file(
|
||||
id_key: str, selector: list[str] | None = None, date_day: str | None = None
|
||||
) -> str:
|
||||
"""Get the contents of a specific log file by operation key.
|
||||
|
||||
Args:
|
||||
id_key: The operation key to search for.
|
||||
selector: Optional group path to search in. If None, searches all groups.
|
||||
date_day: Optional specific date to search in (YYYY-MM-DD format). If None, searches all days.
|
||||
|
||||
@API.register
|
||||
def list_log_files(date_day: str, group: str, func_name: str) -> list[LogFile]:
|
||||
"""List all log files for a specific function on a specific day."""
|
||||
assert LOG_MANAGER_INSTANCE is not None
|
||||
func_dir = LogFuncDir(date_day, group, func_name, LOG_MANAGER_INSTANCE.base_dir)
|
||||
return func_dir.get_log_files()
|
||||
Returns:
|
||||
The contents of the log file as a string.
|
||||
|
||||
|
||||
@API.register
|
||||
def get_log_file(id_key: str, group: str | None = None) -> str:
|
||||
"""Get a specific log file by op_key, function name and day."""
|
||||
Raises:
|
||||
ClanError: If the log file is not found.
|
||||
AssertionError: If LOG_MANAGER_INSTANCE is not initialized.
|
||||
"""
|
||||
assert LOG_MANAGER_INSTANCE is not None
|
||||
|
||||
log_file = LOG_MANAGER_INSTANCE.get_log_file(id_key, specific_group=group)
|
||||
log_file = LOG_MANAGER_INSTANCE.get_log_file(
|
||||
op_key=id_key, selector=selector, date_day=date_day
|
||||
)
|
||||
if log_file is None:
|
||||
return ""
|
||||
|
||||
file_path = log_file.get_file_path()
|
||||
if not file_path.exists():
|
||||
msg = f"Log file {file_path} does not exist."
|
||||
msg = f"Log file with op_key '{id_key}' not found in selector '{selector}' and date_day '{date_day}'."
|
||||
raise ClanError(msg)
|
||||
|
||||
return file_path.read_text()
|
||||
return log_file.get_file_path().read_text()
|
||||
|
||||
98
pkgs/clan-cli/clan_lib/log_manager/example_usage.py
Executable file
98
pkgs/clan-cli/clan_lib/log_manager/example_usage.py
Executable file
@@ -0,0 +1,98 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple LogManager example with filter function.
|
||||
|
||||
This demonstrates:
|
||||
- Dynamic group names with URL encoding
|
||||
- Hierarchical structure navigation using the filter function
|
||||
- Pattern: clans -> <dynamic_name> -> machines -> <dynamic_name>
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from clan_lib.log_manager import LogGroupConfig, LogManager
|
||||
|
||||
|
||||
def example_function() -> None:
    """Example function for creating logs."""
    # Intentionally empty: only the function object (its name) is passed to
    # LogManager.create_log_file in main() below.


def deploy_machine() -> None:
    """Function for deploying machines."""
    # Intentionally empty placeholder; used the same way as example_function.
|
||||
|
||||
def main() -> None:
    """Simple LogManager demonstration with filter function.

    Builds a clans -> <dynamic> -> machines -> <dynamic> hierarchy under
    /tmp/clan_logs, creates a few log files, then shows how filter() and
    get_log_file() navigate it. Output goes to stdout.
    """
    # Setup
    log_manager = LogManager(base_dir=Path("/tmp/clan_logs"))

    # Configure structure: clans -> <dynamic> -> machines -> <dynamic>
    clans_config = LogGroupConfig("clans", "Clans")
    machines_config = LogGroupConfig("machines", "Machines")
    clans_config = clans_config.add_child(machines_config)
    log_manager = log_manager.add_root_group_config(clans_config)

    print("=== LogManager Filter Function Example ===\n")

    # Create some example logs. Repo names deliberately contain '/' to show
    # that dynamic names get URL-encoded on disk.
    repos = ["/home/user/Projects/qubasas_clan", "https://github.com/qubasa/myclan"]
    machines = ["wintux", "demo", "gchq-local"]

    for repo in repos:
        for machine in machines:
            log_manager.create_log_file(
                deploy_machine,
                f"deploy_{machine}",
                ["clans", repo, "machines", machine],
            )

    print("Created log files for multiple repos and machines\n")

    # Demonstrate filter function
    print("=== Using the filter() function ===")

    # 1. List top-level groups
    top_level = log_manager.filter([])
    print(f"1. Top-level groups: {top_level}")

    # 2. List all repositories under 'clans'
    clans_repos = log_manager.filter(["clans"])
    print(f"2. Repositories under clans: {clans_repos}")

    # 3. List machines under first repository
    if clans_repos:
        first_repo = clans_repos[0]
        repo_machines = log_manager.filter(["clans", first_repo, "machines"])
        print(f"3. Machines under '{first_repo}': {repo_machines}")

    # 4. List machines under second repository
    if len(clans_repos) > 1:
        second_repo = clans_repos[1]
        repo_machines = log_manager.filter(["clans", second_repo, "machines"])
        print(f"4. Machines under '{second_repo}': {repo_machines}")

    print("\n=== Using get_log_file with arrays ===")
    # Demonstrate the new array-based get_log_file functionality.
    # (Simplified from `clans_repos and len(clans_repos) > 0`: a non-empty
    # list is already truthy.)
    if clans_repos:
        specific_log = log_manager.get_log_file(
            "deploy_wintux",
            selector=["clans", clans_repos[0], "machines", "wintux"],
        )
        if specific_log:
            print(
                f"5. Found specific log: {specific_log.op_key} in {specific_log.func_name}"
            )
        else:
            print("5. Specific log not found")

    print("\n=== Key Features ===")
    print("✓ Dynamic names with special chars (/, spaces, etc.) work")
    print("✓ Names are URL encoded in filesystem but returned decoded")
    print("✓ Filter function navigates hierarchy with simple arrays")
    print("✓ get_log_file now accepts specific_group as array")
    print("✓ Empty array [] lists top-level groups")
    print("✓ Odd indices are dynamic, even indices are structure")


if __name__ == "__main__":
    main()
|
||||
File diff suppressed because it is too large
Load Diff
187
pkgs/clan-cli/clan_lib/log_manager/test_url_encoding.py
Normal file
187
pkgs/clan-cli/clan_lib/log_manager/test_url_encoding.py
Normal file
@@ -0,0 +1,187 @@
|
||||
# Test file specifically for URL encoding functionality
|
||||
import urllib.parse
|
||||
from pathlib import Path
|
||||
|
||||
from clan_lib.log_manager import LogGroupConfig, LogManager
|
||||
|
||||
|
||||
def sample_function() -> None:
|
||||
"""Sample function for testing."""
|
||||
|
||||
|
||||
class TestURLEncoding:
|
||||
"""Test URL encoding for dynamic group names."""
|
||||
|
||||
def test_dynamic_name_url_encoding_forward_slash(self, tmp_path: Path) -> None:
|
||||
"""Test that dynamic names with forward slashes get URL encoded."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register structure elements
|
||||
clans_config = LogGroupConfig("clans", "Clans")
|
||||
default_config = LogGroupConfig("default", "Default")
|
||||
clans_config = clans_config.add_child(default_config)
|
||||
log_manager = log_manager.add_root_group_config(clans_config)
|
||||
|
||||
# Use a dynamic name with forward slashes
|
||||
dynamic_name = "/home/user/Projects/qubasas_clan"
|
||||
group_path = ["clans", dynamic_name, "default"]
|
||||
|
||||
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
|
||||
|
||||
# Check that the LogFile uses encoded path for file system operations
|
||||
file_path = log_file.get_file_path()
|
||||
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
|
||||
|
||||
# Verify the encoded name appears in the file path
|
||||
assert expected_encoded in str(file_path)
|
||||
assert file_path.exists()
|
||||
|
||||
# Verify that no intermediate directories were created from the forward slashes
|
||||
# The encoded name should be a single directory
|
||||
day_dir = tmp_path / log_file.date_day / "clans"
|
||||
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
|
||||
assert len(direct_children) == 1
|
||||
assert direct_children[0] == expected_encoded
|
||||
|
||||
def test_dynamic_name_url_encoding_special_characters(self, tmp_path: Path) -> None:
|
||||
"""Test URL encoding of dynamic names with various special characters."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register structure elements
|
||||
clans_config = LogGroupConfig("clans", "Clans")
|
||||
machines_config = LogGroupConfig("machines", "Machines")
|
||||
clans_config = clans_config.add_child(machines_config)
|
||||
log_manager = log_manager.add_root_group_config(clans_config)
|
||||
|
||||
# Test various special characters
|
||||
test_cases = [
|
||||
"repo with spaces",
|
||||
"repo&with&ersands",
|
||||
"repo!with!exclamations",
|
||||
"repo%with%percent",
|
||||
"repo@with@symbols",
|
||||
"repo#with#hash",
|
||||
"repo+with+plus",
|
||||
]
|
||||
|
||||
for dynamic_name in test_cases:
|
||||
group_path = ["clans", dynamic_name, "machines", f"machine-{dynamic_name}"]
|
||||
|
||||
log_file = log_manager.create_log_file(
|
||||
sample_function, f"test_{dynamic_name}", group_path
|
||||
)
|
||||
|
||||
# Check that the file was created and encoded names appear in path
|
||||
file_path = log_file.get_file_path()
|
||||
assert file_path.exists()
|
||||
|
||||
# Verify encoding for both dynamic elements (indices 1 and 3)
|
||||
expected_encoded_repo = urllib.parse.quote(dynamic_name, safe="")
|
||||
expected_encoded_machine = urllib.parse.quote(
|
||||
f"machine-{dynamic_name}", safe=""
|
||||
)
|
||||
|
||||
assert expected_encoded_repo in str(file_path)
|
||||
assert expected_encoded_machine in str(file_path)
|
||||
|
||||
def test_structure_elements_not_encoded(self, tmp_path: Path) -> None:
|
||||
"""Test that structure elements (even indices) are NOT URL encoded."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register structure elements with special characters in their names
|
||||
# (though this is not typical, testing to ensure they're not encoded)
|
||||
test_config = LogGroupConfig("test-group", "Test Group")
|
||||
sub_config = LogGroupConfig("sub-group", "Sub Group")
|
||||
test_config = test_config.add_child(sub_config)
|
||||
log_manager = log_manager.add_root_group_config(test_config)
|
||||
|
||||
# Use structure names that contain hyphens (common case)
|
||||
group_path = ["test-group", "dynamic-name", "sub-group", "another-dynamic"]
|
||||
|
||||
log_file = log_manager.create_log_file(sample_function, "test_op", group_path)
|
||||
file_path = log_file.get_file_path()
|
||||
|
||||
# Structure elements should NOT be encoded
|
||||
assert "test-group" in str(file_path) # Structure element, not encoded
|
||||
assert "sub-group" in str(file_path) # Structure element, not encoded
|
||||
|
||||
# Dynamic elements should be encoded
|
||||
expected_dynamic1 = urllib.parse.quote("dynamic-name", safe="")
|
||||
expected_dynamic2 = urllib.parse.quote("another-dynamic", safe="")
|
||||
assert expected_dynamic1 in str(file_path)
|
||||
assert expected_dynamic2 in str(file_path)
|
||||
|
||||
def test_url_encoding_with_unicode_characters(self, tmp_path: Path) -> None:
|
||||
"""Test URL encoding with Unicode characters in dynamic names."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register structure elements
|
||||
clans_config = LogGroupConfig("clans", "Clans")
|
||||
default_config = LogGroupConfig("default", "Default")
|
||||
clans_config = clans_config.add_child(default_config)
|
||||
log_manager = log_manager.add_root_group_config(clans_config)
|
||||
|
||||
# Use Unicode characters in dynamic name
|
||||
dynamic_name = "项目/中文/测试" # Chinese characters with slashes
|
||||
group_path = ["clans", dynamic_name, "default"]
|
||||
|
||||
log_file = log_manager.create_log_file(
|
||||
sample_function, "unicode_test", group_path
|
||||
)
|
||||
file_path = log_file.get_file_path()
|
||||
|
||||
# Check that file was created and Unicode was properly encoded
|
||||
assert file_path.exists()
|
||||
expected_encoded = urllib.parse.quote(dynamic_name, safe="")
|
||||
assert expected_encoded in str(file_path)
|
||||
|
||||
# Verify no intermediate directories from slashes in Unicode string
|
||||
day_dir = tmp_path / log_file.date_day / "clans"
|
||||
direct_children = [p.name for p in day_dir.iterdir() if p.is_dir()]
|
||||
assert len(direct_children) == 1
|
||||
assert direct_children[0] == expected_encoded
|
||||
|
||||
def test_backward_compatibility_single_element_paths(self, tmp_path: Path) -> None:
|
||||
"""Test that single-element paths (no dynamic names) still work."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register simple structure
|
||||
default_config = LogGroupConfig("default", "Default")
|
||||
log_manager = log_manager.add_root_group_config(default_config)
|
||||
|
||||
# Use simple single-element path (no dynamic names to encode)
|
||||
group_path = ["default"]
|
||||
|
||||
log_file = log_manager.create_log_file(
|
||||
sample_function, "simple_test", group_path
|
||||
)
|
||||
file_path = log_file.get_file_path()
|
||||
|
||||
# Should work exactly as before
|
||||
assert file_path.exists()
|
||||
assert "default" in str(file_path)
|
||||
# No encoding should have occurred
|
||||
assert urllib.parse.quote("default", safe="") == "default" # No special chars
|
||||
|
||||
def test_empty_dynamic_name_encoding(self, tmp_path: Path) -> None:
|
||||
"""Test URL encoding with empty string as dynamic name."""
|
||||
log_manager = LogManager(base_dir=tmp_path)
|
||||
|
||||
# Register structure elements
|
||||
clans_config = LogGroupConfig("clans", "Clans")
|
||||
default_config = LogGroupConfig("default", "Default")
|
||||
clans_config = clans_config.add_child(default_config)
|
||||
log_manager = log_manager.add_root_group_config(clans_config)
|
||||
|
||||
# Use empty string as dynamic name
|
||||
group_path = ["clans", "", "default"]
|
||||
|
||||
log_file = log_manager.create_log_file(
|
||||
sample_function, "empty_test", group_path
|
||||
)
|
||||
file_path = log_file.get_file_path()
|
||||
|
||||
# Should work - empty string gets encoded as empty string
|
||||
assert file_path.exists()
|
||||
expected_encoded = urllib.parse.quote("", safe="")
|
||||
assert expected_encoded == "" # Empty string encodes to empty string
|
||||
@@ -1,4 +1,5 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import TypedDict
|
||||
|
||||
from clan_lib.api import API
|
||||
from clan_lib.errors import ClanError
|
||||
@@ -10,15 +11,44 @@ from clan_lib.persist.inventory_store import InventoryStore
|
||||
from clan_lib.persist.util import set_value_by_path
|
||||
|
||||
|
||||
class MachineFilter(TypedDict):
    # Tags a machine must ALL carry to be included (intersection semantics,
    # see list_machines below).
    tags: list[str]


class ListOptions(TypedDict):
    # Optional filtering applied by list_machines.
    filter: MachineFilter
||||
|
||||
|
||||
@API.register
def list_machines(
    flake: Flake, opts: ListOptions | None = None
) -> dict[str, InventoryMachine]:
    """List machines of a clan.

    Args:
        flake: The clan flake whose inventory is read.
        opts: Optional listing options. When ``opts["filter"]["tags"]`` is
            given, only machines carrying ALL of those tags are returned.

    Usage Example:

        machines = list_machines(flake, {"filter": {"tags": ["foo", "bar"]}})

        lists only machines that include both "foo" AND "bar"

    Returns:
        Mapping of machine name to its inventory entry.
    """
    inventory_store = InventoryStore(flake=flake)
    inventory = inventory_store.read()

    machines = inventory.get("machines", {})

    if opts and opts.get("filter"):
        filter_tags = opts.get("filter", {}).get("tags", [])
        # Intersection semantics: a machine must carry every requested tag.
        # (An empty tag list matches every machine.)
        return {
            machine_name: machine
            for machine_name, machine in machines.items()
            if all(ft in machine.get("tags", []) for ft in filter_tags)
        }

    return machines
||||
|
||||
|
||||
|
||||
0
pkgs/clan-cli/clan_lib/machines/actions_test.py
Normal file
0
pkgs/clan-cli/clan_lib/machines/actions_test.py
Normal file
@@ -16,35 +16,39 @@ from clan_lib.nix_models.clan import InventoryMachine
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def convert_inventory_to_machines(
|
||||
flake: Flake, machines: dict[str, InventoryMachine]
|
||||
) -> dict[str, Machine]:
|
||||
return {
|
||||
name: Machine.from_inventory(name, flake, inventory_machine)
|
||||
for name, inventory_machine in machines.items()
|
||||
}
|
||||
|
||||
|
||||
def list_full_machines(flake: Flake) -> dict[str, Machine]:
|
||||
"""
|
||||
Like `list_machines`, but returns a full 'machine' instance for each machine.
|
||||
"""
|
||||
machines = list_machines(flake)
|
||||
|
||||
res: dict[str, Machine] = {}
|
||||
|
||||
for name in machines:
|
||||
machine = Machine(name=name, flake=flake)
|
||||
res[machine.name] = machine
|
||||
|
||||
return res
|
||||
return convert_inventory_to_machines(flake, machines)
|
||||
|
||||
|
||||
def query_machines_by_tags(flake: Flake, tags: list[str]) -> dict[str, Machine]:
|
||||
def query_machines_by_tags(
|
||||
flake: Flake, tags: list[str]
|
||||
) -> dict[str, InventoryMachine]:
|
||||
"""
|
||||
Query machines by their respective tags, if multiple tags are specified
|
||||
then only machines that have those respective tags specified will be listed.
|
||||
It is an intersection of the tags and machines.
|
||||
"""
|
||||
machines = list_full_machines(flake)
|
||||
machines = list_machines(flake)
|
||||
|
||||
filtered_machines = {}
|
||||
for machine in machines.values():
|
||||
inv_machine = get_machine(machine.flake, machine.name)
|
||||
machine_tags = inv_machine.get("tags", [])
|
||||
for machine_name, machine in machines.items():
|
||||
machine_tags = machine.get("tags", [])
|
||||
if all(tag in machine_tags for tag in tags):
|
||||
filtered_machines[machine.name] = machine
|
||||
filtered_machines[machine_name] = machine
|
||||
|
||||
return filtered_machines
|
||||
|
||||
|
||||
@@ -29,6 +29,15 @@ class Machine:
|
||||
name: str
|
||||
flake: Flake
|
||||
|
||||
@classmethod
|
||||
def from_inventory(
|
||||
cls,
|
||||
name: str,
|
||||
flake: Flake,
|
||||
_inventory_machine: InventoryMachine,
|
||||
) -> "Machine":
|
||||
return cls(name=name, flake=flake)
|
||||
|
||||
def get_inv_machine(self) -> "InventoryMachine":
|
||||
return get_machine(self.flake, self.name)
|
||||
|
||||
@@ -166,7 +175,7 @@ class Machine:
|
||||
@dataclass(frozen=True)
|
||||
class RemoteSource:
|
||||
data: Remote
|
||||
source: Literal["inventory", "nix_machine"]
|
||||
source: Literal["inventory", "machine"]
|
||||
|
||||
|
||||
@API.register
|
||||
@@ -179,15 +188,15 @@ def get_host(
|
||||
machine = Machine(name=name, flake=flake)
|
||||
inv_machine = machine.get_inv_machine()
|
||||
|
||||
source: Literal["inventory", "nix_machine"] = "inventory"
|
||||
source: Literal["inventory", "machine"] = "inventory"
|
||||
host_str = inv_machine.get("deploy", {}).get(field)
|
||||
|
||||
if host_str is None:
|
||||
machine.debug(
|
||||
f"'{field}' is not set in inventory, falling back to slower Nix config, set it either through the Nix or json interface to improve performance"
|
||||
machine.warn(
|
||||
f"'{field}' is not set in `inventory.machines.${name}.deploy.targetHost` - falling back to _slower_ nixos option: `clan.core.networking.targetHost`"
|
||||
)
|
||||
host_str = machine.select(f'config.clan.core.networking."{field}"')
|
||||
source = "nix_machine"
|
||||
source = "machine"
|
||||
|
||||
if not host_str:
|
||||
return None
|
||||
|
||||
@@ -100,6 +100,20 @@
|
||||
cp ${self'.legacyPackages.schemas.inventory}/* $out
|
||||
'';
|
||||
};
|
||||
clan-lib-openapi = pkgs.stdenv.mkDerivation {
|
||||
name = "clan-lib-openapi";
|
||||
src = ./.;
|
||||
|
||||
buildInputs = [
|
||||
pkgs.python3
|
||||
];
|
||||
|
||||
installPhase = ''
|
||||
export INPUT_PATH=${self'.packages.clan-ts-api}/API.json
|
||||
python openapi.py
|
||||
cp openapi.json $out
|
||||
'';
|
||||
};
|
||||
|
||||
default = self'.packages.clan-cli;
|
||||
};
|
||||
|
||||
191
pkgs/clan-cli/openapi.py
Normal file
191
pkgs/clan-cli/openapi.py
Normal file
@@ -0,0 +1,191 @@
|
||||
import json
|
||||
import os
|
||||
from copy import deepcopy
|
||||
from pathlib import Path
|
||||
|
||||
# !!! IMPORTANT !!!
# AVOID VERBS NOT IN THIS LIST
# We might restrict this even further to build a consistent and easy to use API
COMMON_VERBS = {
    "get",
    "list",
    "show",
    "set",
    "create",
    "update",
    "delete",
    "generate",
    "maybe",
    "open",
    "flash",
    "install",
    "deploy",
    "check",
    "cancel",
}


def is_verb(word: str) -> bool:
    """Return True when *word* is one of the allowed API operation verbs."""
    return word in COMMON_VERBS
|
||||
|
||||
|
||||
def singular(word: str) -> str:
    """Return a naive English singular form of *word*.

    Handles three common plural patterns: "-ies" -> "-y", "-ses" -> drop the
    trailing "es", and a plain trailing "-s" is dropped. Words ending in "-ss"
    and everything else pass through unchanged.
    """
    if word.endswith("ies"):
        return f"{word[:-3]}y"
    if word.endswith("ses"):
        return word[:-2]
    keeps_suffix = not word.endswith("s") or word.endswith("ss")
    return word if keeps_suffix else word[:-1]
|
||||
|
||||
|
||||
def normalize_tag(parts: list[str]) -> list[str]:
    """Normalize an operation name split into [VERB, NOUN, NOUN, ...].

    Each NOUN is a sub-resource; it is singularized and capitalized. The verb
    is kept as-is at the front of the result.
    """
    verb = parts[0]

    raw_nouns = parts[1:]
    # TODO: operation names should eventually REQUIRE at least one NOUN;
    # until then fall back to a generic "misc" bucket instead of raising.
    if not raw_nouns:
        raw_nouns = ["misc"]
    return [verb] + [singular(noun).capitalize() for noun in raw_nouns]
|
||||
|
||||
|
||||
def operation_to_tag(op_name: str) -> str:
    """Derive the OpenAPI tag for a snake_case operation name.

    Splits *op_name* into [VERB, NOUN, ...], normalizes the nouns via
    normalize_tag, warns (without failing) when the verb is not in
    COMMON_VERBS, and returns the noun chain joined by " / ".
    """

    def check_operation_name(verb: str, _resource_nouns: list[str]) -> None:
        if not is_verb(verb):
            # Bug fix: report the offending verb itself; previously the whole
            # operation name was interpolated here twice.
            print(
                f"""⚠️ WARNING: Verb '{verb}' of API operation {op_name} is not allowed.
            Use one of: {", ".join(COMMON_VERBS)}
            """
            )

    parts = op_name.lower().split("_")
    normalized = normalize_tag(parts)

    check_operation_name(normalized[0], normalized[1:])

    return " / ".join(normalized[1:])
|
||||
|
||||
|
||||
def fix_nullables(schema: dict) -> dict:
    """Rewrite JSON-Schema ``oneOf`` null-unions into OpenAPI 3.0 ``nullable``.

    A ``oneOf`` with exactly one non-null alternative plus ``{"type": "null"}``
    collapses to that alternative with ``nullable: true`` (sibling keys of the
    original schema are preserved). A ``oneOf`` with several non-null
    alternatives just drops the null entries. Recurses through nested dicts
    and lists; scalars pass through unchanged.
    """
    if isinstance(schema, list):
        return [fix_nullables(entry) for entry in schema]
    if not isinstance(schema, dict):
        return schema

    one_of = schema.get("oneOf")
    if isinstance(one_of, list):
        alternatives = [alt for alt in one_of if alt.get("type") != "null"]
        if len(alternatives) == 1:
            # Collapse to the single real alternative, marked nullable, and
            # carry over every sibling key of the original schema except oneOf.
            merged = deepcopy(alternatives[0])
            merged["nullable"] = True
            for key, value in schema.items():
                if key != "oneOf":
                    merged[key] = value
            return fix_nullables(merged)
        # Several real alternatives: strip the null entries and keep oneOf.
        schema["oneOf"] = alternatives

    # Recursively fix nested schemas.
    return {key: fix_nullables(value) for key, value in schema.items()}
|
||||
|
||||
|
||||
def fix_error_refs(schema: dict) -> None:
    """Rewrite ``#/$defs/error`` $ref targets to their OpenAPI components path.

    Walks *schema* recursively and mutates it in place; returns None.
    """
    if isinstance(schema, list):
        for element in schema:
            fix_error_refs(element)
        return
    if not isinstance(schema, dict):
        return
    for key in schema:
        if key == "$ref" and schema[key] == "#/$defs/error":
            schema[key] = "#/components/schemas/error"
        else:
            fix_error_refs(schema[key])
|
||||
|
||||
|
||||
# === Helper to make reusable schema names ===
def make_schema_name(func_name: str, part: str) -> str:
    """Build the components-schema key for *func_name*'s *part* ("args"/"return")."""
    return "_".join((func_name, part))
|
||||
|
||||
|
||||
def main() -> None:
    """Convert the clan API JSON Schema into an OpenAPI 3.0 document.

    Reads the schema from the file named by the INPUT_PATH environment
    variable and writes ``openapi.json`` to the current working directory.
    Each API function becomes a POST endpoint ``/<func_name>`` whose request
    body is the function's arguments schema and whose 200 response is its
    return schema.

    Raises:
        KeyError: If the INPUT_PATH environment variable is not set.
    """
    input_path = Path(os.environ["INPUT_PATH"])

    # === Load input JSON Schema ===
    with input_path.open() as f:
        schema = json.load(f)

    defs = schema.get("$defs", {})
    # Top-level properties map one API function name to its args/return schema.
    functions = schema["properties"]

    # === Start OpenAPI 3.0 spec in JSON ===
    openapi = {
        "openapi": "3.0.3",
        "info": {
            "title": "Function-Based Python API",
            "version": "1.0.0",
            "description": "!!! INTERNAL USE ONLY !!! We don't provide a world usable API yet.\nThis prototype maps python function calls to POST Requests because we are planning towards RESTfull API in the future.",
        },
        "paths": {},
        "components": {"schemas": {}},
    }

    # === Convert each function ===
    for func_name, func_schema in functions.items():
        # Normalize null-unions to OpenAPI `nullable` before registering.
        args_schema = fix_nullables(deepcopy(func_schema["properties"]["arguments"]))
        return_schema = fix_nullables(deepcopy(func_schema["properties"]["return"]))
        fix_error_refs(return_schema)
        # Register schemas under components
        args_name = make_schema_name(func_name, "args")
        return_name = make_schema_name(func_name, "return")
        openapi["components"]["schemas"][args_name] = args_schema  # type: ignore
        openapi["components"]["schemas"][return_name] = return_schema  # type: ignore
        tag = operation_to_tag(func_name)
        # Create a POST endpoint for the function
        openapi["paths"][f"/{func_name}"] = {  # type: ignore
            "post": {
                "summary": func_name,
                "operationId": func_name,
                "tags": [tag],
                "requestBody": {
                    "required": True,
                    "content": {
                        "application/json": {
                            "schema": {"$ref": f"#/components/schemas/{args_name}"}
                        }
                    },
                },
                "responses": {
                    "200": {
                        "description": "Successful response",
                        "content": {
                            "application/json": {
                                "schema": {
                                    "$ref": f"#/components/schemas/{return_name}"
                                }
                            }
                        },
                    }
                },
            }
        }

    # === Add global definitions from $defs ===
    for def_name, def_schema in defs.items():
        fixed_schema = fix_nullables(deepcopy(def_schema))
        fix_error_refs(fixed_schema)
        openapi["components"]["schemas"][def_name] = fixed_schema  # type: ignore

    # === Write to output JSON ===
    with Path("openapi.json").open("w") as f:
        json.dump(openapi, f, indent=2)

    print("✅ OpenAPI 3.0 JSON written to openapi.json")


if __name__ == "__main__":
    main()
|
||||
@@ -16,6 +16,7 @@ mkShell {
|
||||
with ps;
|
||||
[
|
||||
mypy
|
||||
pytest-cov
|
||||
]
|
||||
++ (clan-cli.devshellPyDeps ps)
|
||||
))
|
||||
|
||||
Reference in New Issue
Block a user