HistoryEntry: add cleaner typesafe json deserialization
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import argparse
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.cmd import run
|
||||
@@ -31,11 +32,19 @@ class FlakeConfig:
|
||||
revision: str | None
|
||||
vm: VmConfig
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if isinstance(self.vm, dict):
|
||||
self.vm = VmConfig(**self.vm)
|
||||
if isinstance(self.flake_url, dict):
|
||||
self.flake_url = FlakeId(**self.flake_url)
|
||||
@classmethod
|
||||
def from_json(cls: type["FlakeConfig"], data: dict[str, Any]) -> "FlakeConfig":
|
||||
return cls(
|
||||
flake_url=FlakeId.from_json(data["flake_url"]),
|
||||
flake_attr=data["flake_attr"],
|
||||
clan_name=data["clan_name"],
|
||||
nar_hash=data["nar_hash"],
|
||||
icon=data.get("icon"),
|
||||
description=data.get("description"),
|
||||
last_updated=data["last_updated"],
|
||||
revision=data.get("revision"),
|
||||
vm=VmConfig.from_json(data["vm"]),
|
||||
)
|
||||
|
||||
|
||||
def run_cmd(cmd: list[str]) -> str:
|
||||
|
||||
@@ -3,16 +3,16 @@ import urllib.parse
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class FlakeId:
|
||||
loc: str
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert isinstance(
|
||||
self.loc, str
|
||||
), f"Flake {self.loc} has an invalid format: {type(self.loc)}"
|
||||
@classmethod
|
||||
def from_json(cls: type["FlakeId"], data: dict[str, Any]) -> "FlakeId":
|
||||
return cls(loc=data["loc"])
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(self.loc)
|
||||
|
||||
@@ -22,9 +22,13 @@ class HistoryEntry:
|
||||
flake: FlakeConfig
|
||||
settings: dict[str, Any] = dataclasses.field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if isinstance(self.flake, dict):
|
||||
self.flake = FlakeConfig(**self.flake)
|
||||
@classmethod
|
||||
def from_json(cls: type["HistoryEntry"], data: dict[str, Any]) -> "HistoryEntry":
|
||||
return cls(
|
||||
last_used=data["last_used"],
|
||||
flake=FlakeConfig.from_json(data["flake"]),
|
||||
settings=data.get("settings", {}),
|
||||
)
|
||||
|
||||
|
||||
def _merge_dicts(d1: dict, d2: dict) -> dict:
|
||||
@@ -52,7 +56,7 @@ def list_history() -> list[HistoryEntry]:
|
||||
for i, p in enumerate(parsed.copy()):
|
||||
# Everything from the settings dict is merged into the flake dict, and can override existing values
|
||||
parsed[i] = _merge_dicts(p, p.get("settings", {}))
|
||||
logs = [HistoryEntry(**p) for p in parsed]
|
||||
logs = [HistoryEntry.from_json(p) for p in parsed]
|
||||
except (json.JSONDecodeError, TypeError) as ex:
|
||||
msg = f"History file at {user_history_file()} is corrupted"
|
||||
raise ClanError(msg) from ex
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import argparse
|
||||
import dataclasses
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.completions import add_dynamic_completer, complete_machines
|
||||
@@ -13,6 +15,13 @@ class WaypipeConfig:
|
||||
enable: bool
|
||||
command: list[str]
|
||||
|
||||
@classmethod
|
||||
def from_json(cls: type["WaypipeConfig"], data: dict[str, Any]) -> "WaypipeConfig":
|
||||
return cls(
|
||||
enable=data["enable"],
|
||||
command=data["command"],
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VmConfig:
|
||||
@@ -28,20 +37,28 @@ class VmConfig:
|
||||
machine_description: str | None
|
||||
machine_icon: Path | None
|
||||
|
||||
waypipe: WaypipeConfig | None = None
|
||||
waypipe: WaypipeConfig
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if isinstance(self.flake_url, str):
|
||||
self.flake_url = FlakeId(self.flake_url)
|
||||
if isinstance(self.waypipe, dict):
|
||||
self.waypipe = WaypipeConfig(**self.waypipe)
|
||||
if isinstance(self.flake_url, dict):
|
||||
self.flake_url = FlakeId(**self.flake_url)
|
||||
@classmethod
|
||||
def from_json(cls: type["VmConfig"], data: dict[str, Any]) -> "VmConfig":
|
||||
return cls(
|
||||
machine_name=data["machine_name"],
|
||||
flake_url=FlakeId.from_json(data["flake_url"]),
|
||||
cores=data["cores"],
|
||||
memory_size=data["memory_size"],
|
||||
graphics=data["graphics"],
|
||||
clan_name=data["clan_name"],
|
||||
machine_description=data.get("machine_description"),
|
||||
machine_icon=data.get("machine_icon"),
|
||||
waypipe=WaypipeConfig.from_json(data["waypipe"]),
|
||||
)
|
||||
|
||||
|
||||
def inspect_vm(machine: Machine) -> VmConfig:
|
||||
data = json.loads(machine.eval_nix("config.clan.core.vm.inspect"))
|
||||
return VmConfig(flake_url=machine.flake, **data)
|
||||
# HACK!
|
||||
data["flake_url"] = dataclasses.asdict(machine.flake)
|
||||
return VmConfig.from_json(data)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -103,7 +103,7 @@ def qemu_command(
|
||||
f'regInfo={nixos_config["regInfo"]}/registration',
|
||||
"console=hvc0",
|
||||
]
|
||||
if not vm.waypipe:
|
||||
if not vm.waypipe.enable:
|
||||
kernel_cmdline.append("console=tty0")
|
||||
hostfwd = ",".join(f"hostfwd=tcp::{h}-:{g}" for h, g in portmap.items())
|
||||
# fmt: off
|
||||
|
||||
@@ -280,7 +280,7 @@ def spawn_vm(
|
||||
packages = ["nixpkgs#qemu"]
|
||||
|
||||
extra_env = {}
|
||||
if vm.graphics and not vm.waypipe:
|
||||
if vm.graphics and not vm.waypipe.enable:
|
||||
packages.append("nixpkgs#virt-viewer")
|
||||
remote_viewer_mimetypes = module_root() / "vms" / "mimetypes"
|
||||
extra_env["XDG_DATA_DIRS"] = (
|
||||
@@ -374,7 +374,7 @@ def run_command(
|
||||
vm: VmConfig = inspect_vm(machine=machine_obj)
|
||||
|
||||
if not os.environ.get("WAYLAND_DISPLAY"):
|
||||
vm.waypipe = False
|
||||
vm.waypipe.enable = False
|
||||
|
||||
portmap = dict(p.split(":") for p in args.publish)
|
||||
|
||||
|
||||
@@ -25,7 +25,9 @@ def test_history_add(
|
||||
|
||||
history_file = user_history_file()
|
||||
assert history_file.exists()
|
||||
history = [HistoryEntry(**entry) for entry in json.loads(history_file.read_text())]
|
||||
history = [
|
||||
HistoryEntry.from_json(entry) for entry in json.loads(history_file.read_text())
|
||||
]
|
||||
assert str(history[0].flake.flake_url) == str(test_flake_with_core.path)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user