Files
clan-core/pkgs/clan-cli/clan_cli/custom_logger.py

123 lines
3.9 KiB
Python

import inspect
import logging
import os
from collections.abc import Callable
from pathlib import Path
from typing import Any
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
red = "\x1b[31;20m"
bold_red = "\x1b[31;1m"
green = "\u001b[32m"
blue = "\u001b[34m"
def get_formatter(color: str) -> Callable[[logging.LogRecord, bool], logging.Formatter]:
def myformatter(
record: logging.LogRecord, with_location: bool
) -> logging.Formatter:
reset = "\x1b[0m"
try:
filepath = Path(record.pathname).resolve()
filepath = Path("~", filepath.relative_to(Path.home()))
except Exception:
filepath = Path(record.pathname)
if not with_location:
return logging.Formatter(f"{color}%(levelname)s{reset}: %(message)s")
return logging.Formatter(
f"{color}%(levelname)s{reset}: %(message)s\nLocation: {filepath}:%(lineno)d::%(funcName)s\n"
)
return myformatter
FORMATTER = {
logging.DEBUG: get_formatter(blue),
logging.INFO: get_formatter(green),
logging.WARNING: get_formatter(yellow),
logging.ERROR: get_formatter(red),
logging.CRITICAL: get_formatter(bold_red),
}
class CustomFormatter(logging.Formatter):
def __init__(self, log_locations: bool) -> None:
super().__init__()
self.log_locations = log_locations
def format(self, record: logging.LogRecord) -> str:
return FORMATTER[record.levelno](record, self.log_locations).format(record)
class ThreadFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
return FORMATTER[record.levelno](record, False).format(record)
def get_callers(start: int = 2, end: int = 2) -> list[str]:
"""
Get a list of caller information for a given range in the call stack.
:param start: The starting position in the call stack (1 being directly above in the call stack).
:param end: The end position in the call stack.
:return: A list of strings, each containing the file, line number, and function of the caller.
"""
frame = inspect.currentframe()
if frame is None:
return ["unknown"]
callers = []
current_frame = frame.f_back # start from the caller of this function
# Skip `start - 1` frames.
for _ in range(start - 1):
if current_frame is not None:
current_frame = current_frame.f_back
else:
# If there aren't enough frames, return what we have as "unknown".
return ["unknown"] * (end - start + 1)
# Collect frame info until the `end` position.
for _ in range(end - start + 1):
if current_frame is not None:
frame_info = inspect.getframeinfo(current_frame)
try:
filepath = Path(frame_info.filename).resolve()
filepath = Path("~", filepath.relative_to(Path.home()))
except Exception:
filepath = Path(frame_info.filename)
ret = f"{filepath}:{frame_info.lineno}::{frame_info.function}"
callers.append(ret)
current_frame = current_frame.f_back
else:
# If there are no more frames but we haven't reached `end`, append "unknown".
callers.append("unknown")
return callers
def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> None:
# Get the root logger and set its level
main_logger = logging.getLogger(root_log_name)
main_logger.setLevel(level)
# Create and add the default handler
default_handler = logging.StreamHandler()
# Create and add your custom handler
default_handler.setLevel(level)
trace_depth = bool(int(os.environ.get("TRACE_DEPTH", "0")))
default_handler.setFormatter(CustomFormatter(trace_depth))
main_logger.addHandler(default_handler)
# Set logging level for other modules used by this module
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("httpx").setLevel(level=logging.WARNING)