Merge pull request 'CLI: History supports multiple attrs from the same url now. Errors when executing the cli are formatted better' (#735) from Qubasa-main into main
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import traceback
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
@@ -9,6 +10,7 @@ from typing import Any
|
|||||||
from . import backups, config, flakes, history, machines, secrets, vms
|
from . import backups, config, flakes, history, machines, secrets, vms
|
||||||
from .custom_logger import setup_logging
|
from .custom_logger import setup_logging
|
||||||
from .dirs import get_clan_flake_toplevel
|
from .dirs import get_clan_flake_toplevel
|
||||||
|
from .errors import ClanCmdError, ClanError
|
||||||
from .ssh import cli as ssh_cli
|
from .ssh import cli as ssh_cli
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -130,7 +132,18 @@ def main() -> None:
|
|||||||
if not hasattr(args, "func"):
|
if not hasattr(args, "func"):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
args.func(args)
|
args.func(args)
|
||||||
|
except ClanError as e:
|
||||||
|
if args.debug:
|
||||||
|
traceback.print_exc()
|
||||||
|
sys.exit(1)
|
||||||
|
if isinstance(e, ClanCmdError):
|
||||||
|
if e.cmd.msg:
|
||||||
|
print(e.cmd.msg, file=sys.stderr)
|
||||||
|
else:
|
||||||
|
print(e, file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -1,7 +1,21 @@
|
|||||||
|
import os
|
||||||
|
from math import floor
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import NamedTuple
|
from typing import NamedTuple
|
||||||
|
|
||||||
|
|
||||||
|
def get_term_filler(name: str) -> int:
|
||||||
|
width, height = os.get_terminal_size()
|
||||||
|
|
||||||
|
filler = floor((width - len(name)) / 2)
|
||||||
|
return filler - 1
|
||||||
|
|
||||||
|
|
||||||
|
def text_heading(heading: str) -> str:
|
||||||
|
filler = get_term_filler(heading)
|
||||||
|
return f"{'=' * filler} {heading} {'=' * filler}"
|
||||||
|
|
||||||
|
|
||||||
class CmdOut(NamedTuple):
|
class CmdOut(NamedTuple):
|
||||||
stdout: str
|
stdout: str
|
||||||
stderr: str
|
stderr: str
|
||||||
@@ -12,15 +26,16 @@ class CmdOut(NamedTuple):
|
|||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return f"""
|
return f"""
|
||||||
|
{text_heading(heading="Command")}
|
||||||
|
{self.command}
|
||||||
|
{text_heading(heading="Stderr")}
|
||||||
|
{self.stderr}
|
||||||
|
{text_heading(heading="Stdout")}
|
||||||
|
{self.stdout}
|
||||||
|
{text_heading(heading="Metadata")}
|
||||||
Message: {self.msg}
|
Message: {self.msg}
|
||||||
Working Directory: '{self.cwd}'
|
Working Directory: '{self.cwd}'
|
||||||
Return Code: {self.returncode}
|
Return Code: {self.returncode}
|
||||||
=================== Command ===================
|
|
||||||
{self.command}
|
|
||||||
=================== STDERR ===================
|
|
||||||
{self.stderr}
|
|
||||||
=================== STDOUT ===================
|
|
||||||
{self.stdout}
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -63,26 +63,32 @@ def list_history() -> list[HistoryEntry]:
|
|||||||
return logs
|
return logs
|
||||||
|
|
||||||
|
|
||||||
# TODO: Add all vm entries to history
|
def new_history_entry(uri: ClanURI) -> HistoryEntry:
|
||||||
|
flake = inspect_flake(uri.get_internal(), uri.params.flake_attr)
|
||||||
|
flake.flake_url = str(flake.flake_url)
|
||||||
|
return HistoryEntry(
|
||||||
|
flake=flake,
|
||||||
|
last_used=datetime.datetime.now().isoformat(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
def add_history(uri: ClanURI) -> list[HistoryEntry]:
|
||||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||||
logs = list_history()
|
logs = list_history()
|
||||||
found = False
|
found = False
|
||||||
path = uri.get_internal()
|
uri_path = uri.get_internal()
|
||||||
machine = uri.params.flake_attr
|
uri_machine = uri.params.flake_attr
|
||||||
|
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
if entry.flake.flake_url == str(path):
|
if (
|
||||||
|
entry.flake.flake_url == str(uri_path)
|
||||||
|
and entry.flake.flake_attr == uri_machine
|
||||||
|
):
|
||||||
found = True
|
found = True
|
||||||
entry.last_used = datetime.datetime.now().isoformat()
|
entry.last_used = datetime.datetime.now().isoformat()
|
||||||
|
|
||||||
if not found:
|
if not found:
|
||||||
flake = inspect_flake(path, machine)
|
history = new_history_entry(uri)
|
||||||
flake.flake_url = str(flake.flake_url)
|
|
||||||
history = HistoryEntry(
|
|
||||||
flake=flake,
|
|
||||||
last_used=datetime.datetime.now().isoformat(),
|
|
||||||
)
|
|
||||||
logs.append(history)
|
logs.append(history)
|
||||||
|
|
||||||
write_history_file(logs)
|
write_history_file(logs)
|
||||||
|
|||||||
@@ -1,34 +1,36 @@
|
|||||||
# !/usr/bin/env python3
|
# !/usr/bin/env python3
|
||||||
import argparse
|
import argparse
|
||||||
import copy
|
|
||||||
import datetime
|
|
||||||
|
|
||||||
|
from ..clan_uri import ClanParameters, ClanURI
|
||||||
|
from ..errors import ClanCmdError
|
||||||
from ..locked_open import write_history_file
|
from ..locked_open import write_history_file
|
||||||
from ..nix import nix_metadata
|
from ..nix import nix_metadata
|
||||||
from .add import HistoryEntry, list_history
|
from .add import HistoryEntry, list_history, new_history_entry
|
||||||
|
|
||||||
|
|
||||||
def update_history() -> list[HistoryEntry]:
|
def update_history() -> list[HistoryEntry]:
|
||||||
logs = list_history()
|
logs = list_history()
|
||||||
|
|
||||||
new_logs = []
|
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
new_entry = copy.deepcopy(entry)
|
try:
|
||||||
|
|
||||||
meta = nix_metadata(entry.flake.flake_url)
|
meta = nix_metadata(entry.flake.flake_url)
|
||||||
|
except ClanCmdError as e:
|
||||||
|
print(f"Failed to update {entry.flake.flake_url}: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
new_hash = meta["locked"]["narHash"]
|
new_hash = meta["locked"]["narHash"]
|
||||||
if new_hash != entry.flake.nar_hash:
|
if new_hash != entry.flake.nar_hash:
|
||||||
print(
|
print(
|
||||||
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
||||||
)
|
)
|
||||||
new_entry.last_used = datetime.datetime.now().isoformat()
|
uri = ClanURI.from_str(
|
||||||
new_entry.flake.nar_hash = new_hash
|
url=str(entry.flake.flake_url),
|
||||||
|
params=ClanParameters(entry.flake.flake_attr),
|
||||||
|
)
|
||||||
|
entry = new_history_entry(uri)
|
||||||
|
|
||||||
# TODO: Delete stale entries
|
write_history_file(logs)
|
||||||
new_logs.append(new_entry)
|
return logs
|
||||||
|
|
||||||
write_history_file(new_logs)
|
|
||||||
return new_logs
|
|
||||||
|
|
||||||
|
|
||||||
def add_update_command(args: argparse.Namespace) -> None:
|
def add_update_command(args: argparse.Namespace) -> None:
|
||||||
|
|||||||
@@ -25,7 +25,8 @@ def inspect_vm(flake_url: str | Path, flake_attr: str) -> VmConfig:
|
|||||||
|
|
||||||
cmd = nix_eval(
|
cmd = nix_eval(
|
||||||
[
|
[
|
||||||
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect'
|
f'{flake_url}#clanInternals.machines."{system}"."{flake_attr}".config.clanCore.vm.inspect',
|
||||||
|
"--refresh",
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user