clan-vm-manager: Restore to known good version
14
pkgs/clan-vm-manager/clan_vm_manager/__init__.py
Normal file
@@ -0,0 +1,14 @@
import logging
import sys

from clan_cli.profiler import profile

from clan_vm_manager.app import MainApplication

log = logging.getLogger(__name__)


@profile
def main(argv: list[str] = sys.argv) -> int:
    app = MainApplication()
    return app.run(argv)
6
pkgs/clan-vm-manager/clan_vm_manager/__main__.py
Normal file
@@ -0,0 +1,6 @@
import sys

from . import main

if __name__ == "__main__":
    sys.exit(main())
127
pkgs/clan-vm-manager/clan_vm_manager/app.py
Normal file
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
import logging
from typing import Any, ClassVar

import gi

from clan_vm_manager import assets
from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay

gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")

from clan_cli.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk

from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.use_join import GLib, GObject

from .windows.main_window import MainWindow

log = logging.getLogger(__name__)


class MainApplication(Adw.Application):
    """
    This class is initialized every time the app is started
    Only the Adw.ApplicationWindow is a singleton.
    So don't use any singletons in the Adw.Application class.
    """

    __gsignals__: ClassVar = {
        "join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]),
    }

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(
            application_id="org.clan.vm-manager",
            flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE,
        )

        self.add_main_option(
            "debug",
            ord("d"),
            GLib.OptionFlags.NONE,
            GLib.OptionArg.NONE,
            "enable debug mode",
            None,
        )

        self.window: MainWindow | None = None
        self.connect("activate", self.on_activate)
        self.connect("shutdown", self.on_shutdown)

    def on_shutdown(self, source: "MainApplication") -> None:
        log.debug("Shutting down Adw.Application")

        if self.get_windows() == []:
            log.warning("No windows to destroy")
        if self.window:
            # TODO: Doesn't seem to raise the destroy signal. Need to investigate
            # self.get_windows() returns an empty list. Desync between window and application?
            self.window.close()
            # Killing vms directly. This is dirty
            self.window.kill_vms()
        else:
            log.error("No window to destroy")

    def do_command_line(self, command_line: Any) -> int:
        options = command_line.get_options_dict()
        # convert GVariantDict -> GVariant -> dict
        options = options.end().unpack()

        if "debug" in options and self.window is None:
            setup_logging(logging.DEBUG, root_log_name=__name__.split(".")[0])
            setup_logging(logging.DEBUG, root_log_name="clan_cli")
        elif self.window is None:
            setup_logging(logging.INFO, root_log_name=__name__.split(".")[0])
        log.debug("Debug logging enabled")

        if "debug" in options:
            ToastOverlay.use().add_toast_unique(
                InfoToast("Debug logging enabled").toast, "info.debugging.enabled"
            )

        args = command_line.get_arguments()

        self.activate()

        if len(args) > 1:
            uri = args[1]
            self.emit("join_request", uri)
        return 0

    def on_window_hide_unhide(self, *_args: Any) -> None:
        if not self.window:
            log.error("No window to hide/unhide")
            return
        if self.window.is_visible():
            self.window.hide()
        else:
            self.window.present()

    def dummy_menu_entry(self) -> None:
        log.info("Dummy menu entry called")

    def on_activate(self, source: "MainApplication") -> None:
        if not self.window:
            self.init_style()
            self.window = MainWindow(config=ClanConfig(initial_view="list"))
            self.window.set_application(self)

        self.window.show()

    # TODO: For css styling
    def init_style(self) -> None:
        resource_path = assets.loc / "style.css"

        log.debug(f"Style css path: {resource_path}")
        css_provider = Gtk.CssProvider()
        css_provider.load_from_path(str(resource_path))
        display = Gdk.Display.get_default()
        assert display is not None
        Gtk.StyleContext.add_provider_for_display(
            display,
            css_provider,
            Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
        )
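Since MainApplication registers HANDLES_COMMAND_LINE, the do_command_line flow above can be exercised directly through the package entry point. A minimal, hedged sketch (the argv values are placeholders and not part of this diff):

import sys

from clan_vm_manager import main

# "--debug" maps to the "debug" option added in __init__; a trailing positional
# argument is forwarded by do_command_line as a "join_request" signal.
sys.exit(main(["clan-vm-manager", "--debug", "clan://example.org/clan#demo-machine"]))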
7
pkgs/clan-vm-manager/clan_vm_manager/assets/__init__.py
Normal file
@@ -0,0 +1,7 @@
from pathlib import Path

loc: Path = Path(__file__).parent


def get_asset(name: str | Path) -> Path:
    return loc / name
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/clan_black.png
Normal file
After Width: | Height: | Size: 108 KiB |
After Width: | Height: | Size: 95 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/clan_white.png
Normal file
After Width: | Height: | Size: 106 KiB |
After Width: | Height: | Size: 3.1 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/cybernet.jpeg
Normal file
After Width: | Height: | Size: 104 KiB |
After Width: | Height: | Size: 98 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/firestorm.jpeg
Normal file
After Width: | Height: | Size: 155 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/penguin.jpeg
Normal file
After Width: | Height: | Size: 86 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder.jpeg
Normal file
After Width: | Height: | Size: 163 KiB |
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/placeholder2.jpeg
Normal file
After Width: | Height: | Size: 183 KiB |
66
pkgs/clan-vm-manager/clan_vm_manager/assets/style.css
Normal file
@@ -0,0 +1,66 @@
/* Insert custom styles here */

navigation-view {
  padding: 5px;
  /* padding-left: 5px;
  padding-right: 5px;
  padding-bottom: 5px; */
}

avatar {
  margin: 2px;
}

.trust {
  padding-top: 25px;
  padding-bottom: 25px;
}

.join-list {
  margin-top: 1px;
  margin-left: 2px;
  margin-right: 2px;
}

.progress-bar {
  margin-right: 25px;
  min-width: 200px;
}

.group-list {
  background-color: inherit;
}
.group-list > .activatable:hover {
  background-color: unset;
}

.group-list > row {
  margin-top: 12px;
  border-bottom: unset;
}


.vm-list {
  margin-top: 25px;
  margin-bottom: 25px;
}

.no-shadow {
  box-shadow: none;
}

.search-entry {
  margin-bottom: 12px;
}

searchbar {
  margin-bottom: 25px;
}


.log-view {
  margin-top: 12px;
  font-family: monospace;
  padding: 8px;
}
BIN
pkgs/clan-vm-manager/clan_vm_manager/assets/zenith.jpeg
Normal file
After Width: | Height: | Size: 152 KiB |
132
pkgs/clan-vm-manager/clan_vm_manager/components/executor.py
Normal file
@@ -0,0 +1,132 @@
import logging
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any

import gi

gi.require_version("GdkPixbuf", "2.0")

import dataclasses
import multiprocessing as mp
from collections.abc import Callable

log = logging.getLogger(__name__)


# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
    pid = proc.pid
    if proc.is_alive() and pid:
        os.killpg(pid, signal.SIGTERM)
    else:
        log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")


@dataclasses.dataclass(frozen=True)
class MPProcess:
    name: str
    proc: mp.Process
    out_file: Path

    # Kill the new process and all its children by sending a SIGTERM signal to the process group
    def kill_group(self) -> None:
        _kill_group(proc=self.proc)


def _set_proc_name(name: str) -> None:
    if sys.platform != "linux":
        return
    import ctypes

    # Define the prctl function with the appropriate arguments and return type
    libc = ctypes.CDLL("libc.so.6")
    prctl = libc.prctl
    prctl.argtypes = [
        ctypes.c_int,
        ctypes.c_char_p,
        ctypes.c_ulong,
        ctypes.c_ulong,
        ctypes.c_ulong,
    ]
    prctl.restype = ctypes.c_int

    # Set the process name to "my_process"
    prctl(15, name.encode(), 0, 0, 0)


def _init_proc(
    func: Callable,
    out_file: Path,
    proc_name: str,
    on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
    **kwargs: Any,
) -> None:
    # Create a new process group
    os.setsid()

    # Open stdout and stderr
    with open(out_file, "w") as out_fd:
        os.dup2(out_fd.fileno(), sys.stdout.fileno())
        os.dup2(out_fd.fileno(), sys.stderr.fileno())

    # Print some information
    pid = os.getpid()
    gpid = os.getpgid(pid=pid)

    # Set the process name
    _set_proc_name(proc_name)

    # Close stdin
    sys.stdin.close()

    linebreak = "=" * 5
    # Execute the main function
    print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
    try:
        func(**kwargs)
    except Exception as ex:
        traceback.print_exc()
        if on_except is not None:
            on_except(ex, mp.current_process())

        # Kill the new process and all its children by sending a SIGTERM signal to the process group
        pid = os.getpid()
        gpid = os.getpgid(pid=pid)
        print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
        os.killpg(gpid, signal.SIGTERM)
        sys.exit(1)
    # Don't use a finally block here, because we want the exitcode to be set to
    # 0 if the function returns normally


def spawn(
    *,
    out_file: Path,
    on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
    func: Callable,
    **kwargs: Any,
) -> MPProcess:
    # Decouple the process from the parent
    if mp.get_start_method(allow_none=True) is None:
        mp.set_start_method(method="forkserver")

    # Set names
    proc_name = f"MPExec:{func.__name__}"

    # Start the process
    proc = mp.Process(
        target=_init_proc,
        args=(func, out_file, proc_name, on_except),
        name=proc_name,
        kwargs=kwargs,
    )
    proc.start()

    # Return the process
    mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)

    return mp_proc
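For orientation, a minimal sketch of how spawn could be driven by a caller; the worker function and log path below are illustrative assumptions, not part of this diff:

from pathlib import Path

from clan_vm_manager.components.executor import spawn


def worker(greeting: str) -> None:
    # Runs in a detached process group; stdout/stderr are redirected to out_file.
    print(f"{greeting} from the child process")


proc = spawn(out_file=Path("/tmp/worker.log"), on_except=None, func=worker, greeting="hello")
proc.proc.join()  # wait for completion; proc.kill_group() would SIGTERM the whole group instead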
220
pkgs/clan-vm-manager/clan_vm_manager/components/gkvstore.py
Normal file
@@ -0,0 +1,220 @@
import logging
from collections.abc import Callable
from typing import Any, Generic, TypeVar

import gi

gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject

log = logging.getLogger(__name__)


# Define type variables for key and value types
K = TypeVar("K")  # Key type
V = TypeVar(
    "V", bound=GObject.Object
)  # Value type, bound to GObject.GObject or its subclasses


class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
    """
    A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
    Only use self[key] and del self[key] for accessing the items for better performance.
    This class could be optimized by having the objects remember their position in the list.
    """

    def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
        super().__init__()
        self.gtype = gtype
        self.key_gen = key_gen
        # From Python 3.7 onwards dictionaries are ordered by default
        self._items: dict[K, V] = dict()

    ##################################
    #                                #
    #    Gio.ListStore Interface     #
    #                                #
    ##################################
    @classmethod
    def new(cls: Any, gtype: type[V]) -> "GKVStore":
        return cls.__new__(cls, gtype)

    def append(self, item: V) -> None:
        key = self.key_gen(item)
        self[key] = item

    def find(self, item: V) -> tuple[bool, int]:
        log.warning("Finding is O(n) in GKVStore. Better use indexing")
        for i, v in enumerate(self.values()):
            if v == item:
                return True, i
        return False, -1

    def find_with_equal_func(
        self, item: V, equal_func: Callable[[V, V], bool]
    ) -> tuple[bool, int]:
        log.warning("Finding is O(n) in GKVStore. Better use indexing")
        for i, v in enumerate(self.values()):
            if equal_func(v, item):
                return True, i
        return False, -1

    def find_with_equal_func_full(
        self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
    ) -> tuple[bool, int]:
        log.warning("Finding is O(n) in GKVStore. Better use indexing")
        for i, v in enumerate(self.values()):
            if equal_func(v, item, user_data):
                return True, i
        return False, -1

    def insert(self, position: int, item: V) -> None:
        log.warning("Inserting is O(n) in GKVStore. Better use append")
        log.warning(
            "This function may have incorrect items_changed signal behavior. Please test it"
        )
        key = self.key_gen(item)
        if key in self._items:
            raise ValueError("Key already exists in the dictionary")
        if position < 0 or position > len(self._items):
            raise IndexError("Index out of range")

        # Temporary storage for items to be reinserted
        temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]

        # Delete items from the original dict
        for k in list(self.keys())[position:]:
            del self._items[k]

        # Insert the new key-value pair
        self._items[key] = item

        # Reinsert the items
        for i, (k, v) in enumerate(temp_list):
            self._items[k] = v

        # Notify the model of the changes
        self.items_changed(position, 0, 1)

    def insert_sorted(
        self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
    ) -> None:
        raise NotImplementedError("insert_sorted is not implemented in GKVStore")

    def remove(self, position: int) -> None:
        if position < 0 or position >= self.get_n_items():
            return
        key = self.keys()[position]
        del self[key]
        self.items_changed(position, 1, 0)

    def remove_all(self) -> None:
        # Capture the count before clearing, otherwise items_changed would report 0 removals
        n_items = len(self._items)
        self._items.clear()
        self.items_changed(0, n_items, 0)

    def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
        raise NotImplementedError("sort is not implemented in GKVStore")

    def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
        raise NotImplementedError("splice is not implemented in GKVStore")

    ##################################
    #                                #
    #    Gio.ListModel Interface     #
    #                                #
    ##################################
    def get_item(self, position: int) -> V | None:
        if position < 0 or position >= self.get_n_items():
            return None
        # Access items by index since OrderedDict does not support direct indexing
        key = list(self._items.keys())[position]
        return self._items[key]

    def do_get_item(self, position: int) -> V | None:
        return self.get_item(position)

    def get_item_type(self) -> Any:
        return self.gtype.__gtype__  # type: ignore[attr-defined]

    def do_get_item_type(self) -> GObject.GType:
        return self.get_item_type()

    def get_n_items(self) -> int:
        return len(self._items)

    def do_get_n_items(self) -> int:
        return self.get_n_items()

    ##################################
    #                                #
    #        Dict Interface          #
    #                                #
    ##################################
    def keys(self) -> list[K]:
        return list(self._items.keys())

    def values(self) -> list[V]:
        return list(self._items.values())

    def items(self) -> list[tuple[K, V]]:
        return list(self._items.items())

    def get(self, key: K, default: V | None = None) -> V | None:
        return self._items.get(key, default)

    # O(1) operation if the key does not exist, O(n) if it does
    def __setitem__(self, key: K, value: V) -> None:
        # If the key already exists, remove it O(n)
        if key in self._items:
            log.debug("Updating an existing key in GKVStore is O(n)")
            position = self.keys().index(key)
            self._items[key] = value
            self.items_changed(position, 1, 1)
        else:
            # Add the new key-value pair
            self._items[key] = value
            position = max(len(self._items) - 1, 0)
            self.items_changed(position, 0, 1)

    # O(n) operation
    def __delitem__(self, key: K) -> None:
        position = self.keys().index(key)
        del self._items[key]
        self.items_changed(position, 1, 0)

    def __len__(self) -> int:
        return len(self._items)

    # O(1) operation
    def __getitem__(self, key: K) -> V:  # type: ignore[override]
        return self._items[key]

    def __contains__(self, key: K) -> bool:  # type: ignore[override]
        return key in self._items

    def __str__(self) -> str:
        resp = "GKVStore(\n"
        for k, v in self._items.items():
            resp += f"{k}: {v}\n"
        resp += ")"
        return resp

    def __repr__(self) -> str:
        return self._items.__str__()

    ##################################
    #                                #
    #         Custom Methods         #
    #                                #
    ##################################
    def first(self) -> V:
        return self.values()[0]

    def last(self) -> V:
        return self.values()[-1]

    def register_on_change(
        self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
    ) -> None:
        self.connect("items-changed", callback)
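A rough usage sketch of GKVStore; the Item class and key function are made up for illustration and are not part of this diff:

from gi.repository import GObject

from clan_vm_manager.components.gkvstore import GKVStore


class Item(GObject.Object):
    def __init__(self, name: str) -> None:
        super().__init__()
        self.name = name


store: GKVStore[str, Item] = GKVStore(Item, lambda item: item.name)
store.register_on_change(lambda s, pos, removed, added: print(pos, removed, added))
store.append(Item("demo"))  # keyed by name, O(1)
assert "demo" in store      # dict-style membership test
del store["demo"]           # O(n), emits items-changed for bound widgets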
10
pkgs/clan-vm-manager/clan_vm_manager/components/interfaces.py
Normal file
@@ -0,0 +1,10 @@
from dataclasses import dataclass

import gi

gi.require_version("Gtk", "4.0")


@dataclass
class ClanConfig:
    initial_view: str
74
pkgs/clan-vm-manager/clan_vm_manager/components/list_splash.py
Normal file
@@ -0,0 +1,74 @@
import logging
from collections.abc import Callable
from typing import TypeVar

import gi

from clan_vm_manager import assets

gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk

log = logging.getLogger(__name__)

ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)


class EmptySplash(Gtk.Box):
    def __init__(self, on_join: Callable[[str], None]) -> None:
        super().__init__(orientation=Gtk.Orientation.VERTICAL)
        self.on_join = on_join

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))

        if clan_icon:
            image = Gtk.Image.new_from_pixbuf(clan_icon)
        else:
            image = Gtk.Image.new_from_icon_name("image-missing")
        # same as the clamp
        image.set_pixel_size(400)
        image.set_opacity(0.5)
        image.set_margin_top(20)
        image.set_margin_bottom(10)

        vbox.append(image)

        empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
        join_entry = Gtk.Entry()
        join_entry.set_placeholder_text("clan://<url>")
        join_entry.set_hexpand(True)

        join_button = Gtk.Button(label="Join")
        join_button.connect("clicked", self._on_join, join_entry)

        join_entry.connect("activate", lambda e: self._on_join(join_button, e))

        clamp = Adw.Clamp()
        clamp.set_maximum_size(400)
        clamp.set_margin_bottom(40)
        vbox.append(empty_label)
        hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        hbox.append(join_entry)
        hbox.append(join_button)
        vbox.append(hbox)
        clamp.set_child(vbox)

        self.append(clamp)

    def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
        try:
            pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
            return pixbuf
        except Exception as e:
            log.error(f"Failed to load image: {e}")
            return None

    def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
        """
        Callback for the join button
        Extracts the text from the entry and calls the on_join callback
        """
        log.info(f"Splash screen: Joining {entry.get_text()}")
        self.on_join(entry.get_text())
1189
pkgs/clan-vm-manager/clan_vm_manager/components/trayicon.py
Normal file
384
pkgs/clan-vm-manager/clan_vm_manager/components/vmobj.py
Normal file
@@ -0,0 +1,384 @@
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import IO, ClassVar

import gi
from clan_cli import vms
from clan_cli.clan_uri import ClanURI
from clan_cli.dirs import vm_state_dir
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine
from clan_cli.vms.qemu import QMPWrapper

from clan_vm_manager.components.executor import MPProcess, spawn
from clan_vm_manager.singletons.toast import (
    InfoToast,
    SuccessToast,
    ToastOverlay,
    WarningToast,
)

gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk

log = logging.getLogger(__name__)


class VMObject(GObject.Object):
    # Define a custom signal with the name "vm_stopped" and a string argument for the message
    __gsignals__: ClassVar = {
        "vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
        "vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
    }

    def __init__(
        self,
        icon: Path,
        data: HistoryEntry,
        build_log_cb: Callable[[Gio.File], None],
    ) -> None:
        super().__init__()

        # Store the data from the history entry
        self.data: HistoryEntry = data

        self.build_log_cb = build_log_cb

        # Create a process object to store the VM process
        self.vm_process: MPProcess = MPProcess(
            "vm_dummy", mp.Process(), Path("./dummy")
        )
        self.build_process: MPProcess = MPProcess(
            "build_dummy", mp.Process(), Path("./dummy")
        )
        self._start_thread: threading.Thread = threading.Thread()
        self.machine: Machine | None = None
        self.qmp_wrap: QMPWrapper | None = None

        # Watcher to stop the VM
        self.KILL_TIMEOUT: int = 20  # seconds
        self._stop_thread: threading.Thread = threading.Thread()

        # Build progress bar vars
        self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
        self.progress_bar.hide()
        self.progress_bar.set_hexpand(True)  # Horizontally expand
        self.prog_bar_id: int = 0

        # Create a temporary directory to store the logs
        self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
            prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
        )
        self._logs_id: int = 0
        self._log_file: IO[str] | None = None

        # To be able to set the switch state programmatically
        # we need to store the handler id returned by the connect method
        # and block the signal while we change the state. This is cursed.
        self.switch: Gtk.Switch = Gtk.Switch()
        self.switch_handler_id: int = self.switch.connect(
            "notify::active", self._on_switch_toggle
        )
        self.connect("vm_status_changed", self._on_vm_status_changed)

        # Make sure the VM is killed when the reference to this object is dropped
        self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)

    def _vm_status_changed_task(self) -> bool:
        self.emit("vm_status_changed")
        return GLib.SOURCE_REMOVE

    def update(self, data: HistoryEntry) -> None:
        self.data = data

    def _on_vm_status_changed(self, source: "VMObject") -> None:
        # Signal may be emitted multiple times
        self.emit("vm_build_notify", self.is_building(), self.is_running())

        prev_state = self.switch.get_state()
        next_state = self.is_running() and not self.is_building()

        self.switch.set_state(next_state)
        if prev_state is False and next_state is True:
            ToastOverlay.use().add_toast_unique(
                SuccessToast(f"{source.data.flake.flake_attr} started").toast,
                "success.vm.start",
            )

        if self.switch.get_sensitive() is False and not self.is_building():
            self.switch.set_sensitive(True)

        exit_vm = self.vm_process.proc.exitcode
        exit_build = self.build_process.proc.exitcode
        exitc = exit_vm or exit_build
        if not self.is_running() and exitc != 0:
            with self.switch.handler_block(self.switch_handler_id):
                self.switch.set_active(False)
            log.error(f"VM exited with error. Exitcode: {exitc}")
            ToastOverlay.use().add_toast_unique(
                WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
                "warning.vm.exit",
            )

    def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
        if switch.get_active():
            switch.set_state(False)
            switch.set_sensitive(False)
            self.start()
        else:
            switch.set_state(True)
            self.shutdown()
            switch.set_sensitive(False)

    # We use a context manager to create the machine object
    # and make sure it is destroyed when the context is exited
    @contextmanager
    def _create_machine(self) -> Generator[Machine, None, None]:
        uri = ClanURI.from_str(
            url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
        )
        if uri.flake.is_local():
            self.machine = Machine(
                name=self.data.flake.flake_attr,
                flake=uri.flake,
            )
        if uri.flake.is_remote():
            self.machine = Machine(
                name=self.data.flake.flake_attr,
                flake=uri.flake,
            )
        assert self.machine is not None
        state_dir = vm_state_dir(
            flake_url=str(self.machine.flake.url), vm_name=self.machine.name
        )
        self.qmp_wrap = QMPWrapper(state_dir)
        assert self.machine is not None
        yield self.machine
        self.machine = None

    def _pulse_progress_bar_task(self) -> bool:
        if self.progress_bar.is_visible():
            self.progress_bar.pulse()
            return GLib.SOURCE_CONTINUE
        else:
            return GLib.SOURCE_REMOVE

    def __start(self) -> None:
        with self._create_machine() as machine:
            # Start building VM
            tstart = datetime.now()
            log.info(f"Building VM {self.get_id()}")
            log_dir = Path(str(self.log_dir.name))

            # Start the build process
            self.build_process = spawn(
                on_except=None,
                out_file=log_dir / "build.log",
                func=vms.run.build_vm,
                machine=machine,
                tmpdir=log_dir,
            )

            gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
            # Gio documentation:
            # Obtains a file monitor for the given file.
            # If no file notification mechanism exists, then regular polling of the file is used.
            g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
            g_monitor.connect("changed", self.on_logs_changed)

            GLib.idle_add(self._vm_status_changed_task)
            self.switch.set_sensitive(True)
            # Start the logs watcher
            self._logs_id = GLib.timeout_add(
                50, self._get_logs_task, self.build_process
            )
            if self._logs_id == 0:
                log.error("Failed to start VM log watcher")
            log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")

            # Start the progress bar and show it
            self.progress_bar.show()
            self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
            if self.prog_bar_id == 0:
                log.error("Couldn't spawn a progress bar task")

            # Wait for the build to finish then hide the progress bar
            self.build_process.proc.join()
            tend = datetime.now()
            log.info(f"VM {self.get_id()} build took {tend - tstart}s")
            self.progress_bar.hide()

            # Check if the VM was built successfully
            if self.build_process.proc.exitcode != 0:
                log.error(f"Failed to build VM {self.get_id()}")
                GLib.idle_add(self._vm_status_changed_task)
                return
            log.info(f"Successfully built VM {self.get_id()}")

            # Start the VM
            self.vm_process = spawn(
                on_except=None,
                out_file=Path(str(self.log_dir.name)) / "vm.log",
                func=vms.run.run_vm,
                vm=self.data.flake.vm,
                cachedir=log_dir,
                socketdir=log_dir,
            )
            log.debug(f"Started VM {self.get_id()}")
            GLib.idle_add(self._vm_status_changed_task)

            # Start the logs watcher
            self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
            if self._logs_id == 0:
                log.error("Failed to start VM log watcher")
            log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")

            # Wait for the VM to stop
            self.vm_process.proc.join()
            log.debug(f"VM {self.get_id()} has stopped")
            GLib.idle_add(self._vm_status_changed_task)

    def on_logs_changed(
        self,
        monitor: Gio.FileMonitor,
        file: Gio.File,
        other_file: Gio.File,
        event_type: Gio.FileMonitorEvent,
    ) -> None:
        if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
            # File was changed and the changes were written to disk
            # wire up the callback for setting the logs
            self.build_log_cb(file)

    def start(self) -> None:
        if self.is_running():
            log.warning("VM is already running. Ignoring start request")
            self.emit("vm_status_changed", self)
            return
        log.debug(f"VM state dir {self.log_dir.name}")
        self._start_thread = threading.Thread(target=self.__start)
        self._start_thread.start()

    def _get_logs_task(self, proc: MPProcess) -> bool:
        if not proc.out_file.exists():
            return GLib.SOURCE_CONTINUE

        if not self._log_file:
            try:
                self._log_file = open(proc.out_file)
            except Exception as ex:
                log.exception(ex)
                self._log_file = None
                return GLib.SOURCE_REMOVE

        line = os.read(self._log_file.fileno(), 4096)
        if len(line) != 0:
            print(line.decode("utf-8"), end="", flush=True)

        if not proc.proc.is_alive():
            log.debug("Removing logs watcher")
            self._log_file = None
            return GLib.SOURCE_REMOVE

        return GLib.SOURCE_CONTINUE

    def is_running(self) -> bool:
        return self._start_thread.is_alive()

    def is_building(self) -> bool:
        return self.build_process.proc.is_alive()

    def is_shutting_down(self) -> bool:
        return self._stop_thread.is_alive()

    def get_id(self) -> str:
        return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"

    def __stop(self) -> None:
        log.info(f"Stopping VM {self.get_id()}")

        start_time = datetime.now()
        while self.is_running():
            diff = datetime.now() - start_time
            if diff.seconds > self.KILL_TIMEOUT:
                log.error(
                    f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
                )
                self.vm_process.kill_group()
                break
            if self.is_building():
                log.info(f"VM {self.get_id()} is still building. Killing it")
                self.build_process.kill_group()
                break
            if not self.machine:
                log.error(f"Machine object is None. Killing VM {self.get_id()}")
                self.vm_process.kill_group()
                break

            # Try to shutdown the VM gracefully using QMP
            try:
                assert self.qmp_wrap is not None
                with self.qmp_wrap.qmp_ctx() as qmp:
                    qmp.command("system_powerdown")
            except Exception as ex:
                log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")

            # Try 20 times to stop the VM
            time.sleep(self.KILL_TIMEOUT / 20)
        GLib.idle_add(self._vm_status_changed_task)
        log.debug(f"VM {self.get_id()} has stopped")

        ToastOverlay.use().add_toast_unique(
            InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
        )

    def shutdown(self) -> None:
        if not self.is_running():
            log.warning("VM not running. Ignoring shutdown request.")
            self.emit("vm_status_changed", self)
            return
        if self.is_shutting_down():
            log.warning("Shutdown already in progress")
            self.emit("vm_status_changed", self)
            return
        self._stop_thread = threading.Thread(target=self.__stop)
        self._stop_thread.start()

    def _kill_ref_drop(self) -> None:
        if self.is_running():
            log.warning("Killing VM due to reference drop")
            self.kill()

    def kill(self) -> None:
        if not self.is_running():
            log.warning(f"Tried to kill VM {self.get_id()}, but it is not running")
            return
        log.info(f"Killing VM {self.get_id()} now")

        if self.vm_process.proc.is_alive():
            self.vm_process.kill_group()

        if self.build_process.proc.is_alive():
            self.build_process.kill_group()

    def read_whole_log(self) -> str:
        if not self.vm_process.out_file.exists():
            log.error(f"Log file {self.vm_process.out_file} does not exist")
            return ""
        return self.vm_process.out_file.read_text()

    def __str__(self) -> str:
        return f"VM({self.get_id()})"

    def __repr__(self) -> str:
        return self.__str__()
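A short sketch of observing a VMObject from a widget; the callback body is illustrative and assumes a vm instance created elsewhere (for example by ClanStore):

from clan_vm_manager.components.vmobj import VMObject


def on_build_notify(vm: VMObject, is_building: bool, is_running: bool) -> None:
    # Mirrors the (bool, bool) payload declared in __gsignals__ above.
    print(f"{vm.get_id()}: building={is_building} running={is_running}")


# vm.connect("vm_build_notify", on_build_notify)
# vm.start()  # builds the VM in a child process, then runs it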
150
pkgs/clan-vm-manager/clan_vm_manager/singletons/toast.py
Normal file
@@ -0,0 +1,150 @@
import logging
from collections.abc import Callable
from typing import Any

import gi

gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")

from gi.repository import Adw

from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs

log = logging.getLogger(__name__)


class ToastOverlay:
    """
    The ToastOverlay is a class that manages the display of toasts
    It should be used as a singleton in your application to prevent duplicate toasts
    Usage
    """

    # For some reason, the adw toast overlay cannot be subclassed
    # That's why it is added as a class property
    overlay: Adw.ToastOverlay
    active_toasts: set[str]

    _instance: "None | ToastOverlay" = None

    def __init__(self) -> None:
        raise RuntimeError("Call use() instead")

    @classmethod
    def use(cls: Any) -> "ToastOverlay":
        if cls._instance is None:
            cls._instance = cls.__new__(cls)
            cls.overlay = Adw.ToastOverlay()
            cls.active_toasts = set()

        return cls._instance

    def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
        if key not in self.active_toasts:
            self.active_toasts.add(key)
            self.overlay.add_toast(toast)
            toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))


class ErrorToast:
    toast: Adw.Toast

    def __init__(
        self, message: str, persistent: bool = False, details: str = ""
    ) -> None:
        super().__init__()
        self.toast = Adw.Toast.new(
            f"""<span foreground='red'>❌ Error </span> {message}"""
        )
        self.toast.set_use_markup(True)

        self.toast.set_priority(Adw.ToastPriority.HIGH)
        self.toast.set_button_label("Show more")

        if persistent:
            self.toast.set_timeout(0)

        views = ViewStack.use().view

        # we cannot check this type, python is not smart enough
        logs_view: Logs = views.get_child_by_name("logs")  # type: ignore
        logs_view.set_message(details)

        self.toast.connect(
            "button-clicked",
            lambda _: views.set_visible_child_name("logs"),
        )


class WarningToast:
    toast: Adw.Toast

    def __init__(self, message: str, persistent: bool = False) -> None:
        super().__init__()
        self.toast = Adw.Toast.new(
            f"<span foreground='orange'>⚠ Warning </span> {message}"
        )
        self.toast.set_use_markup(True)

        self.toast.set_priority(Adw.ToastPriority.NORMAL)

        if persistent:
            self.toast.set_timeout(0)


class InfoToast:
    toast: Adw.Toast

    def __init__(self, message: str, persistent: bool = False) -> None:
        super().__init__()
        self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
        self.toast.set_use_markup(True)

        self.toast.set_priority(Adw.ToastPriority.NORMAL)

        if persistent:
            self.toast.set_timeout(0)


class SuccessToast:
    toast: Adw.Toast

    def __init__(self, message: str, persistent: bool = False) -> None:
        super().__init__()
        self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
        self.toast.set_use_markup(True)

        self.toast.set_priority(Adw.ToastPriority.NORMAL)

        if persistent:
            self.toast.set_timeout(0)


class LogToast:
    toast: Adw.Toast

    def __init__(
        self,
        message: str,
        on_button_click: Callable[[], None],
        button_label: str = "More",
        persistent: bool = False,
    ) -> None:
        super().__init__()
        self.toast = Adw.Toast.new(
            f"""Logs are available <span weight="regular">{message}</span>"""
        )
        self.toast.set_use_markup(True)

        self.toast.set_priority(Adw.ToastPriority.NORMAL)

        if persistent:
            self.toast.set_timeout(0)

        self.toast.set_button_label(button_label)
        self.toast.connect(
            "button-clicked",
            lambda _: on_button_click(),
        )
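A hedged example of combining the toast helpers with the overlay singleton; mounting the overlay widget in the window hierarchy is assumed to happen once elsewhere:

from clan_vm_manager.singletons.toast import SuccessToast, ToastOverlay

# e.g. window.set_content(ToastOverlay.use().overlay) during window construction
ToastOverlay.use().add_toast_unique(
    SuccessToast("demo-machine started").toast,
    "success.vm.start",  # a second toast with the same key is suppressed until dismissal
)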
114
pkgs/clan-vm-manager/clan_vm_manager/singletons/use_join.py
Normal file
@@ -0,0 +1,114 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast

import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry, add_history
from clan_cli.machines.machines import Machine

from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.singletons.use_vms import ClanStore

gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject

log = logging.getLogger(__name__)


class JoinValue(GObject.Object):
    __gsignals__: ClassVar = {
        "join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
    }

    url: ClanURI
    entry: HistoryEntry | None

    def _join_finished_task(self) -> bool:
        self.emit("join_finished")
        return GLib.SOURCE_REMOVE

    def __init__(self, url: ClanURI) -> None:
        super().__init__()
        self.url: ClanURI = url
        self.entry: HistoryEntry | None = None

    def __join(self) -> None:
        new_entry = add_history(self.url)
        self.entry = new_entry
        GLib.idle_add(self._join_finished_task)

    def join(self) -> None:
        threading.Thread(target=self.__join).start()


class JoinList:
    """
    This is a singleton.
    It is initialized with the first call of use()
    """

    _instance: "None | JoinList" = None
    list_store: Gio.ListStore

    # Make sure the VMS class is used as a singleton
    def __init__(self) -> None:
        raise RuntimeError("Call use() instead")

    @classmethod
    def use(cls: Any) -> "JoinList":
        if cls._instance is None:
            cls._instance = cls.__new__(cls)
            cls.list_store = Gio.ListStore.new(JoinValue)

            ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list)

        return cls._instance

    def _rerender_join_list(
        self, source: GKVStore, position: int, removed: int, added: int
    ) -> None:
        self.list_store.items_changed(
            0, self.list_store.get_n_items(), self.list_store.get_n_items()
        )

    def is_empty(self) -> bool:
        return self.list_store.get_n_items() == 0

    def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
        """
        Add a join request.
        This method can add multiple join requests if called subsequently for each request.
        """

        value = JoinValue(uri)

        machine_id = Machine(uri.machine_name, uri.flake)
        machine_id_list = []

        for machine_obj in self.list_store:
            mvalue: ClanURI = cast(JoinValue, machine_obj).url
            machine = Machine(mvalue.machine_name, mvalue.flake)
            machine_id_list.append(machine.get_id())

        if machine_id.get_id() in machine_id_list:
            log.info(f"Join request already exists: {value.url}. Ignoring.")
            return

        value.connect("join_finished", self._on_join_finished)
        value.connect("join_finished", after_join)

        self.list_store.append(value)

    def _on_join_finished(self, source: JoinValue) -> None:
        log.info(f"Join finished: {source.url}")
        self.discard(source)
        assert source.entry is not None
        ClanStore.use().push_history_entry(source.entry)

    def discard(self, value: JoinValue) -> None:
        (has, idx) = self.list_store.find(value)
        if has:
            self.list_store.remove(idx)
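A minimal sketch of queuing a join request through JoinList; the URI and callback are placeholders, and confirming the request (JoinValue.join()) is triggered from the UI elsewhere:

from clan_cli.clan_uri import ClanURI

from clan_vm_manager.singletons.use_join import JoinList, JoinValue


def after_join(value: JoinValue) -> None:
    # Runs on the GLib main loop after JoinValue.join() has finished add_history() in its worker thread.
    print(f"Joined {value.url}")


uri = ClanURI.from_str(url="https://example.org/clan", machine_name="demo-machine")
JoinList.use().push(uri, after_join)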
36
pkgs/clan-vm-manager/clan_vm_manager/singletons/use_views.py
Normal file
@@ -0,0 +1,36 @@
from typing import Any

import gi

gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw


class ViewStack:
    """
    This is a singleton.
    It is initialized with the first call of use()

    Usage:

    ViewStack.use().set_visible()

    ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.

    """

    _instance: "None | ViewStack" = None
    view: Adw.ViewStack

    # Make sure the VMS class is used as a singleton
    def __init__(self) -> None:
        raise RuntimeError("Call use() instead")

    @classmethod
    def use(cls: Any) -> "ViewStack":
        if cls._instance is None:
            cls._instance = cls.__new__(cls)
            cls.view = Adw.ViewStack()

        return cls._instance
183
pkgs/clan-vm-manager/clan_vm_manager/singletons/use_vms.py
Normal file
@@ -0,0 +1,183 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar

import gi
from clan_cli.clan_uri import ClanURI
from clan_cli.history.add import HistoryEntry
from clan_cli.machines.machines import Machine

from clan_vm_manager import assets
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs

gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject

log = logging.getLogger(__name__)


class VMStore(GKVStore):
    def __init__(self) -> None:
        super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)


class Emitter(GObject.GObject):
    __gsignals__: ClassVar = {
        "is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
    }


class ClanStore:
    _instance: "None | ClanStore" = None
    _clan_store: GKVStore[str, VMStore]

    _emitter: Emitter

    # set the vm that is outputting logs
    # build logs are automatically streamed to the logs-view
    _logging_vm: VMObject | None = None

    # Make sure the VMS class is used as a singleton
    def __init__(self) -> None:
        raise RuntimeError("Call use() instead")

    @classmethod
    def use(cls: Any) -> "ClanStore":
        if cls._instance is None:
            cls._instance = cls.__new__(cls)
            cls._clan_store = GKVStore(
                VMStore, lambda store: store.first().data.flake.flake_url
            )
            cls._emitter = Emitter()

        return cls._instance

    def emit(self, signal: str) -> None:
        self._emitter.emit(signal)

    def connect(self, signal: str, cb: Callable[..., Any]) -> None:
        self._emitter.connect(signal, cb)

    def set_logging_vm(self, ident: str) -> VMObject | None:
        vm = self.get_vm(ClanURI(f"clan://{ident}"))
        if vm is not None:
            self._logging_vm = vm

        return self._logging_vm

    def register_on_deep_change(
        self, callback: Callable[[GKVStore, int, int, int], None]
    ) -> None:
        """
        Register a callback that is called when a clan_store or one of the included VMStores changes
        """

        def on_vmstore_change(
            store: VMStore, position: int, removed: int, added: int
        ) -> None:
            callback(store, position, removed, added)

        def on_clanstore_change(
            store: "GKVStore", position: int, removed: int, added: int
        ) -> None:
            if added > 0:
                store.values()[position].register_on_change(on_vmstore_change)
            callback(store, position, removed, added)

        self.clan_store.register_on_change(on_clanstore_change)

    @property
    def clan_store(self) -> GKVStore[str, VMStore]:
        return self._clan_store

    def create_vm_task(self, vm: HistoryEntry) -> bool:
        self.push_history_entry(vm)
        return GLib.SOURCE_REMOVE

    def push_history_entry(self, entry: HistoryEntry) -> None:
        # TODO: We shouldn't do this here but in the list view
        if entry.flake.icon is None:
            icon: Path = assets.loc / "placeholder.jpeg"
        else:
            icon = Path(entry.flake.icon)

        def log_details(gfile: Gio.File) -> None:
            self.log_details(vm, gfile)

        vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
        self.push(vm)

    def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
        views = ViewStack.use().view
        logs_view: Logs = views.get_child_by_name("logs")  # type: ignore

        def file_read_callback(
            source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
        ) -> None:
            try:
                # Finish the asynchronous read operation
                res = source_object.load_contents_finish(result)
                _success, contents, _etag_out = res

                # Convert the byte array to a string and print it
                logs_view.set_message(contents.decode("utf-8"))
            except Exception as e:
                print(f"Error reading file: {e}")

        # only one vm can output logs at a time
        if vm == self._logging_vm:
            gfile.load_contents_async(None, file_read_callback, None)

    # we cannot check this type, python is not smart enough

    def push(self, vm: VMObject) -> None:
        url = str(vm.data.flake.flake_url)

        # Only write to the store if the Clan is not already in it
        # Every write to the KVStore rerenders bound widgets to the clan_store
        if url not in self.clan_store:
            log.debug(f"Creating new VMStore for {url}")
            vm_store = VMStore()
            vm_store.append(vm)
            self.clan_store[url] = vm_store
        else:
            vm_store = self.clan_store[url]
            machine = vm.data.flake.flake_attr
            old_vm = vm_store.get(machine)

            if old_vm:
                log.info(
                    f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
                )
                old_vm.update(vm.data)
            else:
                log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
                vm_store.append(vm)

    def remove(self, vm: VMObject) -> None:
        del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr]

    def get_vm(self, uri: ClanURI) -> None | VMObject:
        flake_id = Machine(uri.machine_name, uri.flake).get_id()
        vm_store = self.clan_store.get(flake_id)
        if vm_store is None:
            return None
        machine = vm_store.get(uri.machine_name, None)
        return machine

    def get_running_vms(self) -> list[VMObject]:
        return [
            vm
            for clan in self.clan_store.values()
            for vm in clan.values()
            if vm.is_running()
        ]

    def kill_all(self) -> None:
        for vm in self.get_running_vms():
            vm.kill()
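A rough sketch of querying the ClanStore singleton; the URI is a placeholder and the HistoryEntry would normally come from clan_cli.history:

from clan_cli.clan_uri import ClanURI

from clan_vm_manager.singletons.use_vms import ClanStore

store = ClanStore.use()
# store.push_history_entry(entry)  # entry: HistoryEntry produced by clan_cli
vm = store.get_vm(ClanURI("clan://example.org/clan#demo-machine"))
if vm is not None and not vm.is_running():
    vm.start()
store.kill_all()  # stop every running VM, e.g. on application shutdown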
61
pkgs/clan-vm-manager/clan_vm_manager/views/details.py
Normal file
@@ -0,0 +1,61 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar

import gi

gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk

# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)


def create_details_list(
    model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
    boxed_list = Gtk.ListBox()
    boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
    boxed_list.add_css_class("boxed-list")
    boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
    return boxed_list


class PreferencesValue(GObject.Object):
    variant: Literal["CPU", "MEMORY"]
    editable: bool
    data: Any

    def __init__(
        self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
    ) -> None:
        super().__init__()
        self.variant = variant
        self.editable = editable
        self.data = data


class Details(Gtk.Box):
    def __init__(self) -> None:
        super().__init__(orientation=Gtk.Orientation.VERTICAL)

        preferences_store = Gio.ListStore.new(PreferencesValue)
        preferences_store.append(PreferencesValue("CPU", True, 1))

        self.details_list = create_details_list(
            model=preferences_store, render_row=self.render_entry_row
        )

        self.append(self.details_list)

    def render_entry_row(
        self, boxed_list: Gtk.ListBox, item: PreferencesValue
    ) -> Gtk.Widget:
        cores: int | None = os.cpu_count()
        fcores = float(cores) if cores else 1.0

        row = Adw.SpinRow.new_with_range(0, fcores, 1)
        row.set_value(item.data)

        return row
356
pkgs/clan-vm-manager/clan_vm_manager/views/list.py
Normal file
@@ -0,0 +1,356 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar

import gi
from clan_cli.clan_uri import ClanURI

from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.components.list_splash import EmptySplash
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.toast import (
    LogToast,
    SuccessToast,
    ToastOverlay,
    WarningToast,
)
from clan_vm_manager.singletons.use_join import JoinList, JoinValue
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore, VMStore
from clan_vm_manager.views.logs import Logs

gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk

log = logging.getLogger(__name__)

ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)


def create_boxed_list(
    model: CustomStore,
    render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
    boxed_list = Gtk.ListBox()
    boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
    boxed_list.add_css_class("boxed-list")
    boxed_list.add_css_class("no-shadow")

    boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
    return boxed_list


class ClanList(Gtk.Box):
    """
    The ClanList is the composition of
    the ClanListToolbar and
    the ClanListView:

    # -------------------------------- #
    # - Tools   <Start> <Stop> <Edit>  #
    # -------------------------------- #
    # - List Items                     #
    # - <...>                          #
    # -------------------------------- #
    """

    def __init__(self, config: ClanConfig) -> None:
        super().__init__(orientation=Gtk.Orientation.VERTICAL)

        app = Gio.Application.get_default()
        assert app is not None
        app.connect("join_request", self.on_join_request)

        self.log_label: Gtk.Label = Gtk.Label()

        # Add join list
        self.join_boxed_list = create_boxed_list(
            model=JoinList.use().list_store, render_row=self.render_join_row
        )
        self.join_boxed_list.add_css_class("join-list")
        self.append(self.join_boxed_list)

        clan_store = ClanStore.use()
        clan_store.connect("is_ready", self.display_splash)

        self.group_list = create_boxed_list(
            model=clan_store.clan_store, render_row=self.render_group_row
        )
        self.group_list.add_css_class("group-list")
        self.append(self.group_list)

        self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))

    def display_splash(self, source: GKVStore) -> None:
        log.debug("Displaying splash")
        if (
            ClanStore.use().clan_store.get_n_items() == 0
            and JoinList.use().list_store.get_n_items() == 0
        ):
            self.append(self.splash)

    def render_group_row(
        self, boxed_list: Gtk.ListBox, vm_store: VMStore
    ) -> Gtk.Widget:
        self.remove(self.splash)

        vm = vm_store.first()
        log.debug("Rendering group row for %s", vm.data.flake.flake_url)

        grp = Adw.PreferencesGroup()
        grp.set_title(vm.data.flake.clan_name)
        grp.set_description(vm.data.flake.flake_url)

        add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
        add_action.connect("activate", self.on_add)
        app = Gio.Application.get_default()
        assert app is not None
        app.add_action(add_action)

        # menu_model = Gio.Menu()
        # TODO: Make this lazy, blocks UI startup for too long
        # for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url):
        #     if vm not in vm_store:
        #         menu_model.append(vm, f"app.add::{vm}")

        box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
        box.set_valign(Gtk.Align.CENTER)

        add_button = Gtk.Button()
        add_button_content = Adw.ButtonContent.new()
        add_button_content.set_label("Add machine")
        add_button_content.set_icon_name("list-add-symbolic")
        add_button.add_css_class("flat")
        add_button.set_child(add_button_content)

        # add_button.set_has_frame(False)
        # add_button.set_menu_model(menu_model)
        # add_button.set_label("Add machine")
        box.append(add_button)

        grp.set_header_suffix(box)

        vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
        grp.add(vm_list)

        return grp

    def on_add(self, source: Any, parameter: Any) -> None:
        target = parameter.get_string()
        log.debug("Adding new machine %s", target)

    def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
        # Remove no-shadow class if attached
        if boxed_list.has_css_class("no-shadow"):
            boxed_list.remove_css_class("no-shadow")
        flake = vm.data.flake
        row = Adw.ActionRow()

        # ====== Display Avatar ======
        avatar = Adw.Avatar()
        machine_icon = flake.vm.machine_icon

        # If there is a machine icon, display it;
        # otherwise fall back to the clan icon
        if machine_icon:
            avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
        elif flake.icon:
            avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
        else:
            avatar.set_text(flake.clan_name + " " + flake.flake_attr)

        avatar.set_show_initials(True)
        avatar.set_size(50)
        row.add_prefix(avatar)

        # ====== Display Name And Url ======
        row.set_title(flake.flake_attr)
        row.set_title_lines(1)
        row.set_title_selectable(True)

        # If there is a machine description, display it;
        # otherwise fall back to the clan name
        if flake.vm.machine_description:
            row.set_subtitle(flake.vm.machine_description)
        else:
            row.set_subtitle(flake.clan_name)
        row.set_subtitle_lines(1)

        # ==== Display build progress bar ====
        build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
        build_box.set_valign(Gtk.Align.CENTER)
        build_box.append(vm.progress_bar)
        build_box.set_homogeneous(False)
        row.add_suffix(build_box)  # This allows children to have different sizes

        # ==== Action buttons ====
        button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
        button_box.set_valign(Gtk.Align.CENTER)

        ## Drop down menu
        open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
        open_action.connect("activate", self.on_edit)

        action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")

        build_logs_action = Gio.SimpleAction.new(
            f"logs.{action_id}", GLib.VariantType.new("s")
        )

        build_logs_action.connect("activate", self.on_show_build_logs)
        build_logs_action.set_enabled(False)

        app = Gio.Application.get_default()
        assert app is not None

        app.add_action(open_action)
        app.add_action(build_logs_action)

        # Set a callback function for conditionally enabling the build_logs action
        def on_vm_build_notify(
            vm: VMObject, is_building: bool, is_running: bool
        ) -> None:
            build_logs_action.set_enabled(is_building or is_running)
            app.add_action(build_logs_action)
            if is_building:
                ToastOverlay.use().add_toast_unique(
                    LogToast(
                        """Build process running ...""",
                        on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
                    ).toast,
                    f"info.build.running.{vm}",
                )

        vm.connect("vm_build_notify", on_vm_build_notify)

        menu_model = Gio.Menu()
        menu_model.append("Edit", f"app.edit::{vm.get_id()}")
        menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")

        pref_button = Gtk.MenuButton()
        pref_button.set_icon_name("open-menu-symbolic")
        pref_button.set_menu_model(menu_model)

        button_box.append(pref_button)

        ## VM switch button
        switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        switch_box.set_valign(Gtk.Align.CENTER)
        switch_box.append(vm.switch)
        button_box.append(switch_box)

        row.add_suffix(button_box)

        return row

    def on_edit(self, source: Any, parameter: Any) -> None:
        target = parameter.get_string()
        log.debug("Editing settings for machine %s", target)

    def on_show_build_logs(self, _: Any, parameter: Any) -> None:
        target = parameter.get_string()
        self.show_vm_build_logs(target)

    def show_vm_build_logs(self, target: str) -> None:
        vm = ClanStore.use().set_logging_vm(target)
        if vm is None:
            raise ValueError(f"VM {target} not found")

        views = ViewStack.use().view
        # Reset the logs view
        logs: Logs = views.get_child_by_name("logs")  # type: ignore

        if logs is None:
            raise ValueError("Logs view not found")

        name = vm.machine.name if vm.machine else "Unknown"

        logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
        # Initial message. Streaming happens automatically when the file is
        # changed by the build process.
        with open(vm.build_process.out_file) as f:
            logs.set_message(f.read())

        views.set_visible_child_name("logs")

    def render_join_row(
        self, boxed_list: Gtk.ListBox, join_val: JoinValue
    ) -> Gtk.Widget:
        if boxed_list.has_css_class("no-shadow"):
            boxed_list.remove_css_class("no-shadow")

        log.debug("Rendering join row for %s", join_val.url)

        row = Adw.ActionRow()
        row.set_title(join_val.url.machine_name)
        row.set_subtitle(str(join_val.url))
        row.add_css_class("trust")

        vm = ClanStore.use().get_vm(join_val.url)

        # Can't do this earlier because the clan store is empty at that point
        if vm is not None:
            sub = row.get_subtitle()
            assert sub is not None

            ToastOverlay.use().add_toast_unique(
                WarningToast(
                    f"""<span weight="regular">{join_val.url.machine_name!s}</span> already exists. Joining again will update it"""
                ).toast,
                "warning.duplicate.join",
            )

            row.set_subtitle(
                sub + "\nClan already exists. Joining again will update it"
            )

        avatar = Adw.Avatar()
        avatar.set_text(str(join_val.url.machine_name))
        avatar.set_show_initials(True)
        avatar.set_size(50)
        row.add_prefix(avatar)

        cancel_button = Gtk.Button(label="Cancel")
        cancel_button.add_css_class("error")
        cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
        self.cancel_button = cancel_button

        trust_button = Gtk.Button(label="Join")
        trust_button.add_css_class("success")
        trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))

        box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
        box.set_valign(Gtk.Align.CENTER)
        box.append(cancel_button)
        box.append(trust_button)

        row.add_suffix(box)

        return row

    def on_join_request(self, source: Any, url: str) -> None:
        log.debug("Join request: %s", url)
        clan_uri = ClanURI(url)
        JoinList.use().push(clan_uri, self.on_after_join)

    def on_after_join(self, source: JoinValue) -> None:
        ToastOverlay.use().add_toast_unique(
            SuccessToast(f"Updated {source.url.machine_name}").toast,
            "success.join",
        )
        # If the join request list is empty, disable the shadow artefact
        if JoinList.use().is_empty():
            self.join_boxed_list.add_css_class("no-shadow")

    def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
        source.set_sensitive(False)
        self.cancel_button.set_sensitive(False)
        value.join()

    def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
        JoinList.use().discard(value)
        if JoinList.use().is_empty():
            self.join_boxed_list.add_css_class("no-shadow")
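For reference, create_boxed_list above is the generic glue between a Gio.ListModel and a Gtk.ListBox: the render callback is invoked lazily per item, and rows appear or disappear as the backing store changes. Below is a minimal sketch of that pattern, assuming a hypothetical DemoItem GObject, render_demo_row callback, and build_demo_list helper (none of these exist in the codebase) and a running Gtk application:

import gi

gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GObject, Gtk

from clan_vm_manager.views.list import create_boxed_list


class DemoItem(GObject.Object):
    def __init__(self, name: str) -> None:
        super().__init__()
        self.name = name


def render_demo_row(boxed_list: Gtk.ListBox, item: DemoItem) -> Gtk.Widget:
    # Called lazily by Gtk.ListBox.bind_model for every item in the model.
    return Gtk.Label(label=item.name)


def build_demo_list() -> Gtk.ListBox:
    # Must run inside an initialized Gtk application context.
    store = Gio.ListStore.new(DemoItem)
    store.append(DemoItem("machine-a"))
    store.append(DemoItem("machine-b"))

    listbox = create_boxed_list(model=store, render_row=render_demo_row)
    # Appending to the store later adds a new row automatically.
    store.append(DemoItem("machine-c"))
    return listbox

Binding the model rather than building rows eagerly is what lets updates to ClanStore and JoinList show up in the list without any manual refresh.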
65
pkgs/clan-vm-manager/clan_vm_manager/views/logs.py
Normal file
@@ -0,0 +1,65 @@
import logging

import gi

gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk

from clan_vm_manager.singletons.use_views import ViewStack

log = logging.getLogger(__name__)


class Logs(Gtk.Box):
    """
    Simple log view.

    This includes a banner, a text view, and a button that closes the log and
    navigates back to the overview.
    """

    def __init__(self) -> None:
        super().__init__(orientation=Gtk.Orientation.VERTICAL)

        app = Gio.Application.get_default()
        assert app is not None

        self.banner = Adw.Banner.new("")
        self.banner.set_use_markup(True)
        self.banner.set_revealed(True)
        self.banner.set_button_label("Close")

        self.banner.connect(
            "button-clicked",
            lambda _: ViewStack.use().view.set_visible_child_name("list"),
        )

        self.text_view = Gtk.TextView()
        self.text_view.set_editable(False)
        self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
        self.text_view.add_css_class("log-view")

        self.append(self.banner)
        self.append(self.text_view)

    def set_title(self, title: str) -> None:
        self.banner.set_title(title)

    def set_message(self, message: str) -> None:
        """
        Set the log message. This replaces any previous message.
        """
        buffer = self.text_view.get_buffer()
        buffer.set_text(message)

        mark = buffer.create_mark(None, buffer.get_end_iter(), False)  # type: ignore
        self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

    def append_message(self, message: str) -> None:
        """
        Append to the end of a potentially existing log message.
        """
        buffer = self.text_view.get_buffer()
        end_iter = buffer.get_end_iter()
        buffer.insert(end_iter, message)  # type: ignore

        mark = buffer.create_mark(None, buffer.get_end_iter(), False)  # type: ignore
        self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
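The list view seeds this widget with the current contents of the build log file; continuous streaming is wired up elsewhere in the app. Purely as a hedged illustration of how append_message could be fed as the log file grows, here is a sketch using a Gio.FileMonitor; follow_log and its tailing logic are hypothetical and not part of this commit:

from gi.repository import Gio

from clan_vm_manager.views.logs import Logs


def follow_log(logs: Logs, path: str) -> Gio.FileMonitor:
    # Hypothetical helper: tail `path` and append newly written content to the view.
    gfile = Gio.File.new_for_path(path)
    monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
    offset = 0

    def on_changed(
        _monitor: Gio.FileMonitor,
        _file: Gio.File,
        _other: Gio.File | None,
        event: Gio.FileMonitorEvent,
    ) -> None:
        nonlocal offset
        if event != Gio.FileMonitorEvent.CHANGED:
            return
        with open(path) as f:
            f.seek(offset)
            chunk = f.read()
            offset = f.tell()
        if chunk:
            logs.append_message(chunk)

    monitor.connect("changed", on_changed)
    return monitor  # keep a reference alive, or the monitor stops emitting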
156
pkgs/clan-vm-manager/clan_vm_manager/views/webview.py
Normal file
@@ -0,0 +1,156 @@
import dataclasses
import json
import logging
import sys
import threading
from collections.abc import Callable
from pathlib import Path
from threading import Lock
from typing import Any

import gi
from clan_cli.api import API

gi.require_version("WebKit", "6.0")

from gi.repository import GLib, WebKit

site_index: Path = (
    Path(sys.argv[0]).absolute()
    / Path("../..")
    / Path("clan_vm_manager/.webui/index.html")
).resolve()

log = logging.getLogger(__name__)


def dataclass_to_dict(obj: Any) -> Any:
    """
    Utility function to convert dataclasses to dictionaries.

    It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries.

    It does NOT convert member functions.
    """
    if dataclasses.is_dataclass(obj):
        return {k: dataclass_to_dict(v) for k, v in dataclasses.asdict(obj).items()}
    elif isinstance(obj, list | tuple):
        return [dataclass_to_dict(item) for item in obj]
    elif isinstance(obj, dict):
        return {k: dataclass_to_dict(v) for k, v in obj.items()}
    else:
        return obj


class WebView:
    def __init__(self, methods: dict[str, Callable]) -> None:
        self.method_registry: dict[str, Callable] = methods

        self.webview = WebKit.WebView()

        settings = self.webview.get_settings()
        settings.set_property("enable-developer-extras", True)
        self.webview.set_settings(settings)

        self.manager = self.webview.get_user_content_manager()
        # Can be called with: window.webkit.messageHandlers.gtk.postMessage("...")
        # Important: it seems postMessage must be given some payload, otherwise it won't trigger the event
        self.manager.register_script_message_handler("gtk")
        self.manager.connect("script-message-received", self.on_message_received)

        self.webview.load_uri(f"file://{site_index}")

        # Global mutex lock to ensure functions run sequentially
        self.mutex_lock = Lock()
        self.queue_size = 0

    def on_message_received(
        self, user_content_manager: WebKit.UserContentManager, message: Any
    ) -> None:
        payload = json.loads(message.to_json(0))
        method_name = payload["method"]
        handler_fn = self.method_registry[method_name]

        log.debug(f"Received message: {payload}")
        log.debug(f"Queue size: {self.queue_size} (Wait)")

        def threaded_wrapper() -> bool:
            """
            Ensures only one function is executed at a time.

            Waits until no other function is holding the global lock.

            Starts a thread with the potentially long-running API function within.
            """
            if not self.mutex_lock.locked():
                thread = threading.Thread(
                    target=self.threaded_handler,
                    args=(
                        handler_fn,
                        payload.get("data"),
                        method_name,
                    ),
                )
                thread.start()
                return GLib.SOURCE_REMOVE

            return GLib.SOURCE_CONTINUE

        GLib.idle_add(
            threaded_wrapper,
        )
        self.queue_size += 1

    def threaded_handler(
        self,
        handler_fn: Callable[
            ...,
            Any,
        ],
        data: dict[str, Any] | None,
        method_name: str,
    ) -> None:
        with self.mutex_lock:
            log.debug("Executing %s", method_name)
            log.debug(f"{data}")
            if data is None:
                result = handler_fn()
            else:
                reconciled_arguments = {}
                for k, v in data.items():
                    # Some functions expect to be called with dataclass instances,
                    # but the JS API returns dictionaries.
                    # Introspect the function and create the expected dataclass
                    # from the dict dynamically, depending on the introspected
                    # argument type.
                    arg_type = API.get_method_argtype(method_name, k)
                    if dataclasses.is_dataclass(arg_type):
                        reconciled_arguments[k] = arg_type(**v)
                    else:
                        reconciled_arguments[k] = v

                result = handler_fn(**reconciled_arguments)

            serialized = json.dumps(dataclass_to_dict(result))

            # Use idle_add to queue the response call to JS on the main GTK thread
            GLib.idle_add(self.return_data_to_js, method_name, serialized)
        self.queue_size -= 1
        log.debug(f"Done: Remaining queue size: {self.queue_size}")

    def return_data_to_js(self, method_name: str, serialized: str) -> bool:
        # This function must be run on the main GTK thread to interact with the webview
        self.webview.evaluate_javascript(
            f"""
            window.clan.{method_name}(`{serialized}`);
            """,
            -1,
            None,
            None,
            None,
        )
        return GLib.SOURCE_REMOVE

    def get_webview(self) -> WebKit.WebView:
        return self.webview
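To summarize the bridge: the page posts a JSON object with a method name and optional data, threaded_handler reconciles the arguments and runs the registered function off the main loop, and the JSON-serialized result is pushed back through window.clan.<method>. A stripped-down sketch of that round trip, reusing dataclass_to_dict from webview.py above, with a made-up list_machines handler and DemoMachine dataclass standing in for real API methods:

import dataclasses
import json

from clan_vm_manager.views.webview import dataclass_to_dict


@dataclasses.dataclass
class DemoMachine:
    name: str
    running: bool


def list_machines(flake_url: str) -> list[DemoMachine]:
    # Stand-in for a long-running clan_cli API call.
    return [DemoMachine(name="demo", running=False)]


# What the JS side would post via window.webkit.messageHandlers.gtk.postMessage(...):
payload = {"method": "list_machines", "data": {"flake_url": "."}}

# What threaded_handler does with it, minus the threading and introspection:
result = list_machines(**payload["data"])
serialized = json.dumps(dataclass_to_dict(result))
# Handed back to the page as: window.clan.list_machines(`...`)
print(serialized)  # [{"name": "demo", "running": false}]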
92
pkgs/clan-vm-manager/clan_vm_manager/windows/main_window.py
Normal file
@@ -0,0 +1,92 @@
import logging
import threading

import gi
from clan_cli.api import API
from clan_cli.history.list import list_history

from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.toast import ToastOverlay
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
from clan_vm_manager.views.logs import Logs
from clan_vm_manager.views.webview import WebView

gi.require_version("Adw", "1")

from gi.repository import Adw, Gio, GLib, Gtk

from clan_vm_manager.components.trayicon import TrayIcon

log = logging.getLogger(__name__)


class MainWindow(Adw.ApplicationWindow):
    def __init__(self, config: ClanConfig) -> None:
        super().__init__()
        self.set_title("Clan Manager")
        self.set_default_size(980, 850)

        overlay = ToastOverlay.use().overlay
        view = Adw.ToolbarView()
        overlay.set_child(view)

        self.set_content(overlay)

        header = Adw.HeaderBar()
        view.add_top_bar(header)

        app = Gio.Application.get_default()
        assert app is not None
        self.tray_icon: TrayIcon = TrayIcon(app)

        # Initialize the ClanStore in the background
        threading.Thread(target=self._populate_vms).start()

        # Initialize all views
        stack_view = ViewStack.use().view

        clamp = Adw.Clamp()
        clamp.set_child(stack_view)
        clamp.set_maximum_size(1000)

        scroll = Gtk.ScrolledWindow()
        scroll.set_propagate_natural_height(True)
        scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
        scroll.set_child(clamp)

        stack_view.add_named(ClanList(config), "list")
        stack_view.add_named(Details(), "details")
        stack_view.add_named(Logs(), "logs")

        webview = WebView(methods=API._registry)
        stack_view.add_named(webview.get_webview(), "webview")

        stack_view.set_visible_child_name(config.initial_view)

        view.set_content(scroll)

        self.connect("destroy", self.on_destroy)

    def _set_clan_store_ready(self) -> bool:
        ClanStore.use().emit("is_ready")
        return GLib.SOURCE_REMOVE

    def _populate_vms(self) -> None:
        # Execute `clan flakes add <path>` to democlan for this to work
        # TODO: Make list_history a generator function
        for entry in list_history():
            GLib.idle_add(ClanStore.use().create_vm_task, entry)

        GLib.idle_add(self._set_clan_store_ready)

    def kill_vms(self) -> None:
        log.debug("Killing all VMs")
        ClanStore.use().kill_all()

    def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
        log.info("==== Destroying Adw.ApplicationWindow ====")
        ClanStore.use().kill_all()
        self.tray_icon.destroy()
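_populate_vms above shows the pattern used throughout the app for blocking work: run it on a plain thread and hand each result to GLib.idle_add, so that only the main loop ever touches GTK state. A minimal, self-contained sketch of that pattern, assuming a running GLib main loop; slow_load and on_item_loaded are illustrative names, not part of the codebase:

import threading

from gi.repository import GLib


def on_item_loaded(item: str) -> bool:
    # Runs on the GTK main loop; the only place it is safe to touch widgets.
    print("loaded", item)
    return GLib.SOURCE_REMOVE  # one-shot: drop this idle source after it ran


def slow_load() -> None:
    # Stand-in for list_history(): blocking work that must not stall the UI.
    for item in ("vm-1", "vm-2", "vm-3"):
        GLib.idle_add(on_item_loaded, item)


threading.Thread(target=slow_load, daemon=True).start()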