clan-app: rename clan-vm-manager
This commit is contained in:
@@ -1,150 +0,0 @@
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
|
||||
gi.require_version("Gtk", "4.0")
|
||||
gi.require_version("Adw", "1")
|
||||
|
||||
from gi.repository import Adw
|
||||
|
||||
from clan_vm_manager.singletons.use_views import ViewStack
|
||||
from clan_vm_manager.views.logs import Logs
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ToastOverlay:
|
||||
"""
|
||||
The ToastOverlay is a class that manages the display of toasts
|
||||
It should be used as a singleton in your application to prevent duplicate toasts
|
||||
Usage
|
||||
"""
|
||||
|
||||
# For some reason, the adw toast overlay cannot be subclassed
|
||||
# Thats why it is added as a class property
|
||||
overlay: Adw.ToastOverlay
|
||||
active_toasts: set[str]
|
||||
|
||||
_instance: "None | ToastOverlay" = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
raise RuntimeError("Call use() instead")
|
||||
|
||||
@classmethod
|
||||
def use(cls: Any) -> "ToastOverlay":
|
||||
if cls._instance is None:
|
||||
cls._instance = cls.__new__(cls)
|
||||
cls.overlay = Adw.ToastOverlay()
|
||||
cls.active_toasts = set()
|
||||
|
||||
return cls._instance
|
||||
|
||||
def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
|
||||
if key not in self.active_toasts:
|
||||
self.active_toasts.add(key)
|
||||
self.overlay.add_toast(toast)
|
||||
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
|
||||
|
||||
|
||||
class ErrorToast:
|
||||
toast: Adw.Toast
|
||||
|
||||
def __init__(
|
||||
self, message: str, persistent: bool = False, details: str = ""
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.toast = Adw.Toast.new(
|
||||
f"""<span foreground='red'>❌ Error </span> {message}"""
|
||||
)
|
||||
self.toast.set_use_markup(True)
|
||||
|
||||
self.toast.set_priority(Adw.ToastPriority.HIGH)
|
||||
self.toast.set_button_label("Show more")
|
||||
|
||||
if persistent:
|
||||
self.toast.set_timeout(0)
|
||||
|
||||
views = ViewStack.use().view
|
||||
|
||||
# we cannot check this type, python is not smart enough
|
||||
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
|
||||
logs_view.set_message(details)
|
||||
|
||||
self.toast.connect(
|
||||
"button-clicked",
|
||||
lambda _: views.set_visible_child_name("logs"),
|
||||
)
|
||||
|
||||
|
||||
class WarningToast:
|
||||
toast: Adw.Toast
|
||||
|
||||
def __init__(self, message: str, persistent: bool = False) -> None:
|
||||
super().__init__()
|
||||
self.toast = Adw.Toast.new(
|
||||
f"<span foreground='orange'>⚠ Warning </span> {message}"
|
||||
)
|
||||
self.toast.set_use_markup(True)
|
||||
|
||||
self.toast.set_priority(Adw.ToastPriority.NORMAL)
|
||||
|
||||
if persistent:
|
||||
self.toast.set_timeout(0)
|
||||
|
||||
|
||||
class InfoToast:
|
||||
toast: Adw.Toast
|
||||
|
||||
def __init__(self, message: str, persistent: bool = False) -> None:
|
||||
super().__init__()
|
||||
self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
|
||||
self.toast.set_use_markup(True)
|
||||
|
||||
self.toast.set_priority(Adw.ToastPriority.NORMAL)
|
||||
|
||||
if persistent:
|
||||
self.toast.set_timeout(0)
|
||||
|
||||
|
||||
class SuccessToast:
|
||||
toast: Adw.Toast
|
||||
|
||||
def __init__(self, message: str, persistent: bool = False) -> None:
|
||||
super().__init__()
|
||||
self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
|
||||
self.toast.set_use_markup(True)
|
||||
|
||||
self.toast.set_priority(Adw.ToastPriority.NORMAL)
|
||||
|
||||
if persistent:
|
||||
self.toast.set_timeout(0)
|
||||
|
||||
|
||||
class LogToast:
|
||||
toast: Adw.Toast
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
on_button_click: Callable[[], None],
|
||||
button_label: str = "More",
|
||||
persistent: bool = False,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.toast = Adw.Toast.new(
|
||||
f"""Logs are available <span weight="regular">{message}</span>"""
|
||||
)
|
||||
self.toast.set_use_markup(True)
|
||||
|
||||
self.toast.set_priority(Adw.ToastPriority.NORMAL)
|
||||
|
||||
if persistent:
|
||||
self.toast.set_timeout(0)
|
||||
|
||||
self.toast.set_button_label(button_label)
|
||||
self.toast.connect(
|
||||
"button-clicked",
|
||||
lambda _: on_button_click(),
|
||||
)
|
||||
@@ -1,106 +0,0 @@
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Callable
|
||||
from typing import Any, ClassVar, cast
|
||||
|
||||
import gi
|
||||
from clan_cli.clan_uri import ClanURI
|
||||
from clan_cli.history.add import HistoryEntry, add_history
|
||||
|
||||
from clan_vm_manager.components.gkvstore import GKVStore
|
||||
from clan_vm_manager.singletons.use_vms import ClanStore
|
||||
|
||||
gi.require_version("Gtk", "4.0")
|
||||
gi.require_version("Adw", "1")
|
||||
from gi.repository import Gio, GLib, GObject
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JoinValue(GObject.Object):
|
||||
__gsignals__: ClassVar = {
|
||||
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
|
||||
}
|
||||
|
||||
url: ClanURI
|
||||
entry: HistoryEntry | None
|
||||
|
||||
def _join_finished_task(self) -> bool:
|
||||
self.emit("join_finished")
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
def __init__(self, url: ClanURI) -> None:
|
||||
super().__init__()
|
||||
self.url: ClanURI = url
|
||||
self.entry: HistoryEntry | None = None
|
||||
|
||||
def __join(self) -> None:
|
||||
new_entry = add_history(self.url)
|
||||
self.entry = new_entry
|
||||
GLib.idle_add(self._join_finished_task)
|
||||
|
||||
def join(self) -> None:
|
||||
threading.Thread(target=self.__join).start()
|
||||
|
||||
|
||||
class JoinList:
|
||||
"""
|
||||
This is a singleton.
|
||||
It is initialized with the first call of use()
|
||||
"""
|
||||
|
||||
_instance: "None | JoinList" = None
|
||||
list_store: Gio.ListStore
|
||||
|
||||
# Make sure the VMS class is used as a singleton
|
||||
def __init__(self) -> None:
|
||||
raise RuntimeError("Call use() instead")
|
||||
|
||||
@classmethod
|
||||
def use(cls: Any) -> "JoinList":
|
||||
if cls._instance is None:
|
||||
cls._instance = cls.__new__(cls)
|
||||
cls.list_store = Gio.ListStore.new(JoinValue)
|
||||
|
||||
ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)
|
||||
|
||||
return cls._instance
|
||||
|
||||
def _rerender_join_list(
|
||||
self, source: GKVStore, position: int, removed: int, added: int
|
||||
) -> None:
|
||||
self.list_store.items_changed(
|
||||
0, self.list_store.get_n_items(), self.list_store.get_n_items()
|
||||
)
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
return self.list_store.get_n_items() == 0
|
||||
|
||||
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
|
||||
"""
|
||||
Add a join request.
|
||||
This method can add multiple join requests if called subsequently for each request.
|
||||
"""
|
||||
|
||||
value = JoinValue(uri)
|
||||
if value.url.machine.get_id() in [
|
||||
cast(JoinValue, item).url.machine.get_id() for item in self.list_store
|
||||
]:
|
||||
log.info(f"Join request already exists: {value.url}. Ignoring.")
|
||||
return
|
||||
|
||||
value.connect("join_finished", self._on_join_finished)
|
||||
value.connect("join_finished", after_join)
|
||||
|
||||
self.list_store.append(value)
|
||||
|
||||
def _on_join_finished(self, source: JoinValue) -> None:
|
||||
log.info(f"Join finished: {source.url}")
|
||||
self.discard(source)
|
||||
assert source.entry is not None
|
||||
ClanStore.use().push_history_entry(source.entry)
|
||||
|
||||
def discard(self, value: JoinValue) -> None:
|
||||
(has, idx) = self.list_store.find(value)
|
||||
if has:
|
||||
self.list_store.remove(idx)
|
||||
@@ -1,36 +0,0 @@
|
||||
from typing import Any
|
||||
|
||||
import gi
|
||||
|
||||
gi.require_version("Gtk", "4.0")
|
||||
gi.require_version("Adw", "1")
|
||||
from gi.repository import Adw
|
||||
|
||||
|
||||
class ViewStack:
|
||||
"""
|
||||
This is a singleton.
|
||||
It is initialized with the first call of use()
|
||||
|
||||
Usage:
|
||||
|
||||
ViewStack.use().set_visible()
|
||||
|
||||
ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
|
||||
|
||||
"""
|
||||
|
||||
_instance: "None | ViewStack" = None
|
||||
view: Adw.ViewStack
|
||||
|
||||
# Make sure the VMS class is used as a singleton
|
||||
def __init__(self) -> None:
|
||||
raise RuntimeError("Call use() instead")
|
||||
|
||||
@classmethod
|
||||
def use(cls: Any) -> "ViewStack":
|
||||
if cls._instance is None:
|
||||
cls._instance = cls.__new__(cls)
|
||||
cls.view = Adw.ViewStack()
|
||||
|
||||
return cls._instance
|
||||
@@ -1,181 +0,0 @@
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any, ClassVar
|
||||
|
||||
import gi
|
||||
from clan_cli.clan_uri import ClanURI
|
||||
from clan_cli.history.add import HistoryEntry
|
||||
|
||||
from clan_vm_manager import assets
|
||||
from clan_vm_manager.components.gkvstore import GKVStore
|
||||
from clan_vm_manager.components.vmobj import VMObject
|
||||
from clan_vm_manager.singletons.use_views import ViewStack
|
||||
from clan_vm_manager.views.logs import Logs
|
||||
|
||||
gi.require_version("GObject", "2.0")
|
||||
gi.require_version("Gtk", "4.0")
|
||||
from gi.repository import Gio, GLib, GObject
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VMStore(GKVStore):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
|
||||
|
||||
|
||||
class Emitter(GObject.GObject):
|
||||
__gsignals__: ClassVar = {
|
||||
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
|
||||
}
|
||||
|
||||
|
||||
class ClanStore:
|
||||
_instance: "None | ClanStore" = None
|
||||
_clan_store: GKVStore[str, VMStore]
|
||||
|
||||
_emitter: Emitter
|
||||
|
||||
# set the vm that is outputting logs
|
||||
# build logs are automatically streamed to the logs-view
|
||||
_logging_vm: VMObject | None = None
|
||||
|
||||
# Make sure the VMS class is used as a singleton
|
||||
def __init__(self) -> None:
|
||||
raise RuntimeError("Call use() instead")
|
||||
|
||||
@classmethod
|
||||
def use(cls: Any) -> "ClanStore":
|
||||
if cls._instance is None:
|
||||
cls._instance = cls.__new__(cls)
|
||||
cls._clan_store = GKVStore(
|
||||
VMStore, lambda store: store.first().data.flake.flake_url
|
||||
)
|
||||
cls._emitter = Emitter()
|
||||
|
||||
return cls._instance
|
||||
|
||||
def emit(self, signal: str) -> None:
|
||||
self._emitter.emit(signal)
|
||||
|
||||
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
|
||||
self._emitter.connect(signal, cb)
|
||||
|
||||
def set_logging_vm(self, ident: str) -> VMObject | None:
|
||||
vm = self.get_vm(ClanURI(f"clan://{ident}"))
|
||||
if vm is not None:
|
||||
self._logging_vm = vm
|
||||
|
||||
return self._logging_vm
|
||||
|
||||
def register_on_deep_change(
|
||||
self, callback: Callable[[GKVStore, int, int, int], None]
|
||||
) -> None:
|
||||
"""
|
||||
Register a callback that is called when a clan_store or one of the included VMStores changes
|
||||
"""
|
||||
|
||||
def on_vmstore_change(
|
||||
store: VMStore, position: int, removed: int, added: int
|
||||
) -> None:
|
||||
callback(store, position, removed, added)
|
||||
|
||||
def on_clanstore_change(
|
||||
store: "GKVStore", position: int, removed: int, added: int
|
||||
) -> None:
|
||||
if added > 0:
|
||||
store.values()[position].register_on_change(on_vmstore_change)
|
||||
callback(store, position, removed, added)
|
||||
|
||||
self.clan_store.register_on_change(on_clanstore_change)
|
||||
|
||||
@property
|
||||
def clan_store(self) -> GKVStore[str, VMStore]:
|
||||
return self._clan_store
|
||||
|
||||
def create_vm_task(self, vm: HistoryEntry) -> bool:
|
||||
self.push_history_entry(vm)
|
||||
return GLib.SOURCE_REMOVE
|
||||
|
||||
def push_history_entry(self, entry: HistoryEntry) -> None:
|
||||
# TODO: We shouldn't do this here but in the list view
|
||||
if entry.flake.icon is None:
|
||||
icon: Path = assets.loc / "placeholder.jpeg"
|
||||
else:
|
||||
icon = Path(entry.flake.icon)
|
||||
|
||||
def log_details(gfile: Gio.File) -> None:
|
||||
self.log_details(vm, gfile)
|
||||
|
||||
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
|
||||
self.push(vm)
|
||||
|
||||
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
|
||||
views = ViewStack.use().view
|
||||
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
|
||||
|
||||
def file_read_callback(
|
||||
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
|
||||
) -> None:
|
||||
try:
|
||||
# Finish the asynchronous read operation
|
||||
res = source_object.load_contents_finish(result)
|
||||
_success, contents, _etag_out = res
|
||||
|
||||
# Convert the byte array to a string and print it
|
||||
logs_view.set_message(contents.decode("utf-8"))
|
||||
except Exception as e:
|
||||
print(f"Error reading file: {e}")
|
||||
|
||||
# only one vm can output logs at a time
|
||||
if vm == self._logging_vm:
|
||||
gfile.load_contents_async(None, file_read_callback, None)
|
||||
|
||||
# we cannot check this type, python is not smart enough
|
||||
|
||||
def push(self, vm: VMObject) -> None:
|
||||
url = str(vm.data.flake.flake_url)
|
||||
|
||||
# Only write to the store if the Clan is not already in it
|
||||
# Every write to the KVStore rerenders bound widgets to the clan_store
|
||||
if url not in self.clan_store:
|
||||
log.debug(f"Creating new VMStore for {url}")
|
||||
vm_store = VMStore()
|
||||
vm_store.append(vm)
|
||||
self.clan_store[url] = vm_store
|
||||
else:
|
||||
vm_store = self.clan_store[url]
|
||||
machine = vm.data.flake.flake_attr
|
||||
old_vm = vm_store.get(machine)
|
||||
|
||||
if old_vm:
|
||||
log.info(
|
||||
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
|
||||
)
|
||||
old_vm.update(vm.data)
|
||||
else:
|
||||
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
|
||||
vm_store.append(vm)
|
||||
|
||||
def remove(self, vm: VMObject) -> None:
|
||||
del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr]
|
||||
|
||||
def get_vm(self, uri: ClanURI) -> None | VMObject:
|
||||
vm_store = self.clan_store.get(str(uri.flake_id))
|
||||
if vm_store is None:
|
||||
return None
|
||||
machine = vm_store.get(uri.machine.name, None)
|
||||
return machine
|
||||
|
||||
def get_running_vms(self) -> list[VMObject]:
|
||||
return [
|
||||
vm
|
||||
for clan in self.clan_store.values()
|
||||
for vm in clan.values()
|
||||
if vm.is_running()
|
||||
]
|
||||
|
||||
def kill_all(self) -> None:
|
||||
for vm in self.get_running_vms():
|
||||
vm.kill()
|
||||
Reference in New Issue
Block a user