diff --git a/pkgs/clan-cli/clan_cli/cmd.py b/pkgs/clan-cli/clan_cli/cmd.py index e489a89e0..b896b6e06 100644 --- a/pkgs/clan-cli/clan_cli/cmd.py +++ b/pkgs/clan-cli/clan_cli/cmd.py @@ -63,7 +63,7 @@ def handle_output(process: subprocess.Popen, log: Log) -> tuple[str, str]: class TimeTable: """ This class is used to store the time taken by each command - and print it at the end of the program if env PERF=1 is set. + and print it at the end of the program if env CLAN_CLI_PERF=1 is set. """ def __init__(self) -> None: @@ -144,7 +144,8 @@ def run( stdout=stdout_buf, stderr=stderr_buf, cwd=cwd, - command=shlex.join(cmd), + env=env, + command_list=cmd, returncode=process.returncode, msg=error_msg, ) diff --git a/pkgs/clan-cli/clan_cli/errors.py b/pkgs/clan-cli/clan_cli/errors.py index 383e5db11..13e9e7370 100644 --- a/pkgs/clan-cli/clan_cli/errors.py +++ b/pkgs/clan-cli/clan_cli/errors.py @@ -1,42 +1,128 @@ +import contextlib +import json +import os +import shlex import shutil from dataclasses import dataclass from math import floor from pathlib import Path +from typing import cast -def get_term_filler(name: str) -> int: - width, height = shutil.get_terminal_size() +def get_term_filler(name: str) -> tuple[int, int]: + width, _ = shutil.get_terminal_size() filler = floor((width - len(name)) / 2) - return filler - 1 + return (filler - 1, width) def text_heading(heading: str) -> str: - filler = get_term_filler(heading) - return f"{'=' * filler} {heading} {'=' * filler}" + filler, total = get_term_filler(heading) + msg = f"{'=' * filler} {heading} {'=' * filler}" + if len(msg) < total: + msg += "=" + return msg + + +def optional_text(heading: str, text: str | None) -> str: + if text is None or text.strip() == "": + return "" + + with contextlib.suppress(json.JSONDecodeError): + text = json.dumps(json.loads(text), indent=4) + + return f"{text_heading(heading)}\n{text}" + + +@dataclass +class DictDiff: + added: dict[str, str] + removed: dict[str, str] + changed: dict[str, dict[str, str]] + + +def diff_dicts(dict1: dict[str, str], dict2: dict[str, str]) -> DictDiff: + """ + Compare two dictionaries and report additions, deletions, and changes. + + :param dict1: The first dictionary (baseline). + :param dict2: The second dictionary (to compare). + :return: A dictionary with keys 'added', 'removed', and 'changed', each containing + the respective differences. 'changed' is a nested dictionary with keys 'old' and 'new'. + """ + added = {k: dict2[k] for k in dict2 if k not in dict1} + removed = {k: dict1[k] for k in dict1 if k not in dict2} + changed = { + k: {"old": dict1[k], "new": dict2[k]} + for k in dict1 + if k in dict2 and dict1[k] != dict2[k] + } + + return DictDiff(added=added, removed=removed, changed=changed) + + +def indent_command(command_list: list[str]) -> str: + formatted_command = [] + i = 0 + while i < len(command_list): + arg = command_list[i] + formatted_command.append(shlex.quote(arg)) + + if i < len(command_list) - 1: + # Check if the current argument is an option + if arg.startswith("-"): + # Indent after the next argument + formatted_command.append(" ") + i += 1 + formatted_command.append(shlex.quote(command_list[i])) + + if i < len(command_list) - 1: + # Add line continuation only if it's not the last argument + formatted_command.append(" \\\n ") + + i += 1 + + # Join the list into a single string + final_command = "".join(formatted_command) + + # Remove the trailing continuation if it exists + if final_command.endswith(" \\ \n "): + final_command = final_command.rsplit(" \\ \n ", 1)[0] + return final_command @dataclass class CmdOut: stdout: str stderr: str + env: dict[str, str] | None cwd: Path - command: str + command_list: list[str] returncode: int msg: str | None + @property + def command(self) -> str: + return indent_command(self.command_list) + def __str__(self) -> str: + # Set a common indentation level, assuming a reasonable spacing + label_width = max(len("Return Code"), len("Work Dir"), len("Error Msg")) + diffed_dict = ( + diff_dicts(cast(dict[str, str], os.environ), self.env) if self.env else None + ) + diffed_dict_str = ( + json.dumps(diffed_dict.__dict__, indent=4) if diffed_dict else None + ) error_str = f""" -{text_heading(heading="Command")} -{self.command} -{text_heading(heading="Stderr")} -{self.stderr} -{text_heading(heading="Stdout")} -{self.stdout} +{optional_text("Stdout", self.stdout)} +{optional_text("Environment", diffed_dict_str)} +{optional_text("Stderr", self.stderr)} +{optional_text("Command", self.command)} {text_heading(heading="Metadata")} -Message: {self.msg} -Working Directory: '{self.cwd}' -Return Code: {self.returncode} +{'Return Code:':<{label_width}} {self.returncode} +{'Work Dir:':<{label_width}} '{self.cwd}' +{'Error Msg:':<{label_width}} {self.msg.capitalize() if self.msg else ""} """ return error_str