Reworked machines list, and history commands
This commit is contained in:
@@ -40,6 +40,7 @@ class ClanParameters:
|
|||||||
class ClanURI:
|
class ClanURI:
|
||||||
# Initialize the class with a clan:// URI
|
# Initialize the class with a clan:// URI
|
||||||
def __init__(self, uri: str) -> None:
|
def __init__(self, uri: str) -> None:
|
||||||
|
|
||||||
# Check if the URI starts with clan://
|
# Check if the URI starts with clan://
|
||||||
if uri.startswith("clan://"):
|
if uri.startswith("clan://"):
|
||||||
self._nested_uri = uri[7:]
|
self._nested_uri = uri[7:]
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ class FlakeConfig:
|
|||||||
flake_url: str | Path
|
flake_url: str | Path
|
||||||
flake_attr: str
|
flake_attr: str
|
||||||
|
|
||||||
|
nar_hash: str
|
||||||
icon: str | None
|
icon: str | None
|
||||||
description: str | None
|
description: str | None
|
||||||
last_updated: str
|
last_updated: str
|
||||||
@@ -29,7 +30,7 @@ def inspect_flake(flake_url: str | Path, flake_attr: str) -> FlakeConfig:
|
|||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
proc = subprocess.run(cmd, check=True, text=True, stdout=subprocess.PIPE)
|
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||||
assert proc.stdout is not None
|
assert proc.stdout is not None
|
||||||
if proc.returncode != 0:
|
if proc.returncode != 0:
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
@@ -38,6 +39,8 @@ command: {shlex.join(cmd)}
|
|||||||
exit code: {proc.returncode}
|
exit code: {proc.returncode}
|
||||||
stdout:
|
stdout:
|
||||||
{proc.stdout}
|
{proc.stdout}
|
||||||
|
stderr:
|
||||||
|
{proc.stderr}
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
res = proc.stdout.strip()
|
res = proc.stdout.strip()
|
||||||
@@ -51,6 +54,7 @@ stdout:
|
|||||||
return FlakeConfig(
|
return FlakeConfig(
|
||||||
flake_url=flake_url,
|
flake_url=flake_url,
|
||||||
flake_attr=flake_attr,
|
flake_attr=flake_attr,
|
||||||
|
nar_hash=meta["locked"]["narHash"],
|
||||||
icon=icon_path,
|
icon=icon_path,
|
||||||
description=meta.get("description"),
|
description=meta.get("description"),
|
||||||
last_updated=meta["lastModified"],
|
last_updated=meta["lastModified"],
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ import argparse
|
|||||||
import dataclasses
|
import dataclasses
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
import os
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -22,11 +21,13 @@ class EnhancedJSONEncoder(json.JSONEncoder):
|
|||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class HistoryEntry:
|
class HistoryEntry:
|
||||||
path: str
|
|
||||||
last_used: str
|
last_used: str
|
||||||
dir_datetime: str
|
|
||||||
flake: FlakeConfig
|
flake: FlakeConfig
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
if isinstance(self.flake, dict):
|
||||||
|
self.flake = FlakeConfig(**self.flake)
|
||||||
|
|
||||||
|
|
||||||
def list_history() -> list[HistoryEntry]:
|
def list_history() -> list[HistoryEntry]:
|
||||||
logs: list[HistoryEntry] = []
|
logs: list[HistoryEntry] = []
|
||||||
@@ -45,35 +46,26 @@ def list_history() -> list[HistoryEntry]:
|
|||||||
return logs
|
return logs
|
||||||
|
|
||||||
|
|
||||||
def get_dir_time(path: Path) -> str:
|
|
||||||
# Get the last modified dir time in seconds
|
|
||||||
dir_mtime = os.path.getmtime(path)
|
|
||||||
dir_datetime = datetime.datetime.fromtimestamp(dir_mtime).isoformat()
|
|
||||||
return dir_datetime
|
|
||||||
|
|
||||||
|
|
||||||
def add_history(path: Path) -> list[HistoryEntry]:
|
def add_history(path: Path) -> list[HistoryEntry]:
|
||||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||||
logs = list_history()
|
logs = list_history()
|
||||||
found = False
|
found = False
|
||||||
|
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
if entry.path == str(path):
|
if entry.flake.flake_url == str(path):
|
||||||
found = True
|
found = True
|
||||||
entry.last_used = datetime.datetime.now().isoformat()
|
entry.last_used = datetime.datetime.now().isoformat()
|
||||||
|
|
||||||
|
if found:
|
||||||
|
break
|
||||||
|
|
||||||
flake = inspect_flake(path, "defaultVM")
|
flake = inspect_flake(path, "defaultVM")
|
||||||
flake.flake_url = str(flake.flake_url)
|
flake.flake_url = str(flake.flake_url)
|
||||||
dir_datetime = get_dir_time(path)
|
|
||||||
|
|
||||||
history = HistoryEntry(
|
history = HistoryEntry(
|
||||||
flake=flake,
|
flake=flake,
|
||||||
dir_datetime=dir_datetime,
|
|
||||||
path=str(path),
|
|
||||||
last_used=datetime.datetime.now().isoformat(),
|
last_used=datetime.datetime.now().isoformat(),
|
||||||
)
|
)
|
||||||
if not found:
|
logs.append(history)
|
||||||
logs.append(history)
|
|
||||||
|
|
||||||
with locked_open(user_history_file(), "w+") as f:
|
with locked_open(user_history_file(), "w+") as f:
|
||||||
f.write(json.dumps(logs, cls=EnhancedJSONEncoder, indent=4))
|
f.write(json.dumps(logs, cls=EnhancedJSONEncoder, indent=4))
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from .add import list_history
|
|||||||
|
|
||||||
def list_history_command(args: argparse.Namespace) -> None:
|
def list_history_command(args: argparse.Namespace) -> None:
|
||||||
for history_entry in list_history():
|
for history_entry in list_history():
|
||||||
print(history_entry.path)
|
print(history_entry.flake.flake_url)
|
||||||
|
|
||||||
|
|
||||||
# takes a (sub)parser and configures it
|
# takes a (sub)parser and configures it
|
||||||
|
|||||||
@@ -3,11 +3,11 @@ import argparse
|
|||||||
import copy
|
import copy
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from ..dirs import user_history_file
|
from ..dirs import user_history_file
|
||||||
from ..locked_open import locked_open
|
from ..locked_open import locked_open
|
||||||
from .add import EnhancedJSONEncoder, HistoryEntry, get_dir_time, list_history
|
from ..nix import nix_metadata
|
||||||
|
from .add import EnhancedJSONEncoder, HistoryEntry, list_history
|
||||||
|
|
||||||
|
|
||||||
def update_history() -> list[HistoryEntry]:
|
def update_history() -> list[HistoryEntry]:
|
||||||
@@ -16,11 +16,17 @@ def update_history() -> list[HistoryEntry]:
|
|||||||
new_logs = []
|
new_logs = []
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
new_entry = copy.deepcopy(entry)
|
new_entry = copy.deepcopy(entry)
|
||||||
new_time = get_dir_time(Path(entry.path))
|
|
||||||
if new_time != entry.dir_datetime:
|
meta = nix_metadata(entry.flake.flake_url)
|
||||||
print(f"Updating {entry.path} from {entry.dir_datetime} to {new_time}")
|
new_hash = meta["locked"]["narHash"]
|
||||||
new_entry.dir_datetime = new_time
|
if new_hash != entry.flake.nar_hash:
|
||||||
|
print(
|
||||||
|
f"Updating {entry.flake.flake_url} from {entry.flake.nar_hash} to {new_hash}"
|
||||||
|
)
|
||||||
new_entry.last_used = datetime.datetime.now().isoformat()
|
new_entry.last_used = datetime.datetime.now().isoformat()
|
||||||
|
new_entry.flake.nar_hash = new_hash
|
||||||
|
|
||||||
|
# TODO: Delete stale entries
|
||||||
new_logs.append(new_entry)
|
new_logs.append(new_entry)
|
||||||
|
|
||||||
with locked_open(user_history_file(), "w+") as f:
|
with locked_open(user_history_file(), "w+") as f:
|
||||||
|
|||||||
@@ -1,24 +1,42 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import shlex
|
||||||
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from ..dirs import machines_dir
|
from ..errors import ClanError
|
||||||
from .types import validate_hostname
|
from ..nix import nix_config, nix_eval
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def list_machines(flake_dir: Path) -> list[str]:
|
def list_machines(flake_dir: Path) -> list[str]:
|
||||||
path = machines_dir(flake_dir)
|
config = nix_config()
|
||||||
log.debug(f"Listing machines in {path}")
|
system = config["system"]
|
||||||
if not path.exists():
|
cmd = nix_eval(
|
||||||
return []
|
[
|
||||||
objs: list[str] = []
|
f"{flake_dir}#clanInternals.machines.{system}",
|
||||||
for f in os.listdir(path):
|
"--apply",
|
||||||
if validate_hostname(f):
|
"builtins.attrNames",
|
||||||
objs.append(f)
|
"--json",
|
||||||
return objs
|
]
|
||||||
|
)
|
||||||
|
proc = subprocess.run(cmd, text=True, capture_output=True)
|
||||||
|
assert proc.stdout is not None
|
||||||
|
if proc.returncode != 0:
|
||||||
|
raise ClanError(
|
||||||
|
f"""
|
||||||
|
command: {shlex.join(cmd)}
|
||||||
|
exit code: {proc.returncode}
|
||||||
|
stdout:
|
||||||
|
{proc.stdout}
|
||||||
|
stderr:
|
||||||
|
{proc.stderr}
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
res = proc.stdout.strip()
|
||||||
|
return json.loads(res)
|
||||||
|
|
||||||
|
|
||||||
def list_command(args: argparse.Namespace) -> None:
|
def list_command(args: argparse.Namespace) -> None:
|
||||||
|
|||||||
@@ -21,13 +21,13 @@ def test_history_add(
|
|||||||
"add",
|
"add",
|
||||||
str(test_flake.path),
|
str(test_flake.path),
|
||||||
]
|
]
|
||||||
|
breakpoint()
|
||||||
cli.run(cmd)
|
cli.run(cmd)
|
||||||
|
|
||||||
history_file = user_history_file()
|
history_file = user_history_file()
|
||||||
assert history_file.exists()
|
assert history_file.exists()
|
||||||
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
history = [HistoryEntry(**entry) for entry in json.loads(open(history_file).read())]
|
||||||
assert history[0].path == str(test_flake.path)
|
assert history[0].flake.flake_url == str(test_flake.path)
|
||||||
|
|
||||||
|
|
||||||
def test_history_list(
|
def test_history_list(
|
||||||
|
|||||||
@@ -11,6 +11,22 @@
|
|||||||
}
|
}
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
"python.linting.mypyEnabled": true
|
"python.linting.mypyEnabled": true,
|
||||||
|
"files.exclude": {
|
||||||
|
"**/.direnv": true,
|
||||||
|
"**/.mypy_cache": true,
|
||||||
|
"**/.ruff_cache": true,
|
||||||
|
"**/.hypothesis": true,
|
||||||
|
"**/__pycache__": true,
|
||||||
|
"**/.reports": true
|
||||||
|
},
|
||||||
|
"search.exclude": {
|
||||||
|
"**/.direnv": true,
|
||||||
|
"**/.mypy_cache": true,
|
||||||
|
"**/.ruff_cache": true,
|
||||||
|
"**/.hypothesis": true,
|
||||||
|
"**/__pycache__": true,
|
||||||
|
"**/.reports": true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user