Merge pull request 'clan-cli: Improved --debug output by indenting commands, add TRACE_DEPTH environment variable' (#2375) from Qubasa/clan-core:Qubasa-main into main

This commit is contained in:
clan-bot
2024-11-11 14:36:56 +00:00
5 changed files with 111 additions and 33 deletions

View File

@@ -1,13 +1,24 @@
import logging import logging
import os
import shlex import shlex
from clan_app import main from clan_app import main
from clan_cli.custom_logger import get_caller from clan_cli.custom_logger import get_callers
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
def run(args: list[str]) -> None: def run(args: list[str]) -> None:
cmd = shlex.join(["clan", *args]) cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}") print_trace(f"$ {cmd}")
main(args) main(args)

View File

@@ -14,9 +14,9 @@ from enum import Enum
from pathlib import Path from pathlib import Path
from typing import IO, Any from typing import IO, Any
from clan_cli.errors import ClanError from clan_cli.errors import ClanError, indent_command
from .custom_logger import get_caller from .custom_logger import get_callers
from .errors import ClanCmdError, CmdOut from .errors import ClanCmdError, CmdOut
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -155,14 +155,33 @@ def run(
) -> CmdOut: ) -> CmdOut:
if cwd is None: if cwd is None:
cwd = Path.cwd() cwd = Path.cwd()
if input:
logger.debug(
f"""$: echo "{input.decode('utf-8', 'replace')}" | {shlex.join(cmd)} \nCaller: {get_caller()}"""
)
else:
logger.debug(f"$: {shlex.join(cmd)} \nCaller: {get_caller()}")
start = timeit.default_timer()
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(3, 4 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = callers[1:]
else:
callers.pop()
if len(callers) == 1:
callers_str = f"Caller: {callers[0]}\n"
else:
callers_str = "\n".join(
f"{i+1}: {caller}" for i, caller in enumerate(callers)
)
callers_str = f"Callers:\n{callers_str}"
logger.debug(f"{msg} \n{callers_str}")
if input:
print_trace(
f"$: echo '{input.decode('utf-8', 'replace')}' | {indent_command(cmd)}"
)
elif logger.isEnabledFor(logging.DEBUG):
print_trace(f"$: {indent_command(cmd)}")
start = timeit.default_timer()
with ExitStack() as stack: with ExitStack() as stack:
process = stack.enter_context( process = stack.enter_context(
subprocess.Popen( subprocess.Popen(
@@ -223,7 +242,7 @@ def run_no_stdout(
""" """
if cwd is None: if cwd is None:
cwd = Path.cwd() cwd = Path.cwd()
if logging.getLogger(__name__.split(".")[0]).isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
return run(cmd, env=env, log=log, check=check, error_msg=error_msg) return run(cmd, env=env, log=log, check=check, error_msg=error_msg)
log = Log.NONE log = Log.NONE
return run( return run(

View File

@@ -1,5 +1,6 @@
import inspect import inspect
import logging import logging
import os
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -57,17 +58,34 @@ class ThreadFormatter(logging.Formatter):
return FORMATTER[record.levelno](record, False).format(record) return FORMATTER[record.levelno](record, False).format(record)
def get_caller() -> str: def get_callers(start: int = 2, end: int = 2) -> list[str]:
"""
Get a list of caller information for a given range in the call stack.
:param start: The starting position in the call stack (1 being directly above in the call stack).
:param end: The end position in the call stack.
:return: A list of strings, each containing the file, line number, and function of the caller.
"""
frame = inspect.currentframe() frame = inspect.currentframe()
if frame is None: if frame is None:
return "unknown" return ["unknown"]
caller_frame = frame.f_back
if caller_frame is None: callers = []
return "unknown" current_frame = frame.f_back # start from the caller of this function
caller_frame = caller_frame.f_back
if caller_frame is None: # Skip `start - 1` frames.
return "unknown" for _ in range(start - 1):
frame_info = inspect.getframeinfo(caller_frame) if current_frame is not None:
current_frame = current_frame.f_back
else:
# If there aren't enough frames, return what we have as "unknown".
return ["unknown"] * (end - start + 1)
# Collect frame info until the `end` position.
for _ in range(end - start + 1):
if current_frame is not None:
frame_info = inspect.getframeinfo(current_frame)
try: try:
filepath = Path(frame_info.filename).resolve() filepath = Path(frame_info.filename).resolve()
@@ -76,7 +94,13 @@ def get_caller() -> str:
filepath = Path(frame_info.filename) filepath = Path(frame_info.filename)
ret = f"{filepath}:{frame_info.lineno}::{frame_info.function}" ret = f"{filepath}:{frame_info.lineno}::{frame_info.function}"
return ret callers.append(ret)
current_frame = current_frame.f_back
else:
# If there are no more frames but we haven't reached `end`, append "unknown".
callers.append("unknown")
return callers
def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> None: def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> None:
@@ -89,7 +113,8 @@ def setup_logging(level: Any, root_log_name: str = __name__.split(".")[0]) -> No
# Create and add your custom handler # Create and add your custom handler
default_handler.setLevel(level) default_handler.setLevel(level)
default_handler.setFormatter(CustomFormatter(str(level) == str(logging.DEBUG))) trace_depth = bool(int(os.environ.get("TRACE_DEPTH", "0")))
default_handler.setFormatter(CustomFormatter(trace_depth))
main_logger.addHandler(default_handler) main_logger.addHandler(default_handler)
# Set logging level for other modules used by this module # Set logging level for other modules used by this module

View File

@@ -1,18 +1,30 @@
import argparse import argparse
import logging import logging
import os
import shlex import shlex
from clan_cli import create_parser from clan_cli import create_parser
from clan_cli.custom_logger import get_caller from clan_cli.custom_logger import get_callers
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
def run(args: list[str]) -> argparse.Namespace: def run(args: list[str]) -> argparse.Namespace:
parser = create_parser(prog="clan") parser = create_parser(prog="clan")
parsed = parser.parse_args(args) parsed = parser.parse_args(args)
cmd = shlex.join(["clan", *args]) cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}")
print_trace(f"$ {cmd}")
if hasattr(parsed, "func"): if hasattr(parsed, "func"):
parsed.func(parsed) parsed.func(parsed)
return parsed return parsed

View File

@@ -1,14 +1,25 @@
import logging import logging
import os
import shlex import shlex
from clan_cli.custom_logger import get_caller from clan_cli.custom_logger import get_callers
from clan_vm_manager import main from clan_vm_manager import main
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
class Cli: class Cli:
def run(self, args: list[str]) -> None: def run(self, args: list[str]) -> None:
cmd = shlex.join(["clan", *args]) cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd} \nCaller: {get_caller()}") print_trace(f"$ {cmd}")
main(args) main(args)