clan_cli: history_add now returns newly added HistoryEntry. clan-vm-manager: Join now uses signals instead of callbacks.

Qubasa
2024-03-03 12:47:18 +07:00
parent bf214011cf
commit 127009b303
3 changed files with 84 additions and 89 deletions
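The clan-vm-manager half of this commit (Join emitting a signal instead of taking a callback) lives in the other changed files and is not shown in the diff below. As a rough sketch of that pattern in PyGObject, with class, signal and method names that are purely illustrative and not the actual clan-vm-manager code:

from gi.repository import GObject


class JoinValue(GObject.Object):
    # A custom signal that carries the joined entry as its only argument.
    __gsignals__ = {
        "join-finished": (GObject.SignalFlags.RUN_FIRST, None, (object,)),
    }

    def join(self, entry: object) -> None:
        # ... perform the join, then notify every connected listener instead of
        # invoking a single callback that was handed in at construction time
        self.emit("join-finished", entry)


def on_join_finished(_source: JoinValue, entry: object) -> None:
    print("joined:", entry)


value = JoinValue()
value.connect("join-finished", on_join_finished)
value.join({"flake_attr": "my-machine"})  # hypothetical payload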


@@ -35,14 +35,14 @@ class HistoryEntry:
         self.flake = FlakeConfig(**self.flake)


-def merge_dicts(d1: dict, d2: dict) -> dict:
+def _merge_dicts(d1: dict, d2: dict) -> dict:
     # create a new dictionary that copies d1
     merged = dict(d1)
     # iterate over the keys and values of d2
     for key, value in d2.items():
         # if the key is in d1 and both values are dictionaries, merge them recursively
         if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict):
-            merged[key] = merge_dicts(d1[key], value)
+            merged[key] = _merge_dicts(d1[key], value)
         # otherwise, update the value of the key in the merged dictionary
         else:
             merged[key] = value
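The renamed _merge_dicts keeps the old behaviour: nested dictionaries are merged recursively and, for plain values, the second argument wins. A minimal sketch using the helper defined above (keys and URLs are made up):

base = {"flake": {"flake_url": "git+https://example.com/a", "flake_attr": "vm1"}}
override = {"flake": {"flake_attr": "vm2"}}

# the nested "flake" dicts are merged key by key; flake_attr from the second
# dict overrides, while flake_url from the first dict is preserved
merged = _merge_dicts(base, override)
assert merged == {"flake": {"flake_url": "git+https://example.com/a", "flake_attr": "vm2"}}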
@@ -59,7 +59,7 @@ def list_history() -> list[HistoryEntry]:
         parsed = read_history_file()
         for i, p in enumerate(parsed.copy()):
             # Everything from the settings dict is merged into the flake dict, and can override existing values
-            parsed[i] = merge_dicts(p, p.get("settings", {}))
+            parsed[i] = _merge_dicts(p, p.get("settings", {}))
         logs = [HistoryEntry(**p) for p in parsed]
     except (json.JSONDecodeError, TypeError) as ex:
         raise ClanError(f"History file at {user_history_file()} is corrupted") from ex
@@ -76,40 +76,47 @@ def new_history_entry(url: str, machine: str) -> HistoryEntry:
     )


-def add_history(uri: ClanURI, *, all_machines: bool) -> list[HistoryEntry]:
+def add_all_to_history(uri: ClanURI) -> list[HistoryEntry]:
+    history = list_history()
+    new_entries: list[HistoryEntry] = []
+    for machine in list_machines(uri.get_internal()):
+        new_entry = _add_maschine_to_history_list(uri.get_internal(), machine, history)
+        new_entries.append(new_entry)
+    write_history_file(history)
+    return new_entries
+
+
+def add_history(uri: ClanURI) -> HistoryEntry:
     user_history_file().parent.mkdir(parents=True, exist_ok=True)
     history = list_history()

-    if not all_machines:
-        add_maschine_to_history(uri.get_internal(), uri.params.flake_attr, history)
-
-    if all_machines:
-        for machine in list_machines(uri.get_internal()):
-            add_maschine_to_history(uri.get_internal(), machine, history)
-
+    new_entry = _add_maschine_to_history_list(
+        uri.get_internal(), uri.params.flake_attr, history
+    )
     write_history_file(history)
-    return history
+    return new_entry


-def add_maschine_to_history(
-    uri_path: str, uri_machine: str, logs: list[HistoryEntry]
-) -> None:
-    found = False
-    for entry in logs:
+def _add_maschine_to_history_list(
+    uri_path: str, uri_machine: str, entries: list[HistoryEntry]
+) -> HistoryEntry:
+    for new_entry in entries:
         if (
-            entry.flake.flake_url == str(uri_path)
-            and entry.flake.flake_attr == uri_machine
+            new_entry.flake.flake_url == str(uri_path)
+            and new_entry.flake.flake_attr == uri_machine
         ):
-            found = True
-            entry.last_used = datetime.datetime.now().isoformat()
+            new_entry.last_used = datetime.datetime.now().isoformat()
+            return new_entry

-    if not found:
-        history = new_history_entry(uri_path, uri_machine)
-        logs.append(history)
+    new_entry = new_history_entry(uri_path, uri_machine)
+    entries.append(new_entry)
+    return new_entry


 def add_history_command(args: argparse.Namespace) -> None:
-    add_history(args.uri, all_machines=args.all)
+    if args.all:
+        add_all_to_history(args.uri)
+    else:
+        add_history(args.uri)


 # takes a (sub)parser and configures it
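With this change the single-machine path returns the HistoryEntry it added or refreshed, and the --all path lives in its own add_all_to_history function. A rough usage sketch; the import paths and the way a ClanURI is constructed here are assumptions, not taken from the repository:

from clan_cli.clan_uri import ClanURI                              # assumed module path
from clan_cli.history.add import add_all_to_history, add_history  # assumed module path

uri = ClanURI("clan://git+https://example.com/my-clan#my-machine")  # hypothetical URI

entry = add_history(uri)           # one machine: returns the new or refreshed entry
print(entry.flake.flake_attr, entry.last_used)

entries = add_all_to_history(uri)  # every machine in the flake: returns a list
print(len(entries), "history entries written")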