Added ipdb as breakpoint console. Improved logging.

This commit is contained in:
Qubasa
2023-10-18 18:29:10 +02:00
parent af3f04736b
commit 9f464dd14e
10 changed files with 58 additions and 26 deletions

View File

@@ -1,6 +1,7 @@
import logging
from typing import Any, Callable
from pathlib import Path
import inspect
grey = "\x1b[38;20m"
yellow = "\x1b[33;20m"
@@ -10,10 +11,16 @@ green = "\u001b[32m"
blue = "\u001b[34m"
def get_formatter(color: str) -> Callable[[logging.LogRecord], logging.Formatter]:
def myformatter(record: logging.LogRecord) -> logging.Formatter:
def get_formatter(color: str) -> Callable[[logging.LogRecord, bool], logging.Formatter]:
def myformatter(record: logging.LogRecord, with_location: bool) -> logging.Formatter:
reset = "\x1b[0m"
filepath = Path(record.pathname).resolve()
if not with_location:
return logging.Formatter(
f"{color}%(levelname)s{reset}: %(message)s"
)
return logging.Formatter(
f"{color}%(levelname)s{reset}: %(message)s\n {filepath}:%(lineno)d::%(funcName)s\n"
)
@@ -31,11 +38,32 @@ FORMATTER = {
class CustomFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
return FORMATTER[record.levelno](record).format(record)
return FORMATTER[record.levelno](record, True).format(record)
class ThreadFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
return FORMATTER[record.levelno](record, False).format(record)
def get_caller() -> str:
frame = inspect.currentframe()
if frame is None:
return "unknown"
caller_frame = frame.f_back
if caller_frame is None:
return "unknown"
caller_frame = caller_frame.f_back
if caller_frame is None:
return "unknown"
frame_info = inspect.getframeinfo(caller_frame)
ret = f"{frame_info.filename}:{frame_info.lineno}::{frame_info.function}"
return ret
def register(level: Any) -> None:
ch = logging.StreamHandler()
ch.setLevel(level)
ch.setFormatter(CustomFormatter())
logging.basicConfig(level=level, handlers=[ch])
handler = logging.StreamHandler()
handler.setLevel(level)
handler.setFormatter(CustomFormatter())
logger = logging.getLogger("registerHandler")
logger.addHandler(handler)
#logging.basicConfig(level=level, handlers=[handler])

View File

@@ -95,6 +95,7 @@ def generate_secrets_from_nix(
) -> None:
generate_host_key(flake_name, machine_name)
errors = {}
with TemporaryDirectory() as d:
# if any of the secrets are missing, we regenerate all connected facts/secrets
for secret_group, secret_options in secret_submodules.items():

View File

@@ -12,6 +12,7 @@ from pathlib import Path
from typing import Any, Iterator, Optional, Type, TypeVar
from uuid import UUID, uuid4
from .custom_logger import get_caller, ThreadFormatter, CustomFormatter
from .errors import ClanError
@@ -38,7 +39,8 @@ class Command:
cwd: Optional[Path] = None,
) -> None:
self.running = True
self.log.debug(f"Running command: {shlex.join(cmd)}")
self.log.debug(f"Command: {shlex.join(cmd)}")
self.log.debug(f"Caller: {get_caller()}")
cwd_res = None
if cwd is not None:
@@ -94,7 +96,13 @@ class BaseTask:
def __init__(self, uuid: UUID, num_cmds: int) -> None:
# constructor
self.uuid: UUID = uuid
self.log = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(ThreadFormatter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)
self.log = logger
self.log = logger
self.procs: list[Command] = []
self.status = TaskStatus.NOTSTARTED
self.logs_lock = threading.Lock()

View File

@@ -13,6 +13,7 @@ from ..dirs import specific_flake_dir
from ..nix import nix_build, nix_config, nix_shell
from ..task_manager import BaseTask, Command, create_task
from .inspect import VmConfig, inspect_vm
import pydantic
class BuildVmTask(BaseTask):
@@ -58,6 +59,7 @@ class BuildVmTask(BaseTask):
env = os.environ.copy()
env["CLAN_DIR"] = str(self.vm.flake_url)
env["PYTHONPATH"] = str(
":".join(sys.path)
) # TODO do this in the clanCore module
@@ -70,7 +72,7 @@ class BuildVmTask(BaseTask):
env=env,
)
else:
cmd.run(["echo", "won't generate secrets for non local clan"])
self.log.warning("won't generate secrets for non local clan")
cmd = next(cmds)
cmd.run(

View File

@@ -12,6 +12,7 @@ log = logging.getLogger(__name__)
def validate_path(base_dir: Path, value: Path) -> Path:
user_path = (base_dir / value).resolve()
# Check if the path is within the data directory
if not str(user_path).startswith(str(base_dir)):
if not str(user_path).startswith("/tmp/pytest"):

View File

@@ -11,6 +11,8 @@
, pytest-xdist
, pytest-subprocess
, pytest-timeout
, remote-pdb
, ipdb
, python3
, runCommand
, setuptools
@@ -47,6 +49,8 @@ let
pytest-subprocess
pytest-xdist
pytest-timeout
remote-pdb
ipdb
openssh
git
gnupg

View File

@@ -20,7 +20,7 @@ clan_cli = [ "config/jsonschema/*", "webui/assets/**/*"]
testpaths = "tests"
faulthandler_timeout = 60
log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"
log_format = "%(levelname)s: %(message)s"
addopts = "--cov . --cov-report term --cov-report html:.reports/html --no-cov-on-fail --durations 5 --color=yes --maxfail=1 --new-first -nauto" # Add --pdb for debugging
norecursedirs = "tests/helpers"
markers = [ "impure" ]

View File

@@ -43,6 +43,7 @@ mkShell {
export PATH="$tmp_path/python/bin:${checkScript}/bin:$PATH"
export PYTHONPATH="$source:$tmp_path/python/${pythonWithDeps.sitePackages}:"
export PYTHONBREAKPOINT=ipdb.set_trace
export XDG_DATA_DIRS="$tmp_path/share''${XDG_DATA_DIRS:+:$XDG_DATA_DIRS}"
export fish_complete_path="$tmp_path/share/fish/vendor_completions.d''${fish_complete_path:+:$fish_complete_path}"

View File

@@ -4,24 +4,11 @@ import logging
import shlex
from clan_cli import create_parser
from clan_cli.custom_logger import get_caller
log = logging.getLogger(__name__)
def get_caller() -> str:
frame = inspect.currentframe()
if frame is None:
return "unknown"
caller_frame = frame.f_back
if caller_frame is None:
return "unknown"
caller_frame = caller_frame.f_back
if caller_frame is None:
return "unknown"
frame_info = inspect.getframeinfo(caller_frame)
ret = f"{frame_info.filename}:{frame_info.lineno}::{frame_info.function}"
return ret
class Cli:
def __init__(self) -> None:

View File

@@ -89,7 +89,7 @@ def generic_create_vm_test(api: TestClient, flake: Path, vm: str) -> None:
assert (
data["status"] == "FINISHED"
), f"Expected to be finished, but got {data['status']} ({data})"
@pytest.mark.skipif(not os.path.exists("/dev/kvm"), reason="Requires KVM")
@pytest.mark.impure