From 40acd695cc14aee1c9151628d1079f2d340afcb6 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 9 Jul 2024 17:35:00 +0200 Subject: [PATCH] clan-app: Remove vm-manager from codebase --- pkgs/clan-app/clan-app.code-workspace | 4 + pkgs/clan-app/clan_app/api/file.py | 103 ++ pkgs/clan-app/clan_app/app.py | 6 +- pkgs/clan-app/clan_app/components/executor.py | 132 -- pkgs/clan-app/clan_app/components/gkvstore.py | 220 --- .../clan_app/components/list_splash.py | 74 - pkgs/clan-app/clan_app/components/trayicon.py | 1189 ----------------- pkgs/clan-app/clan_app/components/vmobj.py | 375 ------ pkgs/clan-app/clan_app/singletons/toast.py | 30 - pkgs/clan-app/clan_app/singletons/use_join.py | 106 -- pkgs/clan-app/clan_app/singletons/use_vms.py | 181 --- pkgs/clan-app/clan_app/views/details.py | 61 - pkgs/clan-app/clan_app/views/list.py | 356 ----- pkgs/clan-app/clan_app/views/logs.py | 65 - pkgs/clan-app/clan_app/views/webview.py | 101 +- pkgs/clan-app/clan_app/windows/main_window.py | 35 +- 16 files changed, 116 insertions(+), 2922 deletions(-) create mode 100644 pkgs/clan-app/clan_app/api/file.py delete mode 100644 pkgs/clan-app/clan_app/components/executor.py delete mode 100644 pkgs/clan-app/clan_app/components/gkvstore.py delete mode 100644 pkgs/clan-app/clan_app/components/list_splash.py delete mode 100644 pkgs/clan-app/clan_app/components/trayicon.py delete mode 100644 pkgs/clan-app/clan_app/components/vmobj.py delete mode 100644 pkgs/clan-app/clan_app/singletons/use_join.py delete mode 100644 pkgs/clan-app/clan_app/singletons/use_vms.py delete mode 100644 pkgs/clan-app/clan_app/views/details.py delete mode 100644 pkgs/clan-app/clan_app/views/list.py delete mode 100644 pkgs/clan-app/clan_app/views/logs.py diff --git a/pkgs/clan-app/clan-app.code-workspace b/pkgs/clan-app/clan-app.code-workspace index 1a108d5ce..6cf6c41e4 100644 --- a/pkgs/clan-app/clan-app.code-workspace +++ b/pkgs/clan-app/clan-app.code-workspace @@ -14,6 +14,9 @@ }, { "path": "../../lib/build-clan" + }, + { + "path": "../webview-ui" } ], "settings": { @@ -25,6 +28,7 @@ "**/.mypy_cache": true, "**/.reports": true, "**/.ruff_cache": true, + "**/.webui": true, "**/result/**": true, "/nix/store/**": true }, diff --git a/pkgs/clan-app/clan_app/api/file.py b/pkgs/clan-app/clan_app/api/file.py new file mode 100644 index 000000000..577facba3 --- /dev/null +++ b/pkgs/clan-app/clan_app/api/file.py @@ -0,0 +1,103 @@ +import gi + +gi.require_version("WebKit", "6.0") + +from gi.repository import Gio, GLib, Gtk, WebKit + + +from clan_cli.api.directory import FileRequest + + +# Implement the abstract open_file function +def open_file(file_request: FileRequest) -> str | None: + # Function to handle the response and stop the loop + selected_path = None + + def on_file_select( + file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop + ) -> None: + try: + gfile = file_dialog.open_finish(task) + if gfile: + nonlocal selected_path + selected_path = gfile.get_path() + except Exception as e: + print(f"Error getting selected file or directory: {e}") + finally: + main_loop.quit() + + def on_folder_select( + file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop + ) -> None: + try: + gfile = file_dialog.select_folder_finish(task) + if gfile: + nonlocal selected_path + selected_path = gfile.get_path() + except Exception as e: + print(f"Error getting selected directory: {e}") + finally: + main_loop.quit() + + def on_save_finish( + file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop + ) -> None: + try: + gfile = file_dialog.save_finish(task) + if gfile: + nonlocal selected_path + selected_path = gfile.get_path() + except Exception as e: + print(f"Error getting selected file: {e}") + finally: + main_loop.quit() + + dialog = Gtk.FileDialog() + + if file_request.title: + dialog.set_title(file_request.title) + + if file_request.filters: + filters = Gio.ListStore.new(Gtk.FileFilter) + file_filters = Gtk.FileFilter() + + if file_request.filters.title: + file_filters.set_name(file_request.filters.title) + + # Create and configure a filter for image files + if file_request.filters.mime_types: + for mime in file_request.filters.mime_types: + file_filters.add_mime_type(mime) + filters.append(file_filters) + + if file_request.filters.patterns: + for pattern in file_request.filters.patterns: + file_filters.add_pattern(pattern) + + if file_request.filters.suffixes: + for suffix in file_request.filters.suffixes: + file_filters.add_suffix(suffix) + + filters.append(file_filters) + dialog.set_filters(filters) + + main_loop = GLib.MainLoop() + + # if select_folder + if file_request.mode == "select_folder": + dialog.select_folder( + callback=lambda dialog, task: on_folder_select(dialog, task, main_loop), + ) + elif file_request.mode == "open_file": + dialog.open( + callback=lambda dialog, task: on_file_select(dialog, task, main_loop) + ) + elif file_request.mode == "save": + dialog.save( + callback=lambda dialog, task: on_save_finish(dialog, task, main_loop) + ) + + # Wait for the user to select a file or directory + main_loop.run() # type: ignore + + return selected_path diff --git a/pkgs/clan-app/clan_app/app.py b/pkgs/clan-app/clan_app/app.py index bd4339596..d4f3c78c9 100644 --- a/pkgs/clan-app/clan_app/app.py +++ b/pkgs/clan-app/clan_app/app.py @@ -13,10 +13,9 @@ gi.require_version("Adw", "1") from pathlib import Path from clan_cli.custom_logger import setup_logging -from gi.repository import Adw, Gdk, Gio, Gtk +from gi.repository import Adw, Gdk, Gio, Gtk, GLib, GObject from clan_app.components.interfaces import ClanConfig -from clan_app.singletons.use_join import GLib, GObject from .windows.main_window import MainWindow @@ -75,8 +74,7 @@ class MainApplication(Adw.Application): # TODO: Doesn't seem to raise the destroy signal. Need to investigate # self.get_windows() returns an empty list. Desync between window and application? self.window.close() - # Killing vms directly. This is dirty - self.window.kill_vms() + else: log.error("No window to destroy") diff --git a/pkgs/clan-app/clan_app/components/executor.py b/pkgs/clan-app/clan_app/components/executor.py deleted file mode 100644 index 7ca59d52f..000000000 --- a/pkgs/clan-app/clan_app/components/executor.py +++ /dev/null @@ -1,132 +0,0 @@ -import logging -import os -import signal -import sys -import traceback -from pathlib import Path -from typing import Any - -import gi - -gi.require_version("GdkPixbuf", "2.0") - -import dataclasses -import multiprocessing as mp -from collections.abc import Callable - -log = logging.getLogger(__name__) - - -# Kill the new process and all its children by sending a SIGTERM signal to the process group -def _kill_group(proc: mp.Process) -> None: - pid = proc.pid - if proc.is_alive() and pid: - os.killpg(pid, signal.SIGTERM) - else: - log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead") - - -@dataclasses.dataclass(frozen=True) -class MPProcess: - name: str - proc: mp.Process - out_file: Path - - # Kill the new process and all its children by sending a SIGTERM signal to the process group - def kill_group(self) -> None: - _kill_group(proc=self.proc) - - -def _set_proc_name(name: str) -> None: - if sys.platform != "linux": - return - import ctypes - - # Define the prctl function with the appropriate arguments and return type - libc = ctypes.CDLL("libc.so.6") - prctl = libc.prctl - prctl.argtypes = [ - ctypes.c_int, - ctypes.c_char_p, - ctypes.c_ulong, - ctypes.c_ulong, - ctypes.c_ulong, - ] - prctl.restype = ctypes.c_int - - # Set the process name to "my_process" - prctl(15, name.encode(), 0, 0, 0) - - -def _init_proc( - func: Callable, - out_file: Path, - proc_name: str, - on_except: Callable[[Exception, mp.process.BaseProcess], None] | None, - **kwargs: Any, -) -> None: - # Create a new process group - os.setsid() - - # Open stdout and stderr - with open(out_file, "w") as out_fd: - os.dup2(out_fd.fileno(), sys.stdout.fileno()) - os.dup2(out_fd.fileno(), sys.stderr.fileno()) - - # Print some information - pid = os.getpid() - gpid = os.getpgid(pid=pid) - - # Set the process name - _set_proc_name(proc_name) - - # Close stdin - sys.stdin.close() - - linebreak = "=" * 5 - # Execute the main function - print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr) - try: - func(**kwargs) - except Exception as ex: - traceback.print_exc() - if on_except is not None: - on_except(ex, mp.current_process()) - - # Kill the new process and all its children by sending a SIGTERM signal to the process group - pid = os.getpid() - gpid = os.getpgid(pid=pid) - print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr) - os.killpg(gpid, signal.SIGTERM) - sys.exit(1) - # Don't use a finally block here, because we want the exitcode to be set to - # 0 if the function returns normally - - -def spawn( - *, - out_file: Path, - on_except: Callable[[Exception, mp.process.BaseProcess], None] | None, - func: Callable, - **kwargs: Any, -) -> MPProcess: - # Decouple the process from the parent - if mp.get_start_method(allow_none=True) is None: - mp.set_start_method(method="forkserver") - - # Set names - proc_name = f"MPExec:{func.__name__}" - - # Start the process - proc = mp.Process( - target=_init_proc, - args=(func, out_file, proc_name, on_except), - name=proc_name, - kwargs=kwargs, - ) - proc.start() - - # Return the process - mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file) - - return mp_proc diff --git a/pkgs/clan-app/clan_app/components/gkvstore.py b/pkgs/clan-app/clan_app/components/gkvstore.py deleted file mode 100644 index ea96b0052..000000000 --- a/pkgs/clan-app/clan_app/components/gkvstore.py +++ /dev/null @@ -1,220 +0,0 @@ -import logging -from collections.abc import Callable -from typing import Any, Generic, TypeVar - -import gi - -gi.require_version("Gio", "2.0") -from gi.repository import Gio, GObject - -log = logging.getLogger(__name__) - - -# Define type variables for key and value types -K = TypeVar("K") # Key type -V = TypeVar( - "V", bound=GObject.Object -) # Value type, bound to GObject.GObject or its subclasses - - -class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]): - """ - A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values. - Only use self[key] and del self[key] for accessing the items for better performance. - This class could be optimized by having the objects remember their position in the list. - """ - - def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None: - super().__init__() - self.gtype = gtype - self.key_gen = key_gen - # From Python 3.7 onwards dictionaries are ordered by default - self._items: dict[K, V] = dict() - - ################################## - # # - # Gio.ListStore Interface # - # # - ################################## - @classmethod - def new(cls: Any, gtype: type[V]) -> "GKVStore": - return cls.__new__(cls, gtype) - - def append(self, item: V) -> None: - key = self.key_gen(item) - self[key] = item - - def find(self, item: V) -> tuple[bool, int]: - log.warning("Finding is O(n) in GKVStore. Better use indexing") - for i, v in enumerate(self.values()): - if v == item: - return True, i - return False, -1 - - def find_with_equal_func( - self, item: V, equal_func: Callable[[V, V], bool] - ) -> tuple[bool, int]: - log.warning("Finding is O(n) in GKVStore. Better use indexing") - for i, v in enumerate(self.values()): - if equal_func(v, item): - return True, i - return False, -1 - - def find_with_equal_func_full( - self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any - ) -> tuple[bool, int]: - log.warning("Finding is O(n) in GKVStore. Better use indexing") - for i, v in enumerate(self.values()): - if equal_func(v, item, user_data): - return True, i - return False, -1 - - def insert(self, position: int, item: V) -> None: - log.warning("Inserting is O(n) in GKVStore. Better use append") - log.warning( - "This functions may have incorrect items_changed signal behavior. Please test it" - ) - key = self.key_gen(item) - if key in self._items: - raise ValueError("Key already exists in the dictionary") - if position < 0 or position > len(self._items): - raise IndexError("Index out of range") - - # Temporary storage for items to be reinserted - temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]] - - # Delete items from the original dict - for k in list(self.keys())[position:]: - del self._items[k] - - # Insert the new key-value pair - self._items[key] = item - - # Reinsert the items - for i, (k, v) in enumerate(temp_list): - self._items[k] = v - - # Notify the model of the changes - self.items_changed(position, 0, 1) - - def insert_sorted( - self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any - ) -> None: - raise NotImplementedError("insert_sorted is not implemented in GKVStore") - - def remove(self, position: int) -> None: - if position < 0 or position >= self.get_n_items(): - return - key = self.keys()[position] - del self[key] - self.items_changed(position, 1, 0) - - def remove_all(self) -> None: - self._items.clear() - self.items_changed(0, len(self._items), 0) - - def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None: - raise NotImplementedError("sort is not implemented in GKVStore") - - def splice(self, position: int, n_removals: int, additions: list[V]) -> None: - raise NotImplementedError("splice is not implemented in GKVStore") - - ################################## - # # - # Gio.ListModel Interface # - # # - ################################## - def get_item(self, position: int) -> V | None: - if position < 0 or position >= self.get_n_items(): - return None - # Access items by index since OrderedDict does not support direct indexing - key = list(self._items.keys())[position] - return self._items[key] - - def do_get_item(self, position: int) -> V | None: - return self.get_item(position) - - def get_item_type(self) -> Any: - return self.gtype.__gtype__ # type: ignore[attr-defined] - - def do_get_item_type(self) -> GObject.GType: - return self.get_item_type() - - def get_n_items(self) -> int: - return len(self._items) - - def do_get_n_items(self) -> int: - return self.get_n_items() - - ################################## - # # - # Dict Interface # - # # - ################################## - def keys(self) -> list[K]: - return list(self._items.keys()) - - def values(self) -> list[V]: - return list(self._items.values()) - - def items(self) -> list[tuple[K, V]]: - return list(self._items.items()) - - def get(self, key: K, default: V | None = None) -> V | None: - return self._items.get(key, default) - - # O(1) operation if the key does not exist, O(n) if it does - def __setitem__(self, key: K, value: V) -> None: - # If the key already exists, remove it O(n) - if key in self._items: - log.debug("Updating an existing key in GKVStore is O(n)") - position = self.keys().index(key) - self._items[key] = value - self.items_changed(position, 1, 1) - else: - # Add the new key-value pair - self._items[key] = value - position = max(len(self._items) - 1, 0) - self.items_changed(position, 0, 1) - - # O(n) operation - def __delitem__(self, key: K) -> None: - position = self.keys().index(key) - del self._items[key] - self.items_changed(position, 1, 0) - - def __len__(self) -> int: - return len(self._items) - - # O(1) operation - def __getitem__(self, key: K) -> V: # type: ignore[override] - return self._items[key] - - def __contains__(self, key: K) -> bool: # type: ignore[override] - return key in self._items - - def __str__(self) -> str: - resp = "GKVStore(\n" - for k, v in self._items.items(): - resp += f"{k}: {v}\n" - resp += ")" - return resp - - def __repr__(self) -> str: - return self._items.__str__() - - ################################## - # # - # Custom Methods # - # # - ################################## - def first(self) -> V: - return self.values()[0] - - def last(self) -> V: - return self.values()[-1] - - def register_on_change( - self, callback: Callable[["GKVStore[K,V]", int, int, int], None] - ) -> None: - self.connect("items-changed", callback) diff --git a/pkgs/clan-app/clan_app/components/list_splash.py b/pkgs/clan-app/clan_app/components/list_splash.py deleted file mode 100644 index 827c72a4b..000000000 --- a/pkgs/clan-app/clan_app/components/list_splash.py +++ /dev/null @@ -1,74 +0,0 @@ -import logging -from collections.abc import Callable -from typing import TypeVar - -import gi - -from clan_app import assets - -gi.require_version("Adw", "1") -from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk - -log = logging.getLogger(__name__) - -ListItem = TypeVar("ListItem", bound=GObject.Object) -CustomStore = TypeVar("CustomStore", bound=Gio.ListModel) - - -class EmptySplash(Gtk.Box): - def __init__(self, on_join: Callable[[str], None]) -> None: - super().__init__(orientation=Gtk.Orientation.VERTICAL) - self.on_join = on_join - - vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) - clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png"))) - - if clan_icon: - image = Gtk.Image.new_from_pixbuf(clan_icon) - else: - image = Gtk.Image.new_from_icon_name("image-missing") - # same as the clamp - image.set_pixel_size(400) - image.set_opacity(0.5) - image.set_margin_top(20) - image.set_margin_bottom(10) - - vbox.append(image) - - empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.") - join_entry = Gtk.Entry() - join_entry.set_placeholder_text("clan://") - join_entry.set_hexpand(True) - - join_button = Gtk.Button(label="Join") - join_button.connect("clicked", self._on_join, join_entry) - - join_entry.connect("activate", lambda e: self._on_join(join_button, e)) - - clamp = Adw.Clamp() - clamp.set_maximum_size(400) - clamp.set_margin_bottom(40) - vbox.append(empty_label) - hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6) - hbox.append(join_entry) - hbox.append(join_button) - vbox.append(hbox) - clamp.set_child(vbox) - - self.append(clamp) - - def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None: - try: - pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path) - return pixbuf - except Exception as e: - log.error(f"Failed to load image: {e}") - return None - - def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None: - """ - Callback for the join button - Extracts the text from the entry and calls the on_join callback - """ - log.info(f"Splash screen: Joining {entry.get_text()}") - self.on_join(entry.get_text()) diff --git a/pkgs/clan-app/clan_app/components/trayicon.py b/pkgs/clan-app/clan_app/components/trayicon.py deleted file mode 100644 index 79eec779b..000000000 --- a/pkgs/clan-app/clan_app/components/trayicon.py +++ /dev/null @@ -1,1189 +0,0 @@ -# mypy: allow-untyped-defs -# ruff: noqa: ANN201, ANN001, ANN202 - -# COPYRIGHT (C) 2020-2024 Nicotine+ Contributors -# -# GNU GENERAL PUBLIC LICENSE -# Version 3, 29 June 2007 -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import sys -from collections.abc import Callable -from typing import Any, ClassVar - -import gi - -gi.require_version("Gtk", "4.0") -from gi.repository import GdkPixbuf, Gio, GLib, Gtk - - -# DUMMY IMPLEMENTATION -################################################ -### import pynicotine -class Pynicotine: - __application_id__ = "nicotine-plus" - __application_name__ = "Nicotine+" - __version__ = "3.0.0" - - -pynicotine = Pynicotine() - - -### from pynicotine import slskmessages -class UserStatus: - OFFLINE = 0 - ONLINE = 1 - AWAY = 2 - - -class Slskmessages: - UserStatus: Any = UserStatus - - -slskmessages = Slskmessages() - - -### from pynicotine.config import config -class Config: - sections: ClassVar = { - "notifications": {"notification_popup_sound": False}, - "ui": {"trayicon": True}, - } - data_folder_path: Any = "data_folder_path" - - -config = Config() - - -### from pynicotine.core import core -class User: - login_status: Any = UserStatus.OFFLINE - - -class Core: - users = User() - - -core = Core() -### from pynicotine.gtkgui.application import GTK_API_VERSION -GTK_API_VERSION = 4 - -## from pynicotine.gtkgui.application import GTK_GUI_FOLDER_PATH -GTK_GUI_FOLDER_PATH = "assets" -LONG_PATH_PREFIX = "\\\\?\\" - - -# from pynicotine.gtkgui.widgets.theme import ICON_THEME -class IconTheme: - def lookup_icon(self, icon_name: str, **kwargs: Any) -> None: - return None - - -ICON_THEME = IconTheme() - - -# from pynicotine.gtkgui.widgets.window import Window -class CWindow: - activation_token = None - - -Window = CWindow() - -### from pynicotine.logfacility import log - -import logging - - -class MyLog: - def __init__(self) -> None: - self.log = logging.getLogger(__name__) - - def add_debug(self, *args: Any, **kwargs: Any) -> None: - return - self.log.debug(*args, **kwargs) - - -log = MyLog() - - -### from pynicotine.utils import encode_path - - -def encode_path(path: str, prefix: bool = True) -> bytes: - """Converts a file path to bytes for processing by the system. - - On Windows, also append prefix to enable extended-length path. - """ - - if sys.platform == "win32" and prefix: - path = path.replace("/", "\\") - - if path.startswith("\\\\"): - path = "UNC" + path[1:] - - path = LONG_PATH_PREFIX + path - - return path.encode("utf-8") - - -# from pynicotine.utils import truncate_string_byte -def truncate_string_byte( - string: str, byte_limit: int, encoding: str = "utf-8", ellipsize: bool = False -) -> str: - """Truncates a string to fit inside a byte limit.""" - - string_bytes = string.encode(encoding) - - if len(string_bytes) <= byte_limit: - # Nothing to do, return original string - return string - - if ellipsize: - ellipsis_char = "…".encode(encoding) - string_bytes = ( - string_bytes[: max(byte_limit - len(ellipsis_char), 0)].rstrip() - + ellipsis_char - ) - else: - string_bytes = string_bytes[:byte_limit] - - return string_bytes.decode(encoding, "ignore") - - -################################################ - - -class ImplUnavailableError(Exception): - pass - - -class BaseImplementation: - def __init__(self, application: Any) -> None: - self.application = application - self.menu_items: dict[int, Any] = {} - self.menu_item_id: int = 1 - self.activate_callback: Callable = lambda a, b: self.update_window_visibility - self.is_visible: bool = True - - self.create_menu() - - def create_item( - self, - text: str | None = None, - callback: Callable | None = None, - check: bool = False, - ) -> dict[str, Any]: - item: dict[str, Any] = {"id": self.menu_item_id, "sensitive": True} - - if text is not None: - item["text"] = text - - if callback is not None: - item["callback"] = callback - - if check: - item["toggled"] = False - - self.menu_items[self.menu_item_id] = item - self.menu_item_id += 1 - - return item - - @staticmethod - def set_item_text(item: dict[str, Any], text: str | None) -> None: - item["text"] = text - - @staticmethod - def set_item_sensitive(item: dict[str, Any], sensitive: bool) -> None: - item["sensitive"] = sensitive - - @staticmethod - def set_item_toggled(item: dict[str, Any], toggled: bool) -> None: - item["toggled"] = toggled - - def create_menu(self) -> None: - self.show_hide_item = self.create_item( - "default", self.application.on_window_hide_unhide - ) - - # self.create_item() - - # self.create_item("_Quit", self.application.on_shutdown) - - def update_window_visibility(self) -> None: - if self.application.window is None: - return - - if self.application.window.is_visible(): - label = "Hide VM Manager" - else: - label = "Show VM Manager" - - self.set_item_text(self.show_hide_item, label) - self.update_menu() - - def update_user_status(self) -> None: - self.update_icon() - self.update_menu() - - def update_icon(self) -> None: - pass - # # Check for highlights, and display highlight icon if there is a highlighted room or private chat - # if (self.application.window - # and (self.application.window.chatrooms.highlighted_rooms - # or self.application.window.privatechat.highlighted_users)): - # icon_name = "msg" - - # elif core.users.login_status == slskmessages.UserStatus.ONLINE: - # icon_name = "connect" - - # elif core.users.login_status == slskmessages.UserStatus.AWAY: - # icon_name = "away" - - # else: - # icon_name = "disconnect" - - # icon_name = f"{pynicotine.__application_id__}-{icon_name}" - # self.set_icon(icon_name) - - def set_icon(self, icon_name: str) -> None: - # Implemented in subclasses - pass - - def update_icon_theme(self) -> None: - # Implemented in subclasses - pass - - def update_menu(self) -> None: - # Implemented in subclasses - pass - - def set_download_status(self, status: str) -> None: - self.update_menu() - - def set_upload_status(self, status) -> None: - self.update_menu() - - def show_notification(self, title, message) -> None: - # Implemented in subclasses - pass - - def unload(self, is_shutdown=False) -> None: - # Implemented in subclasses - pass - - -class StatusNotifierImplementation(BaseImplementation): - class DBusProperty: - def __init__(self, name, signature, value) -> None: - self.name = name - self.signature = signature - self.value = value - - class DBusSignal: - def __init__(self, name, args) -> None: - self.name = name - self.args = args - - class DBusMethod: - def __init__(self, name, in_args, out_args, callback) -> None: - self.name = name - self.in_args = in_args - self.out_args = out_args - self.callback = callback - - class DBusService: - def __init__(self, interface_name, object_path, bus_type) -> None: - self._interface_name = interface_name - self._object_path = object_path - - self._bus = Gio.bus_get_sync(bus_type) - self._registration_id = None - self.properties: Any = {} - self.signals: Any = {} - self.methods: Any = {} - - def register(self): - xml_output = f"" - - for property_name, prop in self.properties.items(): - xml_output += f"" - - for method_name, method in self.methods.items(): - xml_output += f"" - - for in_signature in method.in_args: - xml_output += f"" - for out_signature in method.out_args: - xml_output += f"" - - xml_output += "" - - for signal_name, signal in self.signals.items(): - xml_output += f"" - - for signature in signal.args: - xml_output += f"" - - xml_output += "" - - xml_output += "" - - registration_id = self._bus.register_object( - object_path=self._object_path, - interface_info=Gio.DBusNodeInfo.new_for_xml(xml_output).interfaces[0], - method_call_closure=self.on_method_call, - get_property_closure=self.on_get_property, - ) - - if not registration_id: - raise GLib.Error( - f"Failed to register object with path {self._object_path}" - ) - - self._registration_id = registration_id - - def unregister(self) -> None: - if self._registration_id is None: - return - - self._bus.unregister_object(self._registration_id) - self._registration_id = None - - def add_property(self, name: str, signature: Any, value: Any) -> None: - self.properties[name] = StatusNotifierImplementation.DBusProperty( - name, signature, value - ) - - def add_signal(self, name: str, args: Any) -> None: - self.signals[name] = StatusNotifierImplementation.DBusSignal(name, args) - - def add_method( - self, name: str, in_args: Any, out_args: Any, callback: Any - ) -> None: - self.methods[name] = StatusNotifierImplementation.DBusMethod( - name, in_args, out_args, callback - ) - - def emit_signal(self, name: str, *args: Any) -> None: - arg_types = "".join(self.signals[name].args) - - self._bus.emit_signal( - destination_bus_name=None, - object_path=self._object_path, - interface_name=self._interface_name, - signal_name=name, - parameters=GLib.Variant(f"({arg_types})", args), - ) - - def on_method_call( - self, - _connection, - _sender, - _path, - _interface_name, - method_name, - parameters, - invocation, - ): - method = self.methods[method_name] - result = method.callback(*parameters.unpack()) - - out_arg_types = "".join(method.out_args) - return_value = None - - if method.out_args: - return_value = GLib.Variant(f"({out_arg_types})", result) - - invocation.return_value(return_value) - - def on_get_property( - self, _connection, _sender, _path, _interface_name, property_name - ): - prop = self.properties[property_name] - return GLib.Variant(prop.signature, prop.value) - - class DBusMenuService(DBusService): - def __init__(self) -> None: - super().__init__( - interface_name="com.canonical.dbusmenu", - object_path="/org/ayatana/NotificationItem/Nicotine/Menu", - bus_type=Gio.BusType.SESSION, - ) - - self._items: Any = {} - self._revision: Any = 0 - - for method_name, in_args, out_args, callback in ( - ( - "GetGroupProperties", - ("ai", "as"), - ("a(ia{sv})",), - self.on_get_group_properties, - ), - ( - "GetLayout", - ("i", "i", "as"), - ("u", "(ia{sv}av)"), - self.on_get_layout, - ), - ("Event", ("i", "s", "v", "u"), (), self.on_event), - ): - self.add_method(method_name, in_args, out_args, callback) - - for signal_name, value in (("LayoutUpdated", ("u", "i")),): - self.add_signal(signal_name, value) - - def set_items(self, items) -> None: - self._items = items - - self._revision += 1 - self.emit_signal("LayoutUpdated", self._revision, 0) - - @staticmethod - def _serialize_item(item) -> dict[str, Any]: - if "text" in item: - props = { - "label": GLib.Variant("s", item["text"]), - "enabled": GLib.Variant("b", item["sensitive"]), - } - - if item.get("toggled") is not None: - props["toggle-type"] = GLib.Variant("s", "checkmark") - props["toggle-state"] = GLib.Variant("i", int(item["toggled"])) - - return props - - return {"type": GLib.Variant("s", "separator")} - - def on_get_group_properties(self, ids, _properties): - item_properties = [] - - for idx, item in self._items.items(): - if idx in ids: - item_properties.append((idx, self._serialize_item(item))) - - return (item_properties,) - - def on_get_layout(self, _parent_id, _recursion_depth, _property_names): - serialized_items = [] - - for idx, item in self._items.items(): - serialized_item = GLib.Variant( - "(ia{sv}av)", (idx, self._serialize_item(item), []) - ) - serialized_items.append(serialized_item) - - return self._revision, (0, {}, serialized_items) - - def on_event(self, idx, event_id, _data, _timestamp) -> None: - if event_id == "clicked": - self._items[idx]["callback"]() - - class StatusNotifierItemService(DBusService): - def __init__(self, activate_callback) -> None: - super().__init__( - interface_name="org.kde.StatusNotifierItem", - object_path="/org/ayatana/NotificationItem/Nicotine", - bus_type=Gio.BusType.SESSION, - ) - - self.menu = StatusNotifierImplementation.DBusMenuService() - - for property_name, signature, value in ( - ("Category", "s", "Communications"), - ("Id", "s", pynicotine.__application_id__), - ("Title", "s", pynicotine.__application_name__), - ( - "ToolTip", - "(sa(iiay)ss)", - ("", [], pynicotine.__application_name__, ""), - ), - ("Menu", "o", "/org/ayatana/NotificationItem/Nicotine/Menu"), - ("ItemIsMenu", "b", False), - ("IconName", "s", ""), - ("IconThemePath", "s", ""), - ("Status", "s", "Active"), - ): - self.add_property(property_name, signature, value) - - for method_name, in_args, out_args, callback in ( - ("Activate", ("i", "i"), (), activate_callback), - ( - "ProvideXdgActivationToken", - ("s",), - (), - self.on_provide_activation_token, - ), - ): - self.add_method(method_name, in_args, out_args, callback) - - for signal_name, value in ( - ("NewIcon", ()), - ("NewIconThemePath", ("s",)), - ("NewStatus", ("s",)), - ): - self.add_signal(signal_name, value) - - def register(self): - self.menu.register() - super().register() - - def unregister(self): - super().unregister() - self.menu.unregister() - - def on_provide_activation_token(self, token): - Window.activation_token = token - - def __init__(self, application) -> None: - super().__init__(application) - - self.tray_icon: Any = None - self.custom_icons: bool = False - - try: - self.bus = Gio.bus_get_sync(bus_type=Gio.BusType.SESSION) - self.tray_icon = self.StatusNotifierItemService( - activate_callback=self.activate_callback - ) - self.tray_icon.register() - - from clan_app.assets import loc - - icon_path = str(loc / "clan_white_notext.png") - self.set_icon(icon_path) - - self.bus.call_sync( - bus_name="org.kde.StatusNotifierWatcher", - object_path="/StatusNotifierWatcher", - interface_name="org.kde.StatusNotifierWatcher", - method_name="RegisterStatusNotifierItem", - parameters=GLib.Variant( - "(s)", ("/org/ayatana/NotificationItem/Nicotine",) - ), - reply_type=None, - flags=Gio.DBusCallFlags.NONE, - timeout_msec=-1, - ) - - except GLib.Error as error: - self.unload() - raise ImplUnavailableError( - f"StatusNotifier implementation not available: {error}" - ) from error - - self.update_menu() - - @staticmethod - def check_icon_path(icon_name, icon_path) -> bool: - """Check if tray icons exist in the specified icon path.""" - - if not icon_path: - return False - - icon_scheme = f"{pynicotine.__application_id__}-{icon_name}." - - try: - with os.scandir(encode_path(icon_path)) as entries: - for entry in entries: - if entry.is_file() and entry.name.decode( - "utf-8", "replace" - ).startswith(icon_scheme): - return True - - except OSError as error: - log.add_debug(f"Error accessing tray icon path {icon_path}: {error}") - - return False - - def get_icon_path(self): - """Returns an icon path to use for tray icons, or None to fall back to - system-wide icons.""" - - # icon_path = self.application.get_application_icon_path() - - return "" - - def set_icon(self, icon_path) -> None: - self.tray_icon.properties["IconName"].value = icon_path - self.tray_icon.emit_signal("NewIcon") - - if not self.is_visible: - return - - status = "Active" - - if self.tray_icon.properties["Status"].value != status: - self.tray_icon.properties["Status"].value = status - self.tray_icon.emit_signal("NewStatus", status) - - def update_icon_theme(self): - # If custom icon path was found, use it, otherwise we fall back to system icons - icon_path = self.get_icon_path() - self.tray_icon.properties["IconThemePath"].value = icon_path - self.tray_icon.emit_signal("NewIconThemePath", icon_path) - - if icon_path: - log.add_debug("Using tray icon path %s", icon_path) - - def update_menu(self) -> None: - self.tray_icon.menu.set_items(self.menu_items) - - def unload(self, is_shutdown: bool = False) -> None: - if self.tray_icon is None: - return - - status = "Passive" - - self.tray_icon.properties["Status"].value = status - self.tray_icon.emit_signal("NewStatus", status) - - if is_shutdown: - self.tray_icon.unregister() - - -class Win32Implementation(BaseImplementation): - """Windows NotifyIcon implementation. - - https://learn.microsoft.com/en-us/windows/win32/shell/notification-area - https://learn.microsoft.com/en-us/windows/win32/shell/taskbar - """ - - WINDOW_CLASS_NAME = "NicotineTrayIcon" - - NIM_ADD = 0 - NIM_MODIFY = 1 - NIM_DELETE = 2 - - NIF_MESSAGE = 1 - NIF_ICON = 2 - NIF_TIP = 4 - NIF_INFO = 16 - NIIF_NOSOUND = 16 - - MIIM_STATE = 1 - MIIM_ID = 2 - MIIM_STRING = 64 - - MFS_ENABLED = 0 - MFS_UNCHECKED = 0 - MFS_DISABLED = 3 - MFS_CHECKED = 8 - - MFT_SEPARATOR = 2048 - - WM_NULL = 0 - WM_DESTROY = 2 - WM_CLOSE = 16 - WM_COMMAND = 273 - WM_LBUTTONUP = 514 - WM_RBUTTONUP = 517 - WM_USER = 1024 - WM_TRAYICON = WM_USER + 1 - NIN_BALLOONHIDE = WM_USER + 3 - NIN_BALLOONTIMEOUT = WM_USER + 4 - NIN_BALLOONUSERCLICK = WM_USER + 5 - - CS_VREDRAW = 1 - CS_HREDRAW = 2 - COLOR_WINDOW = 5 - IDC_ARROW = 32512 - - WS_OVERLAPPED = 0 - WS_SYSMENU = 524288 - CW_USEDEFAULT = -2147483648 - - IMAGE_ICON = 1 - LR_LOADFROMFILE = 16 - SM_CXSMICON = 49 - - if sys.platform == "win32": - from ctypes import Structure - - class WNDCLASSW(Structure): - from ctypes import CFUNCTYPE, wintypes - - LPFN_WND_PROC = CFUNCTYPE( - wintypes.INT, - wintypes.HWND, - wintypes.UINT, - wintypes.WPARAM, - wintypes.LPARAM, - ) - _fields_: ClassVar = [ - ("style", wintypes.UINT), - ("lpfn_wnd_proc", LPFN_WND_PROC), - ("cb_cls_extra", wintypes.INT), - ("cb_wnd_extra", wintypes.INT), - ("h_instance", wintypes.HINSTANCE), - ("h_icon", wintypes.HICON), - ("h_cursor", wintypes.HANDLE), - ("hbr_background", wintypes.HBRUSH), - ("lpsz_menu_name", wintypes.LPCWSTR), - ("lpsz_class_name", wintypes.LPCWSTR), - ] - - class MENUITEMINFOW(Structure): - from ctypes import wintypes - - _fields_: ClassVar = [ - ("cb_size", wintypes.UINT), - ("f_mask", wintypes.UINT), - ("f_type", wintypes.UINT), - ("f_state", wintypes.UINT), - ("w_id", wintypes.UINT), - ("h_sub_menu", wintypes.HMENU), - ("hbmp_checked", wintypes.HBITMAP), - ("hbmp_unchecked", wintypes.HBITMAP), - ("dw_item_data", wintypes.LPVOID), - ("dw_type_data", wintypes.LPWSTR), - ("cch", wintypes.UINT), - ("hbmp_item", wintypes.HBITMAP), - ] - - class NOTIFYICONDATAW(Structure): - from ctypes import wintypes - - _fields_: ClassVar = [ - ("cb_size", wintypes.DWORD), - ("h_wnd", wintypes.HWND), - ("u_id", wintypes.UINT), - ("u_flags", wintypes.UINT), - ("u_callback_message", wintypes.UINT), - ("h_icon", wintypes.HICON), - ("sz_tip", wintypes.WCHAR * 128), - ("dw_state", wintypes.DWORD), - ("dw_state_mask", wintypes.DWORD), - ("sz_info", wintypes.WCHAR * 256), - ("u_version", wintypes.UINT), - ("sz_info_title", wintypes.WCHAR * 64), - ("dw_info_flags", wintypes.DWORD), - ("guid_item", wintypes.CHAR * 16), - ("h_balloon_icon", wintypes.HICON), - ] - - def __init__(self, application: Gtk.Application) -> None: - from ctypes import windll # type: ignore - - super().__init__(application) - - self._window_class: Any = None - self._h_wnd = None - self._notify_id = None - self._h_icon = None - self._menu = None - self._wm_taskbarcreated = windll.user32.RegisterWindowMessageW("TaskbarCreated") - - self._register_class() - self._create_window() - self.update_icon() - - def _register_class(self) -> None: - from ctypes import byref, windll # type: ignore - - self._window_class = self.WNDCLASSW( # type: ignore - style=(self.CS_VREDRAW | self.CS_HREDRAW), - lpfn_wnd_proc=self.WNDCLASSW.LPFN_WND_PROC(self.on_process_window_message), # type: ignore - h_cursor=windll.user32.LoadCursorW(0, self.IDC_ARROW), - hbr_background=self.COLOR_WINDOW, - lpsz_class_name=self.WINDOW_CLASS_NAME, - ) - - windll.user32.RegisterClassW(byref(self._window_class)) - - def _unregister_class(self): - if self._window_class is None: - return - - from ctypes import windll - - windll.user32.UnregisterClassW( - self.WINDOW_CLASS_NAME, self._window_class.h_instance - ) - self._window_class = None - - def _create_window(self) -> None: - from ctypes import windll # type: ignore - - style = self.WS_OVERLAPPED | self.WS_SYSMENU - self._h_wnd = windll.user32.CreateWindowExW( - 0, - self.WINDOW_CLASS_NAME, - self.WINDOW_CLASS_NAME, - style, - 0, - 0, - self.CW_USEDEFAULT, - self.CW_USEDEFAULT, - 0, - 0, - 0, - None, - ) - - windll.user32.UpdateWindow(self._h_wnd) - - def _destroy_window(self): - if self._h_wnd is None: - return - - from ctypes import windll - - windll.user32.DestroyWindow(self._h_wnd) - self._h_wnd = None - - def _load_ico_buffer(self, icon_name, icon_size): - ico_buffer = b"" - - if GTK_API_VERSION >= 4: - icon = ICON_THEME.lookup_icon( - icon_name, fallbacks=None, size=icon_size, scale=1, direction=0, flags=0 - ) - icon_path = icon.get_file().get_path() - - if not icon_path: - return ico_buffer - - pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size( - icon_path, icon_size, icon_size - ) - else: - icon = ICON_THEME.lookup_icon(icon_name, size=icon_size, flags=0) - - if not icon: - return ico_buffer - - pixbuf = icon.load_icon() - - _success, ico_buffer = pixbuf.save_to_bufferv("ico") - return ico_buffer - - def _load_h_icon(self, icon_name): - from ctypes import windll - - # Attempt to load custom icons first - icon_size = windll.user32.GetSystemMetrics(self.SM_CXSMICON) - ico_buffer = self._load_ico_buffer( - icon_name.replace(f"{pynicotine.__application_id__}-", "nplus-tray-"), - icon_size, - ) - - if not ico_buffer: - # No custom icons present, fall back to default icons - ico_buffer = self._load_ico_buffer(icon_name, icon_size) - - try: - import tempfile - - file_handle = tempfile.NamedTemporaryFile(delete=False) - - with file_handle: - file_handle.write(ico_buffer) - - return windll.user32.LoadImageA( - 0, - encode_path(file_handle.name), - self.IMAGE_ICON, - icon_size, - icon_size, - self.LR_LOADFROMFILE, - ) - - finally: - os.remove(file_handle.name) - - def _destroy_h_icon(self): - from ctypes import windll - - if self._h_icon: - windll.user32.DestroyIcon(self._h_icon) - self._h_icon = None - - def _update_notify_icon(self, title="", message="", icon_name=None): - # pylint: disable=attribute-defined-outside-init,no-member - - if self._h_wnd is None: - return - - if icon_name: - self._destroy_h_icon() - self._h_icon = self._load_h_icon(icon_name) - - if not self.is_visible and not (title or message): - # When disabled by user, temporarily show tray icon when displaying a notification - return - - from ctypes import byref, sizeof, windll - - action = self.NIM_MODIFY - - if self._notify_id is None: - self._notify_id = self.NOTIFYICONDATAW( - cb_size=sizeof(self.NOTIFYICONDATAW), - h_wnd=self._h_wnd, - u_id=0, - u_flags=( - self.NIF_ICON | self.NIF_MESSAGE | self.NIF_TIP | self.NIF_INFO - ), - u_callback_message=self.WM_TRAYICON, - sz_tip=truncate_string_byte( - pynicotine.__application_name__, byte_limit=127 - ), - ) - action = self.NIM_ADD - - if config.sections["notifications"]["notification_popup_sound"]: - self._notify_id.dw_info_flags &= ~self.NIIF_NOSOUND - else: - self._notify_id.dw_info_flags |= self.NIIF_NOSOUND - - self._notify_id.h_icon = self._h_icon - self._notify_id.sz_info_title = truncate_string_byte( - title, byte_limit=63, ellipsize=True - ) - self._notify_id.sz_info = truncate_string_byte( - message, byte_limit=255, ellipsize=True - ) - - windll.shell32.Shell_NotifyIconW(action, byref(self._notify_id)) - - def _remove_notify_icon(self): - from ctypes import byref, windll - - if self._notify_id: - windll.shell32.Shell_NotifyIconW(self.NIM_DELETE, byref(self._notify_id)) - self._notify_id = None - - if self._menu: - windll.user32.DestroyMenu(self._menu) - self._menu = None - - def _serialize_menu_item(self, item): - # pylint: disable=attribute-defined-outside-init,no-member - - from ctypes import sizeof - - item_info = self.MENUITEMINFOW(cb_size=sizeof(self.MENUITEMINFOW)) - w_id = item["id"] - text = item.get("text") - is_checked = item.get("toggled") - is_sensitive = item.get("sensitive") - - item_info.f_mask |= self.MIIM_ID - item_info.w_id = w_id - - if text is not None: - item_info.f_mask |= self.MIIM_STRING - item_info.dw_type_data = text.replace("_", "&") # Mnemonics use & - else: - item_info.f_type |= self.MFT_SEPARATOR - - if is_checked is not None: - item_info.f_mask |= self.MIIM_STATE - item_info.f_state |= self.MFS_CHECKED if is_checked else self.MFS_UNCHECKED - - if is_sensitive is not None: - item_info.f_mask |= self.MIIM_STATE - item_info.f_state |= self.MFS_ENABLED if is_sensitive else self.MFS_DISABLED - - return item_info - - def _show_menu(self): - from ctypes import byref, windll, wintypes - - if self._menu is None: - self.update_menu() - - pos = wintypes.POINT() - windll.user32.GetCursorPos(byref(pos)) - - # PRB: Menus for Notification Icons Do Not Work Correctly - # https://web.archive.org/web/20121015064650/http://support.microsoft.com/kb/135788 - - windll.user32.SetForegroundWindow(self._h_wnd) - windll.user32.TrackPopupMenu(self._menu, 0, pos.x, pos.y, 0, self._h_wnd, None) - windll.user32.PostMessageW(self._h_wnd, self.WM_NULL, 0, 0) - - def update_menu(self): - from ctypes import byref, windll - - if self._menu is None: - self._menu = windll.user32.CreatePopupMenu() - - for item in self.menu_items.values(): - item_id = item["id"] - item_info = self._serialize_menu_item(item) - - if not windll.user32.SetMenuItemInfoW( - self._menu, item_id, False, byref(item_info) - ): - windll.user32.InsertMenuItemW( - self._menu, item_id, False, byref(item_info) - ) - - def set_icon(self, icon_name): - self._update_notify_icon(icon_name=icon_name) - - def show_notification(self, title, message): - self._update_notify_icon(title=title, message=message) - - def on_process_window_message(self, h_wnd, msg, w_param, l_param): - from ctypes import windll, wintypes - - if msg == self.WM_TRAYICON: - if l_param == self.WM_RBUTTONUP: - # Icon pressed - self._show_menu() - - elif l_param == self.WM_LBUTTONUP: - # Icon pressed - self.activate_callback() - - elif l_param in ( - self.NIN_BALLOONHIDE, - self.NIN_BALLOONTIMEOUT, - self.NIN_BALLOONUSERCLICK, - ): - if not config.sections["ui"]["trayicon"]: - # Notification dismissed, but user has disabled tray icon - self._remove_notify_icon() - - elif msg == self.WM_COMMAND: - # Menu item pressed - menu_item_id = w_param & 0xFFFF - menu_item_callback = self.menu_items[menu_item_id]["callback"] - menu_item_callback() - - elif msg == self._wm_taskbarcreated: - # Taskbar process restarted, create new icon - self._remove_notify_icon() - self._update_notify_icon() - - return windll.user32.DefWindowProcW( - wintypes.HWND(h_wnd), - msg, - wintypes.WPARAM(w_param), - wintypes.LPARAM(l_param), - ) - - def unload(self, is_shutdown=False): - self._remove_notify_icon() - - if not is_shutdown: - # Keep notification support as long as we're running - return - - self._destroy_h_icon() - self._destroy_window() - self._unregister_class() - - -class TrayIcon: - def __init__(self, application: Gio.Application) -> None: - self.application: Gio.Application = application - self.available: bool = True - self.implementation: Any = None - - self.watch_availability() - self.load() - - def watch_availability(self) -> None: - if sys.platform in {"win32", "darwin"}: - return - - Gio.bus_watch_name( - bus_type=Gio.BusType.SESSION, - name="org.kde.StatusNotifierWatcher", - flags=Gio.BusNameWatcherFlags.NONE, - name_appeared_closure=self.load, - name_vanished_closure=self.unload, - ) - - def load(self, *_args: Any) -> None: - self.available = True - - if sys.platform == "win32": - # Always keep tray icon loaded for Windows notification support - pass - - elif not config.sections["ui"]["trayicon"]: - # No need to have tray icon loaded now (unless this is Windows) - return - - if self.implementation is None: - if sys.platform == "win32": - self.implementation = Win32Implementation(self.application) - else: - try: - self.implementation = StatusNotifierImplementation(self.application) # type: ignore - - except ImplUnavailableError: - self.available = False - return - - self.refresh_state() - - def update_window_visibility(self) -> None: - if self.implementation: - self.implementation.update_window_visibility() - - def update_user_status(self) -> None: - if self.implementation: - self.implementation.update_user_status() - - def update_icon(self) -> None: - if self.implementation: - self.implementation.update_icon() - - def update_icon_theme(self) -> None: - if self.implementation: - self.implementation.update_icon_theme() - - def set_download_status(self, status: str) -> None: - if self.implementation: - self.implementation.set_download_status(status) - - def set_upload_status(self, status: str) -> None: - if self.implementation: - self.implementation.set_upload_status(status) - - def show_notification(self, title: str, message: str) -> None: - if self.implementation: - self.implementation.show_notification(title=title, message=message) - - def refresh_state(self) -> None: - if not self.implementation: - return - - self.implementation.is_visible = True - - self.update_icon_theme() - self.update_icon() - self.update_window_visibility() - self.update_user_status() - - def unload(self, *_args: Any, is_shutdown: bool = False) -> None: - if self.implementation: - self.implementation.unload(is_shutdown=is_shutdown) - self.implementation.is_visible = False - - if is_shutdown: - self.implementation = None - - def destroy(self) -> None: - self.unload(is_shutdown=True) - self.__dict__.clear() diff --git a/pkgs/clan-app/clan_app/components/vmobj.py b/pkgs/clan-app/clan_app/components/vmobj.py deleted file mode 100644 index 3949e27fa..000000000 --- a/pkgs/clan-app/clan_app/components/vmobj.py +++ /dev/null @@ -1,375 +0,0 @@ -import logging -import multiprocessing as mp -import os -import tempfile -import threading -import time -import weakref -from collections.abc import Callable, Generator -from contextlib import contextmanager -from datetime import datetime -from pathlib import Path -from typing import IO, ClassVar - -import gi -from clan_cli import vms -from clan_cli.clan_uri import ClanURI -from clan_cli.history.add import HistoryEntry -from clan_cli.machines.machines import Machine - -from clan_app.components.executor import MPProcess, spawn -from clan_app.singletons.toast import ( - InfoToast, - SuccessToast, - ToastOverlay, - WarningToast, -) - -gi.require_version("GObject", "2.0") -gi.require_version("Gtk", "4.0") -from gi.repository import Gio, GLib, GObject, Gtk - -log = logging.getLogger(__name__) - - -class VMObject(GObject.Object): - # Define a custom signal with the name "vm_stopped" and a string argument for the message - __gsignals__: ClassVar = { - "vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []), - "vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]), - } - - def __init__( - self, - icon: Path, - data: HistoryEntry, - build_log_cb: Callable[[Gio.File], None], - ) -> None: - super().__init__() - - # Store the data from the history entry - self.data: HistoryEntry = data - - self.build_log_cb = build_log_cb - - # Create a process object to store the VM process - self.vm_process: MPProcess = MPProcess( - "vm_dummy", mp.Process(), Path("./dummy") - ) - self.build_process: MPProcess = MPProcess( - "build_dummy", mp.Process(), Path("./dummy") - ) - self._start_thread: threading.Thread = threading.Thread() - self.machine: Machine | None = None - - # Watcher to stop the VM - self.KILL_TIMEOUT: int = 20 # seconds - self._stop_thread: threading.Thread = threading.Thread() - - # Build progress bar vars - self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar() - self.progress_bar.hide() - self.progress_bar.set_hexpand(True) # Horizontally expand - self.prog_bar_id: int = 0 - - # Create a temporary directory to store the logs - self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory( - prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}" - ) - self._logs_id: int = 0 - self._log_file: IO[str] | None = None - - # To be able to set the switch state programmatically - # we need to store the handler id returned by the connect method - # and block the signal while we change the state. This is cursed. - self.switch: Gtk.Switch = Gtk.Switch() - self.switch_handler_id: int = self.switch.connect( - "notify::active", self._on_switch_toggle - ) - self.connect("vm_status_changed", self._on_vm_status_changed) - - # Make sure the VM is killed when the reference to this object is dropped - self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop) - - def _vm_status_changed_task(self) -> bool: - self.emit("vm_status_changed") - return GLib.SOURCE_REMOVE - - def update(self, data: HistoryEntry) -> None: - self.data = data - - def _on_vm_status_changed(self, source: "VMObject") -> None: - # Signal may be emitted multiple times - self.emit("vm_build_notify", self.is_building(), self.is_running()) - - prev_state = self.switch.get_state() - next_state = self.is_running() and not self.is_building() - - self.switch.set_state(next_state) - if prev_state is False and next_state is True: - ToastOverlay.use().add_toast_unique( - SuccessToast(f"{source.data.flake.flake_attr} started").toast, - "success.vm.start", - ) - - if self.switch.get_sensitive() is False and not self.is_building(): - self.switch.set_sensitive(True) - - exit_vm = self.vm_process.proc.exitcode - exit_build = self.build_process.proc.exitcode - exitc = exit_vm or exit_build - if not self.is_running() and exitc != 0: - with self.switch.handler_block(self.switch_handler_id): - self.switch.set_active(False) - log.error(f"VM exited with error. Exitcode: {exitc}") - ToastOverlay.use().add_toast_unique( - WarningToast(f"VM exited with error. Exitcode: {exitc}").toast, - "warning.vm.exit", - ) - - def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None: - if switch.get_active(): - switch.set_state(False) - switch.set_sensitive(False) - self.start() - else: - switch.set_state(True) - self.shutdown() - switch.set_sensitive(False) - - # We use a context manager to create the machine object - # and make sure it is destroyed when the context is exited - @contextmanager - def _create_machine(self) -> Generator[Machine, None, None]: - uri = ClanURI.from_str( - url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr - ) - if uri.flake_id.is_local(): - self.machine = Machine( - name=self.data.flake.flake_attr, - flake=uri.flake_id.path, - ) - if uri.flake_id.is_remote(): - self.machine = Machine( - name=self.data.flake.flake_attr, - flake=uri.flake_id.url, - ) - assert self.machine is not None - yield self.machine - self.machine = None - - def _pulse_progress_bar_task(self) -> bool: - if self.progress_bar.is_visible(): - self.progress_bar.pulse() - return GLib.SOURCE_CONTINUE - else: - return GLib.SOURCE_REMOVE - - def __start(self) -> None: - with self._create_machine() as machine: - # Start building VM - tstart = datetime.now() - log.info(f"Building VM {self.get_id()}") - log_dir = Path(str(self.log_dir.name)) - - # Start the build process - self.build_process = spawn( - on_except=None, - out_file=log_dir / "build.log", - func=vms.run.build_vm, - machine=machine, - tmpdir=log_dir, - ) - - gfile = Gio.File.new_for_path(str(log_dir / "build.log")) - # Gio documentation: - # Obtains a file monitor for the given file. - # If no file notification mechanism exists, then regular polling of the file is used. - g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None) - g_monitor.connect("changed", self.on_logs_changed) - - GLib.idle_add(self._vm_status_changed_task) - self.switch.set_sensitive(True) - # Start the logs watcher - self._logs_id = GLib.timeout_add( - 50, self._get_logs_task, self.build_process - ) - if self._logs_id == 0: - log.error("Failed to start VM log watcher") - log.debug(f"Starting logs watcher on file: {self.build_process.out_file}") - - # Start the progress bar and show it - self.progress_bar.show() - self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task) - if self.prog_bar_id == 0: - log.error("Couldn't spawn a progress bar task") - - # Wait for the build to finish then hide the progress bar - self.build_process.proc.join() - tend = datetime.now() - log.info(f"VM {self.get_id()} build took {tend - tstart}s") - self.progress_bar.hide() - - # Check if the VM was built successfully - if self.build_process.proc.exitcode != 0: - log.error(f"Failed to build VM {self.get_id()}") - GLib.idle_add(self._vm_status_changed_task) - return - log.info(f"Successfully built VM {self.get_id()}") - - # Start the VM - self.vm_process = spawn( - on_except=None, - out_file=Path(str(self.log_dir.name)) / "vm.log", - func=vms.run.run_vm, - vm=self.data.flake.vm, - cachedir=log_dir, - socketdir=log_dir, - ) - log.debug(f"Started VM {self.get_id()}") - GLib.idle_add(self._vm_status_changed_task) - - # Start the logs watcher - self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process) - if self._logs_id == 0: - log.error("Failed to start VM log watcher") - log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}") - - # Wait for the VM to stop - self.vm_process.proc.join() - log.debug(f"VM {self.get_id()} has stopped") - GLib.idle_add(self._vm_status_changed_task) - - def on_logs_changed( - self, - monitor: Gio.FileMonitor, - file: Gio.File, - other_file: Gio.File, - event_type: Gio.FileMonitorEvent, - ) -> None: - if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT: - # File was changed and the changes were written to disk - # wire up the callback for setting the logs - self.build_log_cb(file) - - def start(self) -> None: - if self.is_running(): - log.warn("VM is already running. Ignoring start request") - self.emit("vm_status_changed", self) - return - log.debug(f"VM state dir {self.log_dir.name}") - self._start_thread = threading.Thread(target=self.__start) - self._start_thread.start() - - def _get_logs_task(self, proc: MPProcess) -> bool: - if not proc.out_file.exists(): - return GLib.SOURCE_CONTINUE - - if not self._log_file: - try: - self._log_file = open(proc.out_file) - except Exception as ex: - log.exception(ex) - self._log_file = None - return GLib.SOURCE_REMOVE - - line = os.read(self._log_file.fileno(), 4096) - if len(line) != 0: - print(line.decode("utf-8"), end="", flush=True) - - if not proc.proc.is_alive(): - log.debug("Removing logs watcher") - self._log_file = None - return GLib.SOURCE_REMOVE - - return GLib.SOURCE_CONTINUE - - def is_running(self) -> bool: - return self._start_thread.is_alive() - - def is_building(self) -> bool: - return self.build_process.proc.is_alive() - - def is_shutting_down(self) -> bool: - return self._stop_thread.is_alive() - - def get_id(self) -> str: - return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}" - - def __stop(self) -> None: - log.info(f"Stopping VM {self.get_id()}") - - start_time = datetime.now() - while self.is_running(): - diff = datetime.now() - start_time - if diff.seconds > self.KILL_TIMEOUT: - log.error( - f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it" - ) - self.vm_process.kill_group() - break - if self.is_building(): - log.info(f"VM {self.get_id()} is still building. Killing it") - self.build_process.kill_group() - break - if not self.machine: - log.error(f"Machine object is None. Killing VM {self.get_id()}") - self.vm_process.kill_group() - break - - # Try to shutdown the VM gracefully using QMP - try: - with self.machine.vm.qmp_ctx() as qmp: - qmp.command("system_powerdown") - except Exception as ex: - log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}") - - # Try 20 times to stop the VM - time.sleep(self.KILL_TIMEOUT / 20) - GLib.idle_add(self._vm_status_changed_task) - log.debug(f"VM {self.get_id()} has stopped") - - ToastOverlay.use().add_toast_unique( - InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit" - ) - - def shutdown(self) -> None: - if not self.is_running(): - log.warning("VM not running. Ignoring shutdown request.") - self.emit("vm_status_changed", self) - return - if self.is_shutting_down(): - log.warning("Shutdown already in progress") - self.emit("vm_status_changed", self) - return - self._stop_thread = threading.Thread(target=self.__stop) - self._stop_thread.start() - - def _kill_ref_drop(self) -> None: - if self.is_running(): - log.warning("Killing VM due to reference drop") - self.kill() - - def kill(self) -> None: - if not self.is_running(): - log.warning(f"Tried to kill VM {self.get_id()} is not running") - return - log.info(f"Killing VM {self.get_id()} now") - - if self.vm_process.proc.is_alive(): - self.vm_process.kill_group() - - if self.build_process.proc.is_alive(): - self.build_process.kill_group() - - def read_whole_log(self) -> str: - if not self.vm_process.out_file.exists(): - log.error(f"Log file {self.vm_process.out_file} does not exist") - return "" - return self.vm_process.out_file.read_text() - - def __str__(self) -> str: - return f"VM({self.get_id()})" - - def __repr__(self) -> str: - return self.__str__() diff --git a/pkgs/clan-app/clan_app/singletons/toast.py b/pkgs/clan-app/clan_app/singletons/toast.py index 85da51730..6b262b9f6 100644 --- a/pkgs/clan-app/clan_app/singletons/toast.py +++ b/pkgs/clan-app/clan_app/singletons/toast.py @@ -10,7 +10,6 @@ gi.require_version("Adw", "1") from gi.repository import Adw from clan_app.singletons.use_views import ViewStack -from clan_app.views.logs import Logs log = logging.getLogger(__name__) @@ -48,35 +47,6 @@ class ToastOverlay: toast.connect("dismissed", lambda toast: self.active_toasts.remove(key)) -class ErrorToast: - toast: Adw.Toast - - def __init__( - self, message: str, persistent: bool = False, details: str = "" - ) -> None: - super().__init__() - self.toast = Adw.Toast.new( - f"""❌ Error {message}""" - ) - self.toast.set_use_markup(True) - - self.toast.set_priority(Adw.ToastPriority.HIGH) - self.toast.set_button_label("Show more") - - if persistent: - self.toast.set_timeout(0) - - views = ViewStack.use().view - - # we cannot check this type, python is not smart enough - logs_view: Logs = views.get_child_by_name("logs") # type: ignore - logs_view.set_message(details) - - self.toast.connect( - "button-clicked", - lambda _: views.set_visible_child_name("logs"), - ) - class WarningToast: toast: Adw.Toast diff --git a/pkgs/clan-app/clan_app/singletons/use_join.py b/pkgs/clan-app/clan_app/singletons/use_join.py deleted file mode 100644 index 18ad418e0..000000000 --- a/pkgs/clan-app/clan_app/singletons/use_join.py +++ /dev/null @@ -1,106 +0,0 @@ -import logging -import threading -from collections.abc import Callable -from typing import Any, ClassVar, cast - -import gi -from clan_cli.clan_uri import ClanURI -from clan_cli.history.add import HistoryEntry, add_history - -from clan_app.components.gkvstore import GKVStore -from clan_app.singletons.use_vms import ClanStore - -gi.require_version("Gtk", "4.0") -gi.require_version("Adw", "1") -from gi.repository import Gio, GLib, GObject - -log = logging.getLogger(__name__) - - -class JoinValue(GObject.Object): - __gsignals__: ClassVar = { - "join_finished": (GObject.SignalFlags.RUN_FIRST, None, []), - } - - url: ClanURI - entry: HistoryEntry | None - - def _join_finished_task(self) -> bool: - self.emit("join_finished") - return GLib.SOURCE_REMOVE - - def __init__(self, url: ClanURI) -> None: - super().__init__() - self.url: ClanURI = url - self.entry: HistoryEntry | None = None - - def __join(self) -> None: - new_entry = add_history(self.url) - self.entry = new_entry - GLib.idle_add(self._join_finished_task) - - def join(self) -> None: - threading.Thread(target=self.__join).start() - - -class JoinList: - """ - This is a singleton. - It is initialized with the first call of use() - """ - - _instance: "None | JoinList" = None - list_store: Gio.ListStore - - # Make sure the VMS class is used as a singleton - def __init__(self) -> None: - raise RuntimeError("Call use() instead") - - @classmethod - def use(cls: Any) -> "JoinList": - if cls._instance is None: - cls._instance = cls.__new__(cls) - cls.list_store = Gio.ListStore.new(JoinValue) - - ClanStore.use().register_on_deep_change(cls._instance._rerender_join_list) - - return cls._instance - - def _rerender_join_list( - self, source: GKVStore, position: int, removed: int, added: int - ) -> None: - self.list_store.items_changed( - 0, self.list_store.get_n_items(), self.list_store.get_n_items() - ) - - def is_empty(self) -> bool: - return self.list_store.get_n_items() == 0 - - def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None: - """ - Add a join request. - This method can add multiple join requests if called subsequently for each request. - """ - - value = JoinValue(uri) - if value.url.machine.get_id() in [ - cast(JoinValue, item).url.machine.get_id() for item in self.list_store - ]: - log.info(f"Join request already exists: {value.url}. Ignoring.") - return - - value.connect("join_finished", self._on_join_finished) - value.connect("join_finished", after_join) - - self.list_store.append(value) - - def _on_join_finished(self, source: JoinValue) -> None: - log.info(f"Join finished: {source.url}") - self.discard(source) - assert source.entry is not None - ClanStore.use().push_history_entry(source.entry) - - def discard(self, value: JoinValue) -> None: - (has, idx) = self.list_store.find(value) - if has: - self.list_store.remove(idx) diff --git a/pkgs/clan-app/clan_app/singletons/use_vms.py b/pkgs/clan-app/clan_app/singletons/use_vms.py deleted file mode 100644 index 5060b76a1..000000000 --- a/pkgs/clan-app/clan_app/singletons/use_vms.py +++ /dev/null @@ -1,181 +0,0 @@ -import logging -from collections.abc import Callable -from pathlib import Path -from typing import Any, ClassVar - -import gi -from clan_cli.clan_uri import ClanURI -from clan_cli.history.add import HistoryEntry - -from clan_app import assets -from clan_app.components.gkvstore import GKVStore -from clan_app.components.vmobj import VMObject -from clan_app.singletons.use_views import ViewStack -from clan_app.views.logs import Logs - -gi.require_version("GObject", "2.0") -gi.require_version("Gtk", "4.0") -from gi.repository import Gio, GLib, GObject - -log = logging.getLogger(__name__) - - -class VMStore(GKVStore): - def __init__(self) -> None: - super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr) - - -class Emitter(GObject.GObject): - __gsignals__: ClassVar = { - "is_ready": (GObject.SignalFlags.RUN_FIRST, None, []), - } - - -class ClanStore: - _instance: "None | ClanStore" = None - _clan_store: GKVStore[str, VMStore] - - _emitter: Emitter - - # set the vm that is outputting logs - # build logs are automatically streamed to the logs-view - _logging_vm: VMObject | None = None - - # Make sure the VMS class is used as a singleton - def __init__(self) -> None: - raise RuntimeError("Call use() instead") - - @classmethod - def use(cls: Any) -> "ClanStore": - if cls._instance is None: - cls._instance = cls.__new__(cls) - cls._clan_store = GKVStore( - VMStore, lambda store: store.first().data.flake.flake_url - ) - cls._emitter = Emitter() - - return cls._instance - - def emit(self, signal: str) -> None: - self._emitter.emit(signal) - - def connect(self, signal: str, cb: Callable[(...), Any]) -> None: - self._emitter.connect(signal, cb) - - def set_logging_vm(self, ident: str) -> VMObject | None: - vm = self.get_vm(ClanURI(f"clan://{ident}")) - if vm is not None: - self._logging_vm = vm - - return self._logging_vm - - def register_on_deep_change( - self, callback: Callable[[GKVStore, int, int, int], None] - ) -> None: - """ - Register a callback that is called when a clan_store or one of the included VMStores changes - """ - - def on_vmstore_change( - store: VMStore, position: int, removed: int, added: int - ) -> None: - callback(store, position, removed, added) - - def on_clanstore_change( - store: "GKVStore", position: int, removed: int, added: int - ) -> None: - if added > 0: - store.values()[position].register_on_change(on_vmstore_change) - callback(store, position, removed, added) - - self.clan_store.register_on_change(on_clanstore_change) - - @property - def clan_store(self) -> GKVStore[str, VMStore]: - return self._clan_store - - def create_vm_task(self, vm: HistoryEntry) -> bool: - self.push_history_entry(vm) - return GLib.SOURCE_REMOVE - - def push_history_entry(self, entry: HistoryEntry) -> None: - # TODO: We shouldn't do this here but in the list view - if entry.flake.icon is None: - icon: Path = assets.loc / "placeholder.jpeg" - else: - icon = Path(entry.flake.icon) - - def log_details(gfile: Gio.File) -> None: - self.log_details(vm, gfile) - - vm = VMObject(icon=icon, data=entry, build_log_cb=log_details) - self.push(vm) - - def log_details(self, vm: VMObject, gfile: Gio.File) -> None: - views = ViewStack.use().view - logs_view: Logs = views.get_child_by_name("logs") # type: ignore - - def file_read_callback( - source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any - ) -> None: - try: - # Finish the asynchronous read operation - res = source_object.load_contents_finish(result) - _success, contents, _etag_out = res - - # Convert the byte array to a string and print it - logs_view.set_message(contents.decode("utf-8")) - except Exception as e: - print(f"Error reading file: {e}") - - # only one vm can output logs at a time - if vm == self._logging_vm: - gfile.load_contents_async(None, file_read_callback, None) - - # we cannot check this type, python is not smart enough - - def push(self, vm: VMObject) -> None: - url = str(vm.data.flake.flake_url) - - # Only write to the store if the Clan is not already in it - # Every write to the KVStore rerenders bound widgets to the clan_store - if url not in self.clan_store: - log.debug(f"Creating new VMStore for {url}") - vm_store = VMStore() - vm_store.append(vm) - self.clan_store[url] = vm_store - else: - vm_store = self.clan_store[url] - machine = vm.data.flake.flake_attr - old_vm = vm_store.get(machine) - - if old_vm: - log.info( - f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field." - ) - old_vm.update(vm.data) - else: - log.debug(f"Appending VM {vm.data.flake.flake_attr} to store") - vm_store.append(vm) - - def remove(self, vm: VMObject) -> None: - del self.clan_store[str(vm.data.flake.flake_url)][vm.data.flake.flake_attr] - - def get_vm(self, uri: ClanURI) -> None | VMObject: - vm_store = self.clan_store.get(str(uri.flake_id)) - if vm_store is None: - return None - machine = vm_store.get(uri.machine.name, None) - return machine - - def get_running_vms(self) -> list[VMObject]: - return [ - vm - for clan in self.clan_store.values() - for vm in clan.values() - if vm.is_running() - ] - - def kill_all(self) -> None: - for vm in self.get_running_vms(): - vm.kill() diff --git a/pkgs/clan-app/clan_app/views/details.py b/pkgs/clan-app/clan_app/views/details.py deleted file mode 100644 index c9ec2f93f..000000000 --- a/pkgs/clan-app/clan_app/views/details.py +++ /dev/null @@ -1,61 +0,0 @@ -import os -from collections.abc import Callable -from functools import partial -from typing import Any, Literal, TypeVar - -import gi - -gi.require_version("Adw", "1") -from gi.repository import Adw, Gio, GObject, Gtk - -# Define a TypeVar that is bound to GObject.Object -ListItem = TypeVar("ListItem", bound=GObject.Object) - - -def create_details_list( - model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget] -) -> Gtk.ListBox: - boxed_list = Gtk.ListBox() - boxed_list.set_selection_mode(Gtk.SelectionMode.NONE) - boxed_list.add_css_class("boxed-list") - boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list)) - return boxed_list - - -class PreferencesValue(GObject.Object): - variant: Literal["CPU", "MEMORY"] - editable: bool - data: Any - - def __init__( - self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any - ) -> None: - super().__init__() - self.variant = variant - self.editable = editable - self.data = data - - -class Details(Gtk.Box): - def __init__(self) -> None: - super().__init__(orientation=Gtk.Orientation.VERTICAL) - - preferences_store = Gio.ListStore.new(PreferencesValue) - preferences_store.append(PreferencesValue("CPU", True, 1)) - - self.details_list = create_details_list( - model=preferences_store, render_row=self.render_entry_row - ) - - self.append(self.details_list) - - def render_entry_row( - self, boxed_list: Gtk.ListBox, item: PreferencesValue - ) -> Gtk.Widget: - cores: int | None = os.cpu_count() - fcores = float(cores) if cores else 1.0 - - row = Adw.SpinRow.new_with_range(0, fcores, 1) - row.set_value(item.data) - - return row diff --git a/pkgs/clan-app/clan_app/views/list.py b/pkgs/clan-app/clan_app/views/list.py deleted file mode 100644 index b165cac0e..000000000 --- a/pkgs/clan-app/clan_app/views/list.py +++ /dev/null @@ -1,356 +0,0 @@ -import base64 -import logging -from collections.abc import Callable -from functools import partial -from typing import Any, TypeVar - -import gi -from clan_cli.clan_uri import ClanURI - -from clan_app.components.gkvstore import GKVStore -from clan_app.components.interfaces import ClanConfig -from clan_app.components.list_splash import EmptySplash -from clan_app.components.vmobj import VMObject -from clan_app.singletons.toast import ( - LogToast, - SuccessToast, - ToastOverlay, - WarningToast, -) -from clan_app.singletons.use_join import JoinList, JoinValue -from clan_app.singletons.use_views import ViewStack -from clan_app.singletons.use_vms import ClanStore, VMStore -from clan_app.views.logs import Logs - -gi.require_version("Adw", "1") -from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk - -log = logging.getLogger(__name__) - -ListItem = TypeVar("ListItem", bound=GObject.Object) -CustomStore = TypeVar("CustomStore", bound=Gio.ListModel) - - -def create_boxed_list( - model: CustomStore, - render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget], -) -> Gtk.ListBox: - boxed_list = Gtk.ListBox() - boxed_list.set_selection_mode(Gtk.SelectionMode.NONE) - boxed_list.add_css_class("boxed-list") - boxed_list.add_css_class("no-shadow") - - boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list)) - return boxed_list - - -class ClanList(Gtk.Box): - """ - The ClanList - Is the composition of - the ClanListToolbar - the clanListView - # ------------------------ # - # - Tools < Edit> # - # ------------------------ # - # - List Items - # - <...> - # ------------------------# - """ - - def __init__(self, config: ClanConfig) -> None: - super().__init__(orientation=Gtk.Orientation.VERTICAL) - - app = Gio.Application.get_default() - assert app is not None - app.connect("join_request", self.on_join_request) - - self.log_label: Gtk.Label = Gtk.Label() - - # Add join list - self.join_boxed_list = create_boxed_list( - model=JoinList.use().list_store, render_row=self.render_join_row - ) - self.join_boxed_list.add_css_class("join-list") - self.append(self.join_boxed_list) - - clan_store = ClanStore.use() - clan_store.connect("is_ready", self.display_splash) - - self.group_list = create_boxed_list( - model=clan_store.clan_store, render_row=self.render_group_row - ) - self.group_list.add_css_class("group-list") - self.append(self.group_list) - - self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x)) - - def display_splash(self, source: GKVStore) -> None: - print("Displaying splash") - if ( - ClanStore.use().clan_store.get_n_items() == 0 - and JoinList.use().list_store.get_n_items() == 0 - ): - self.append(self.splash) - - def render_group_row( - self, boxed_list: Gtk.ListBox, vm_store: VMStore - ) -> Gtk.Widget: - self.remove(self.splash) - - vm = vm_store.first() - log.debug("Rendering group row for %s", vm.data.flake.flake_url) - - grp = Adw.PreferencesGroup() - grp.set_title(vm.data.flake.clan_name) - grp.set_description(vm.data.flake.flake_url) - - add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s")) - add_action.connect("activate", self.on_add) - app = Gio.Application.get_default() - assert app is not None - app.add_action(add_action) - - # menu_model = Gio.Menu() - # TODO: Make this lazy, blocks UI startup for too long - # for vm in machines.list.list_machines(flake_url=vm.data.flake.flake_url): - # if vm not in vm_store: - # menu_model.append(vm, f"app.add::{vm}") - - box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) - box.set_valign(Gtk.Align.CENTER) - - add_button = Gtk.Button() - add_button_content = Adw.ButtonContent.new() - add_button_content.set_label("Add machine") - add_button_content.set_icon_name("list-add-symbolic") - add_button.add_css_class("flat") - add_button.set_child(add_button_content) - - # add_button.set_has_frame(False) - # add_button.set_menu_model(menu_model) - # add_button.set_label("Add machine") - box.append(add_button) - - grp.set_header_suffix(box) - - vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row) - grp.add(vm_list) - - return grp - - def on_add(self, source: Any, parameter: Any) -> None: - target = parameter.get_string() - print("Adding new machine", target) - - def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget: - # Remove no-shadow class if attached - if boxed_list.has_css_class("no-shadow"): - boxed_list.remove_css_class("no-shadow") - flake = vm.data.flake - row = Adw.ActionRow() - - # ====== Display Avatar ====== - avatar = Adw.Avatar() - machine_icon = flake.vm.machine_icon - - # If there is a machine icon, display it else - # display the clan icon - if machine_icon: - avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon))) - elif flake.icon: - avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon))) - else: - avatar.set_text(flake.clan_name + " " + flake.flake_attr) - - avatar.set_show_initials(True) - avatar.set_size(50) - row.add_prefix(avatar) - - # ====== Display Name And Url ===== - row.set_title(flake.flake_attr) - row.set_title_lines(1) - row.set_title_selectable(True) - - # If there is a machine description, display it else - # display the clan name - if flake.vm.machine_description: - row.set_subtitle(flake.vm.machine_description) - else: - row.set_subtitle(flake.clan_name) - row.set_subtitle_lines(1) - - # ==== Display build progress bar ==== - build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) - build_box.set_valign(Gtk.Align.CENTER) - build_box.append(vm.progress_bar) - build_box.set_homogeneous(False) - row.add_suffix(build_box) # This allows children to have different sizes - - # ==== Action buttons ==== - button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) - button_box.set_valign(Gtk.Align.CENTER) - - ## Drop down menu - open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s")) - open_action.connect("activate", self.on_edit) - - action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8") - - build_logs_action = Gio.SimpleAction.new( - f"logs.{action_id}", GLib.VariantType.new("s") - ) - - build_logs_action.connect("activate", self.on_show_build_logs) - build_logs_action.set_enabled(False) - - app = Gio.Application.get_default() - assert app is not None - - app.add_action(open_action) - app.add_action(build_logs_action) - - # set a callback function for conditionally enabling the build_logs action - def on_vm_build_notify( - vm: VMObject, is_building: bool, is_running: bool - ) -> None: - build_logs_action.set_enabled(is_building or is_running) - app.add_action(build_logs_action) - if is_building: - ToastOverlay.use().add_toast_unique( - LogToast( - """Build process running ...""", - on_button_click=lambda: self.show_vm_build_logs(vm.get_id()), - ).toast, - f"info.build.running.{vm}", - ) - - vm.connect("vm_build_notify", on_vm_build_notify) - - menu_model = Gio.Menu() - menu_model.append("Edit", f"app.edit::{vm.get_id()}") - menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}") - - pref_button = Gtk.MenuButton() - pref_button.set_icon_name("open-menu-symbolic") - pref_button.set_menu_model(menu_model) - - button_box.append(pref_button) - - ## VM switch button - switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) - switch_box.set_valign(Gtk.Align.CENTER) - switch_box.append(vm.switch) - button_box.append(switch_box) - - row.add_suffix(button_box) - - return row - - def on_edit(self, source: Any, parameter: Any) -> None: - target = parameter.get_string() - print("Editing settings for machine", target) - - def on_show_build_logs(self, _: Any, parameter: Any) -> None: - target = parameter.get_string() - self.show_vm_build_logs(target) - - def show_vm_build_logs(self, target: str) -> None: - vm = ClanStore.use().set_logging_vm(target) - if vm is None: - raise ValueError(f"VM {target} not found") - - views = ViewStack.use().view - # Reset the logs view - logs: Logs = views.get_child_by_name("logs") # type: ignore - - if logs is None: - raise ValueError("Logs view not found") - - name = vm.machine.name if vm.machine else "Unknown" - - logs.set_title(f"""📄 {name}""") - # initial message. Streaming happens automatically when the file is changed by the build process - with open(vm.build_process.out_file) as f: - logs.set_message(f.read()) - - views.set_visible_child_name("logs") - - def render_join_row( - self, boxed_list: Gtk.ListBox, join_val: JoinValue - ) -> Gtk.Widget: - if boxed_list.has_css_class("no-shadow"): - boxed_list.remove_css_class("no-shadow") - - log.debug("Rendering join row for %s", join_val.url) - - row = Adw.ActionRow() - row.set_title(join_val.url.machine.name) - row.set_subtitle(str(join_val.url)) - row.add_css_class("trust") - - vm = ClanStore.use().get_vm(join_val.url) - - # Can't do this here because clan store is empty at this point - if vm is not None: - sub = row.get_subtitle() - assert sub is not None - - ToastOverlay.use().add_toast_unique( - WarningToast( - f"""{join_val.url.machine.name!s} Already exists. Joining again will update it""" - ).toast, - "warning.duplicate.join", - ) - - row.set_subtitle( - sub + "\nClan already exists. Joining again will update it" - ) - - avatar = Adw.Avatar() - avatar.set_text(str(join_val.url.machine.name)) - avatar.set_show_initials(True) - avatar.set_size(50) - row.add_prefix(avatar) - - cancel_button = Gtk.Button(label="Cancel") - cancel_button.add_css_class("error") - cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val)) - self.cancel_button = cancel_button - - trust_button = Gtk.Button(label="Join") - trust_button.add_css_class("success") - trust_button.connect("clicked", partial(self.on_trust_clicked, join_val)) - - box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5) - box.set_valign(Gtk.Align.CENTER) - box.append(cancel_button) - box.append(trust_button) - - row.add_suffix(box) - - return row - - def on_join_request(self, source: Any, url: str) -> None: - log.debug("Join request: %s", url) - clan_uri = ClanURI(url) - JoinList.use().push(clan_uri, self.on_after_join) - - def on_after_join(self, source: JoinValue) -> None: - ToastOverlay.use().add_toast_unique( - SuccessToast(f"Updated {source.url.machine.name}").toast, - "success.join", - ) - # If the join request list is empty disable the shadow artefact - if JoinList.use().is_empty(): - self.join_boxed_list.add_css_class("no-shadow") - - def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None: - source.set_sensitive(False) - self.cancel_button.set_sensitive(False) - value.join() - - def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None: - JoinList.use().discard(value) - if JoinList.use().is_empty(): - self.join_boxed_list.add_css_class("no-shadow") diff --git a/pkgs/clan-app/clan_app/views/logs.py b/pkgs/clan-app/clan_app/views/logs.py deleted file mode 100644 index 4ce2dacb7..000000000 --- a/pkgs/clan-app/clan_app/views/logs.py +++ /dev/null @@ -1,65 +0,0 @@ -import logging - -import gi - -gi.require_version("Adw", "1") -from gi.repository import Adw, Gio, Gtk - -from clan_app.singletons.use_views import ViewStack - -log = logging.getLogger(__name__) - - -class Logs(Gtk.Box): - """ - Simple log view - This includes a banner and a text view and a button to close the log and navigate back to the overview - """ - - def __init__(self) -> None: - super().__init__(orientation=Gtk.Orientation.VERTICAL) - - app = Gio.Application.get_default() - assert app is not None - - self.banner = Adw.Banner.new("") - self.banner.set_use_markup(True) - self.banner.set_revealed(True) - self.banner.set_button_label("Close") - - self.banner.connect( - "button-clicked", - lambda _: ViewStack.use().view.set_visible_child_name("list"), - ) - - self.text_view = Gtk.TextView() - self.text_view.set_editable(False) - self.text_view.set_wrap_mode(Gtk.WrapMode.WORD) - self.text_view.add_css_class("log-view") - - self.append(self.banner) - self.append(self.text_view) - - def set_title(self, title: str) -> None: - self.banner.set_title(title) - - def set_message(self, message: str) -> None: - """ - Set the log message. This will delete any previous message - """ - buffer = self.text_view.get_buffer() - buffer.set_text(message) - - mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore - self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0) - - def append_message(self, message: str) -> None: - """ - Append to the end of a potentially existent log message - """ - buffer = self.text_view.get_buffer() - end_iter = buffer.get_end_iter() - buffer.insert(end_iter, message) # type: ignore - - mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore - self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0) diff --git a/pkgs/clan-app/clan_app/views/webview.py b/pkgs/clan-app/clan_app/views/webview.py index 9956352b4..cfa938bbb 100644 --- a/pkgs/clan-app/clan_app/views/webview.py +++ b/pkgs/clan-app/clan_app/views/webview.py @@ -1,5 +1,6 @@ import dataclasses import json +import gi import logging import threading from collections.abc import Callable @@ -9,12 +10,10 @@ from threading import Lock from types import UnionType from typing import Any, get_args -import gi from clan_cli.api import API -from clan_cli.api.directory import FileRequest +from clan_app.api.file import open_file gi.require_version("WebKit", "6.0") - from gi.repository import Gio, GLib, Gtk, WebKit log = logging.getLogger(__name__) @@ -48,100 +47,6 @@ def dataclass_to_dict(obj: Any) -> Any: return obj -# Implement the abstract open_file function -def open_file(file_request: FileRequest) -> str | None: - # Function to handle the response and stop the loop - selected_path = None - - def on_file_select( - file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop - ) -> None: - try: - gfile = file_dialog.open_finish(task) - if gfile: - nonlocal selected_path - selected_path = gfile.get_path() - except Exception as e: - print(f"Error getting selected file or directory: {e}") - finally: - main_loop.quit() - - def on_folder_select( - file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop - ) -> None: - try: - gfile = file_dialog.select_folder_finish(task) - if gfile: - nonlocal selected_path - selected_path = gfile.get_path() - except Exception as e: - print(f"Error getting selected directory: {e}") - finally: - main_loop.quit() - - def on_save_finish( - file_dialog: Gtk.FileDialog, task: Gio.Task, main_loop: GLib.MainLoop - ) -> None: - try: - gfile = file_dialog.save_finish(task) - if gfile: - nonlocal selected_path - selected_path = gfile.get_path() - except Exception as e: - print(f"Error getting selected file: {e}") - finally: - main_loop.quit() - - dialog = Gtk.FileDialog() - - if file_request.title: - dialog.set_title(file_request.title) - - if file_request.filters: - filters = Gio.ListStore.new(Gtk.FileFilter) - file_filters = Gtk.FileFilter() - - if file_request.filters.title: - file_filters.set_name(file_request.filters.title) - - # Create and configure a filter for image files - if file_request.filters.mime_types: - for mime in file_request.filters.mime_types: - file_filters.add_mime_type(mime) - filters.append(file_filters) - - if file_request.filters.patterns: - for pattern in file_request.filters.patterns: - file_filters.add_pattern(pattern) - - if file_request.filters.suffixes: - for suffix in file_request.filters.suffixes: - file_filters.add_suffix(suffix) - - filters.append(file_filters) - dialog.set_filters(filters) - - main_loop = GLib.MainLoop() - - # if select_folder - if file_request.mode == "select_folder": - dialog.select_folder( - callback=lambda dialog, task: on_folder_select(dialog, task, main_loop), - ) - elif file_request.mode == "open_file": - dialog.open( - callback=lambda dialog, task: on_file_select(dialog, task, main_loop) - ) - elif file_request.mode == "save": - dialog.save( - callback=lambda dialog, task: on_save_finish(dialog, task, main_loop) - ) - - # Wait for the user to select a file or directory - main_loop.run() # type: ignore - - return selected_path - def is_union_type(type_hint: type) -> bool: return type(type_hint) is UnionType @@ -294,7 +199,7 @@ class WebView: method_name: str, ) -> None: with self.mutex_lock: - log.debug("Executing... ", method_name) + log.debug(f"Executing... {method_name}") log.debug(f"{data}") if data is None: result = handler_fn() diff --git a/pkgs/clan-app/clan_app/windows/main_window.py b/pkgs/clan-app/clan_app/windows/main_window.py index 718780f52..24cad81ae 100644 --- a/pkgs/clan-app/clan_app/windows/main_window.py +++ b/pkgs/clan-app/clan_app/windows/main_window.py @@ -8,17 +8,14 @@ from clan_cli.history.list import list_history from clan_app.components.interfaces import ClanConfig from clan_app.singletons.toast import ToastOverlay from clan_app.singletons.use_views import ViewStack -from clan_app.singletons.use_vms import ClanStore -from clan_app.views.details import Details -from clan_app.views.list import ClanList -from clan_app.views.logs import Logs + + from clan_app.views.webview import WebView, open_file gi.require_version("Adw", "1") from gi.repository import Adw, Gio, GLib -from clan_app.components.trayicon import TrayIcon log = logging.getLogger(__name__) @@ -41,15 +38,8 @@ class MainWindow(Adw.ApplicationWindow): app = Gio.Application.get_default() assert app is not None - self.tray_icon: TrayIcon = TrayIcon(app) - - # Initialize all ClanStore - threading.Thread(target=self._populate_vms).start() stack_view = ViewStack.use().view - stack_view.add_named(ClanList(config), "list") - stack_view.add_named(Details(), "details") - stack_view.add_named(Logs(), "logs") # Override platform specific functions API.register(open_file) @@ -63,23 +53,6 @@ class MainWindow(Adw.ApplicationWindow): self.connect("destroy", self.on_destroy) - def _set_clan_store_ready(self) -> bool: - ClanStore.use().emit("is_ready") - return GLib.SOURCE_REMOVE - - def _populate_vms(self) -> None: - # Execute `clan flakes add ` to democlan for this to work - # TODO: Make list_history a generator function - for entry in list_history(): - GLib.idle_add(ClanStore.use().create_vm_task, entry) - - GLib.idle_add(self._set_clan_store_ready) - - def kill_vms(self) -> None: - log.debug("Killing all VMs") - ClanStore.use().kill_all() - def on_destroy(self, source: "Adw.ApplicationWindow") -> None: - log.info("====Destroying Adw.ApplicationWindow===") - ClanStore.use().kill_all() - self.tray_icon.destroy() + log.debug("====Destroying Adw.ApplicationWindow===") +