clan-cli: Improved --debug output by indenting commands, add TRACE_DEPTH environment variable

This commit is contained in:
Qubasa
2024-11-11 21:22:25 +07:00
parent df2aa78745
commit 2df1179666
5 changed files with 111 additions and 33 deletions

View File

@@ -1,13 +1,24 @@
import logging
import os
import shlex
from clan_app import main
from clan_cli.custom_logger import get_caller
from clan_cli.custom_logger import get_callers
log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
def run(args: list[str]) -> None:
cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}")
print_trace(f"$ {cmd}")
main(args)

View File

@@ -14,9 +14,9 @@ from enum import Enum
from pathlib import Path
from typing import IO, Any
from clan_cli.errors import ClanError
from clan_cli.errors import ClanError, indent_command
from .custom_logger import get_caller
from .custom_logger import get_callers
from .errors import ClanCmdError, CmdOut
logger = logging.getLogger(__name__)
@@ -155,14 +155,33 @@ def run(
) -> CmdOut:
if cwd is None:
cwd = Path.cwd()
if input:
logger.debug(
f"""$: echo "{input.decode('utf-8', 'replace')}" | {shlex.join(cmd)} \nCaller: {get_caller()}"""
)
else:
logger.debug(f"$: {shlex.join(cmd)} \nCaller: {get_caller()}")
start = timeit.default_timer()
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(3, 4 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = callers[1:]
else:
callers.pop()
if len(callers) == 1:
callers_str = f"Caller: {callers[0]}\n"
else:
callers_str = "\n".join(
f"{i+1}: {caller}" for i, caller in enumerate(callers)
)
callers_str = f"Callers:\n{callers_str}"
logger.debug(f"{msg} \n{callers_str}")
if input:
print_trace(
f"$: echo '{input.decode('utf-8', 'replace')}' | {indent_command(cmd)}"
)
elif logger.isEnabledFor(logging.DEBUG):
print_trace(f"$: {indent_command(cmd)}")
start = timeit.default_timer()
with ExitStack() as stack:
process = stack.enter_context(
subprocess.Popen(
@@ -223,7 +242,7 @@ def run_no_stdout(
"""
if cwd is None:
cwd = Path.cwd()
if logging.getLogger(__name__.split(".")[0]).isEnabledFor(logging.DEBUG):
if logger.isEnabledFor(logging.DEBUG):
return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
log = Log.NONE
return run(

View File

@@ -1,5 +1,6 @@
import inspect
import logging
import os
from collections.abc import Callable
from pathlib import Path
from typing import Any
@@ -57,26 +58,49 @@ class ThreadFormatter(logging.Formatter):
return FORMATTER[record.levelno](record, False).format(record)
def get_caller() -> str:
def get_callers(start: int = 2, end: int = 2) -> list[str]:
"""
Get a list of caller information for a given range in the call stack.
:param start: The starting position in the call stack (1 being directly above in the call stack).
:param end: The end position in the call stack.
:return: A list of strings, each containing the file, line number, and function of the caller.
"""
frame = inspect.currentframe()
if frame is None:
return "unknown"
caller_frame = frame.f_back
if caller_frame is None:
return "unknown"
caller_frame = caller_frame.f_back
if caller_frame is None:
return "unknown"
frame_info = inspect.getframeinfo(caller_frame)
return ["unknown"]
try:
filepath = Path(frame_info.filename).resolve()
filepath = Path("~", filepath.relative_to(Path.home()))
except Exception:
filepath = Path(frame_info.filename)
callers = []
current_frame = frame.f_back # start from the caller of this function
ret = f"{filepath}:{frame_info.lineno}::{frame_info.function}"
return ret
# Skip `start - 1` frames.
for _ in range(start - 1):
if current_frame is not None:
current_frame = current_frame.f_back
else:
# If there aren't enough frames, return what we have as "unknown".
return ["unknown"] * (end - start + 1)
# Collect frame info until the `end` position.
for _ in range(end - start + 1):
if current_frame is not None:
frame_info = inspect.getframeinfo(current_frame)
try:
filepath = Path(frame_info.filename).resolve()
filepath = Path("~", filepath.relative_to(Path.home()))
except Exception:
filepath = Path(frame_info.filename)
ret = f"{filepath}:{frame_info.lineno}::{frame_info.function}"
callers.append(ret)
current_frame = current_frame.f_back
else:
# If there are no more frames but we haven't reached `end`, append "unknown".
callers.append("unknown")
return callers
def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> None:
@@ -89,7 +113,8 @@ def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> No
# Create and add your custom handler
default_handler.setLevel(level)
default_handler.setFormatter(CustomFormatter(str(level) == str(logging.DEBUG)))
trace_depth = bool(int(os.environ.get("TRACE_DEPTH", "0")))
default_handler.setFormatter(CustomFormatter(trace_depth))
main_logger.addHandler(default_handler)
# Set logging level for other modules used by this module

View File

@@ -1,18 +1,30 @@
import argparse
import logging
import os
import shlex
from clan_cli import create_parser
from clan_cli.custom_logger import get_caller
from clan_cli.custom_logger import get_callers
log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
def run(args: list[str]) -> argparse.Namespace:
parser = create_parser(prog="clan")
parsed = parser.parse_args(args)
cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}")
print_trace(f"$ {cmd}")
if hasattr(parsed, "func"):
parsed.func(parsed)
return parsed

View File

@@ -1,14 +1,25 @@
import logging
import os
import shlex
from clan_cli.custom_logger import get_caller
from clan_cli.custom_logger import get_callers
from clan_vm_manager import main
log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
class Cli:
def run(self, args: list[str]) -> None:
cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}")
print_trace(f"$ {cmd}")
main(args)