vm_manager: remove in favor of clan-app

This commit is contained in:
Johannes Kirschbauer
2025-08-13 20:23:05 +02:00
parent 62ef90e959
commit 5859eeac5a
57 changed files with 1 additions and 4439 deletions

View File

@@ -59,8 +59,6 @@
"pkgs/clan-cli/clan_cli/tests/data/password-store/.gpg-id"
"pkgs/clan-cli/clan_cli/tests/data/ssh_host_ed25519_key"
"pkgs/clan-cli/clan_cli/tests/data/sshd_config"
"pkgs/clan-vm-manager/.vscode/lhebendanz.weaudit"
"pkgs/clan-vm-manager/bin/clan-vm-manager"
"clanServices/hello-world/default.nix"
"sops/secrets/test-backup-age.key/secret"
];
@@ -115,21 +113,7 @@
];
extraPythonPaths = [ "../clan-cli" ];
};
}
// (
if pkgs.stdenv.isLinux then
{
"clan-vm-manager" = {
directory = "pkgs/clan-vm-manager";
extraPythonPackages = self'.packages.clan-vm-manager.externalTestDeps ++ [
(pkgs.python3.withPackages (ps: self'.packages.clan-cli.devshellPyDeps ps))
];
extraPythonPaths = [ "../clan-cli" ];
};
}
else
{ }
);
};
treefmt.programs.ruff.check = true;
treefmt.programs.ruff.format = true;
};

View File

@@ -1,7 +0,0 @@
# shellcheck shell=bash
source_up
watch_file flake-module.nix shell.nix default.nix
# Because we depend on nixpkgs sources, uploading to builders takes a long time
use flake .#clan-vm-manager --builders ''

View File

@@ -1 +0,0 @@
**/.vscode

View File

@@ -1,8 +0,0 @@
{
"clientRemote": "",
"gitRemote": "",
"gitSha": "",
"treeEntries": [],
"auditedFiles": [],
"resolvedEntries": []
}

View File

@@ -1,94 +0,0 @@
# Clan VM Manager
Provides users with the simple functionality to manage their locally registered clans.
![app-preview](screenshots/image.png)
## Available commands
Run this application
```bash
./bin/clan-vm-manager
```
Join the default machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]
```
Join a specific machine of a clan
```bash
./bin/clan-vm-manager [clan-uri]#[machine]
```
For more available commands see the developer section below.
## Developing this Application
### Debugging Style and Layout
```bash
# Enable the GTK debugger
gsettings set org.gtk.Settings.Debug enable-inspector-keybinding true
# Start the application with the debugger attached
GTK_DEBUG=interactive ./bin/clan-vm-manager --debug
```
Appending `--debug` flag enables debug logging printed into the console.
### Profiling
To activate profiling you can run
```bash
CLAN_CLI_PERF=1 ./bin/clan-vm-manager
```
### Library Components
> Note:
>
> we recognized bugs when starting some cli-commands through the integrated vs-code terminal.
> If encountering issues make sure to run commands in a regular os-shell.
lib-Adw has a demo application showing all widgets. You can run it by executing
```bash
adwaita-1-demo
```
GTK4 has a demo application showing all widgets. You can run it by executing
```bash
gtk4-widget-factory
```
To find available icons execute
```bash
gtk4-icon-browser
```
### Links
Here are some important documentation links related to the Clan VM Manager:
- [Adw PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Adw-1): This link provides the PyGObject reference documentation for the Adw library, which is used in the Clan VM Manager. It contains detailed information about the Adw widgets and their usage.
- [GTK4 PyGobject Reference](http://lazka.github.io/pgi-docs/index.html#Gtk-4.0): This link provides the PyGObject reference documentation for GTK4, the toolkit used for building the user interface of the Clan VM Manager. It includes information about GTK4 widgets, signals, and other features.
- [Adw Widget Gallery](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/widget-gallery.html): This link showcases a widget gallery for Adw, allowing you to see the available widgets and their visual appearance. It can be helpful for designing the user interface of the Clan VM Manager.
- [Python + GTK3 Tutorial](https://python-gtk-3-tutorial.readthedocs.io/en/latest/textview.html): Although the Clan VM Manager uses GTK4, this tutorial for GTK3 can still be useful as it covers the basics of building GTK-based applications with Python. It includes examples and explanations for various GTK widgets, including text views.
- [GNOME Human Interface Guidelines](https://developer.gnome.org/hig/): This link provides the GNOME Human Interface Guidelines, which offer design and usability recommendations for creating GNOME applications. It covers topics such as layout, navigation, and interaction patterns.
## Error handling
> Error dialogs should be avoided where possible, since they are disruptive.
>
> For simple non-critical errors, toasts can be a good alternative.

View File

@@ -1,13 +0,0 @@
#!/usr/bin/env python3
import sys
from pathlib import Path
module_path = Path(__file__).parent.parent.absolute()
sys.path.insert(0, str(module_path))
sys.path.insert(0, str(module_path.parent / "clan_cli"))
from clan_vm_manager import main # NOQA
if __name__ == "__main__":
main()

View File

@@ -1,46 +0,0 @@
{
"folders": [
{
"path": "."
},
{
"path": "../clan-cli/clan_cli"
},
{
"path": "../clan-cli/tests"
},
{
"path": "../../nixosModules"
},
{
"path": "../../lib/modules"
},
{
"path": "../../../democlan"
}
],
"settings": {
"python.linting.mypyEnabled": true,
"files.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/**": true,
"/nix/store/**": true
},
"search.exclude": {
"**/__pycache__": true,
"**/.direnv": true,
"**/.hypothesis": true,
"**/.mypy_cache": true,
"**/.reports": true,
"**/.ruff_cache": true,
"**/result/": true,
"/nix/store/**": true
},
"files.autoSave": "off"
}
}

View File

@@ -1,14 +0,0 @@
import logging
import sys
from clan_cli.profiler import profile
from clan_vm_manager.app import MainApplication
log = logging.getLogger(__name__)
@profile
def main(argv: list[str] = sys.argv) -> int:
app = MainApplication()
return app.run(argv)

View File

@@ -1,6 +0,0 @@
import sys
from . import main
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,126 +0,0 @@
#!/usr/bin/env python3
import logging
from typing import Any, ClassVar
import gi
from clan_vm_manager import assets
from clan_vm_manager.singletons.toast import InfoToast, ToastOverlay
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from clan_lib.custom_logger import setup_logging
from gi.repository import Adw, Gdk, Gio, Gtk
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.singletons.use_join import GLib, GObject
from .windows.main_window import MainWindow
log = logging.getLogger(__name__)
class MainApplication(Adw.Application):
"""
This class is initialized every time the app is started
Only the Adw.ApplicationWindow is a singleton.
So don't use any singletons in the Adw.Application class.
"""
__gsignals__: ClassVar = {
"join_request": (GObject.SignalFlags.RUN_FIRST, None, [str]),
}
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(
application_id="org.clan.vm-manager",
flags=Gio.ApplicationFlags.HANDLES_COMMAND_LINE,
)
self.add_main_option(
"debug",
ord("d"),
GLib.OptionFlags.NONE,
GLib.OptionArg.NONE,
"enable debug mode",
None,
)
self.window: MainWindow | None = None
self.connect("activate", self.on_activate)
self.connect("shutdown", self.on_shutdown)
def on_shutdown(self, source: "MainApplication") -> None:
log.debug("Shutting down Adw.Application")
if self.get_windows() == []:
log.warning("No windows to destroy")
if self.window:
# TODO: Doesn't seem to raise the destroy signal. Need to investigate
# self.get_windows() returns an empty list. Desync between window and application?
self.window.close()
# Killing vms directly. This is dirty
self.window.kill_vms()
else:
log.error("No window to destroy")
def do_command_line(self, command_line: Any) -> int:
options = command_line.get_options_dict()
# convert GVariantDict -> GVariant -> dict
options = options.end().unpack()
if "debug" in options and self.window is None:
setup_logging(logging.DEBUG)
elif self.window is None:
setup_logging(logging.INFO)
log.debug("Debug logging enabled")
if "debug" in options:
ToastOverlay.use().add_toast_unique(
InfoToast("Debug logging enabled").toast, "info.debugging.enabled"
)
args = command_line.get_arguments()
self.activate()
if len(args) > 1:
uri = args[1]
self.emit("join_request", uri)
return 0
def on_window_hide_unhide(self, *_args: Any) -> None:
if not self.window:
log.error("No window to hide/unhide")
return
if self.window.is_visible():
self.window.hide()
else:
self.window.present()
def dummy_menu_entry(self) -> None:
log.info("Dummy menu entry called")
def on_activate(self, source: "MainApplication") -> None:
if not self.window:
self.init_style()
self.window = MainWindow(config=ClanConfig(initial_view="list"))
self.window.set_application(self)
self.window.show()
# TODO: For css styling
def init_style(self) -> None:
resource_path = assets.loc / "style.css"
log.debug(f"Style css path: {resource_path}")
css_provider = Gtk.CssProvider()
css_provider.load_from_path(str(resource_path))
display = Gdk.Display.get_default()
assert display is not None
Gtk.StyleContext.add_provider_for_display(
display,
css_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION,
)

View File

@@ -1,7 +0,0 @@
from pathlib import Path
loc: Path = Path(__file__).parent
def get_asset(name: str | Path) -> Path:
return loc / name

Binary file not shown.

Before

Width:  |  Height:  |  Size: 95 KiB

View File

@@ -1,63 +0,0 @@
/* Insert custom styles here */
navigation-view {
padding: 5px;
/* padding-left: 5px;
padding-right: 5px;
padding-bottom: 5px; */
}
avatar {
margin: 2px;
}
.trust {
padding-top: 25px;
padding-bottom: 25px;
}
.join-list {
margin-top: 1px;
margin-left: 2px;
margin-right: 2px;
}
.progress-bar {
margin-right: 25px;
min-width: 200px;
}
.group-list {
background-color: inherit;
}
.group-list > .activatable:hover {
background-color: unset;
}
.group-list > row {
margin-top: 12px;
border-bottom: unset;
}
.vm-list {
margin-top: 25px;
margin-bottom: 25px;
}
.no-shadow {
box-shadow: none;
}
.search-entry {
margin-bottom: 12px;
}
searchbar {
margin-bottom: 25px;
}
.log-view {
margin-top: 12px;
font-family: monospace;
padding: 8px;
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 375 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 717 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 717 B

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 KiB

View File

@@ -1,61 +0,0 @@
# Import the urllib.parse, enum and dataclasses modules
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from clan_lib.flake import Flake
# Define the ClanURI class
@dataclass
class ClanURI:
flake: Flake
machine_name: str
def get_url(self) -> str:
return str(self.flake)
@classmethod
def from_str(
cls,
url: str,
machine_name: str | None = None,
) -> "ClanURI":
uri = url
if machine_name:
uri += f"#{machine_name}"
# Users might copy whitespace along with the URI
uri = uri.strip()
# Check if the URI starts with clan://
# If it does, remove the clan:// prefix
prefix = "clan://"
if uri.startswith(prefix):
uri = uri[len(prefix) :]
# Fix missing colon (caused by browsers like Firefox)
if "//" in uri and ":" not in uri.split("//", 1)[0]:
# If there's a `//` but no colon before it, add one before the `//`
parts = uri.split("//", 1)
uri = f"{parts[0]}://{parts[1]}"
# Parse the URI into components
# url://netloc/path;parameters?query#fragment
components: urllib.parse.ParseResult = urllib.parse.urlparse(uri)
# Replace the query string in the components with the new query string
clean_comps = components._replace(query=components.query, fragment="")
# Parse the URL into a ClanUrl object
if clean_comps.path and Path(clean_comps.path).exists():
flake = Flake(clean_comps.path)
else:
flake = Flake(clean_comps.geturl())
machine_name = "defaultVM"
if components.fragment:
machine_name = components.fragment
return cls(flake, machine_name)

View File

@@ -1,143 +0,0 @@
import dataclasses
import inspect
import logging
import multiprocessing as mp
import os
import signal
import sys
import traceback
from collections.abc import Callable
from pathlib import Path
from typing import Any, get_type_hints
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with out_file.open("w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Validate kwargs against the signature of func
func_signature = inspect.signature(func)
bound_args = func_signature.bind_partial(**kwargs)
bound_args.apply_defaults() # Ensure defaults are applied to missing args
# Type-check kwargs against func's annotations
type_hints = get_type_hints(func)
for arg_name, arg_value in kwargs.items():
if arg_name in type_hints:
expected_type = type_hints[arg_name]
if not isinstance(arg_value, expected_type):
msg = f"Argument '{arg_name}' must be of type {expected_type}, but got type {type(arg_value)}"
raise TypeError(msg)
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@@ -1,230 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any, TypeVar
import gi
gi.require_version("Gio", "2.0")
from clan_lib.errors import ClanError
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
# GObject and Gio.ListModel are not compatible with mypy, so we need to ignore the errors
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "newv" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "install_properties" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
# clan_vm_manager/components/gkvstore.py:21: error: Definition of "getv" in base class "Object" is incompatible with definition in base class "GInterface" [misc]
class GKVStore[K, V: GObject.Object](GObject.GObject, Gio.ListModel): # type: ignore[misc]
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
# From Python 3.7 onwards dictionaries are ordered by default
self._items: dict[K, V] = {}
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
msg = "Key already exists in the dictionary"
raise ClanError(msg)
if position < 0 or position > len(self._items):
msg = "Index out of range"
raise IndexError(msg)
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for _i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
msg = "insert_sorted is not implemented in GKVStore"
raise NotImplementedError(msg)
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
msg = "sort is not implemented in GKVStore"
raise NotImplementedError(msg)
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
msg = "splice is not implemented in GKVStore"
raise NotImplementedError(msg)
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> Any:
return self.gtype.__gtype__ # type: ignore[attr-defined]
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
if key in self._items:
log.debug("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V: # type: ignore[override]
return self._items[key]
def __contains__(self, key: K) -> bool: # type: ignore[override]
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]
def register_on_change(
self, callback: Callable[["GKVStore[K,V]", int, int, int], None]
) -> None:
self.connect("items-changed", callback)

View File

@@ -1,10 +0,0 @@
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str

View File

@@ -1,75 +0,0 @@
import logging
from collections.abc import Callable
from typing import TypeVar
import gi
from clan_vm_manager import assets
gi.require_version("Adw", "1")
from gi.repository import Adw, GdkPixbuf, Gio, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
class EmptySplash(Gtk.Box):
def __init__(self, on_join: Callable[[str], None]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
self.on_join = on_join
vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
clan_icon = self.load_image(str(assets.get_asset("clan_black_notext.png")))
if clan_icon:
image = Gtk.Image.new_from_pixbuf(clan_icon)
else:
image = Gtk.Image.new_from_icon_name("image-missing")
# same as the clamp
image.set_pixel_size(400)
image.set_opacity(0.5)
image.set_margin_top(20)
image.set_margin_bottom(10)
vbox.append(image)
empty_label = Gtk.Label(label="Welcome to Clan! Join your first clan.")
join_entry = Gtk.Entry()
join_entry.set_placeholder_text("clan://<url>")
join_entry.set_hexpand(True)
join_button = Gtk.Button(label="Join")
join_button.connect("clicked", self._on_join, join_entry)
join_entry.connect("activate", lambda e: self._on_join(join_button, e))
clamp = Adw.Clamp()
clamp.set_maximum_size(400)
clamp.set_margin_bottom(40)
vbox.append(empty_label)
hbox = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
hbox.append(join_entry)
hbox.append(join_button)
vbox.append(hbox)
clamp.set_child(vbox)
self.append(clamp)
def load_image(self, file_path: str) -> GdkPixbuf.Pixbuf | None:
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file(file_path)
except Exception:
log.exception("Failed to load image")
return None
else:
return pixbuf
def _on_join(self, button: Gtk.Button, entry: Gtk.Entry) -> None:
"""
Callback for the join button
Extracts the text from the entry and calls the on_join callback
"""
log.info(f"Splash screen: Joining {entry.get_text()}")
self.on_join(entry.get_text())

File diff suppressed because it is too large Load Diff

View File

@@ -1,379 +0,0 @@
import datetime
import logging
import multiprocessing as mp
import os
import tempfile
import threading
import time
import weakref
from collections.abc import Callable, Generator
from contextlib import contextmanager
from pathlib import Path
from typing import IO, ClassVar
import gi
from clan_cli import vms
from clan_cli.vms.inspect import inspect_vm
from clan_cli.vms.qemu import QMPWrapper
from clan_lib.dirs import vm_state_dir
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.executor import MPProcess, spawn
from clan_vm_manager.history import HistoryEntry
from clan_vm_manager.singletons.toast import (
InfoToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
class VMObject(GObject.Object):
# Define a custom signal with the name "vm_stopped" and a string argument for the message
__gsignals__: ClassVar = {
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"vm_build_notify": (GObject.SignalFlags.RUN_FIRST, None, [bool, bool]),
}
def __init__(
self,
icon: Path,
data: HistoryEntry,
build_log_cb: Callable[[Gio.File], None],
) -> None:
super().__init__()
# Store the data from the history entry
self.data: HistoryEntry = data
self.build_log_cb = build_log_cb
# Create a process object to store the VM process
self.vm_process: MPProcess = MPProcess(
"vm_dummy", mp.Process(), Path("./dummy")
)
self.build_process: MPProcess = MPProcess(
"build_dummy", mp.Process(), Path("./dummy")
)
self._start_thread: threading.Thread = threading.Thread()
self.machine: Machine | None = None
self.qmp_wrap: QMPWrapper | None = None
# Watcher to stop the VM
self.KILL_TIMEOUT: int = 20 # seconds
self._stop_thread: threading.Thread = threading.Thread()
# Build progress bar vars
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
self.progress_bar.hide()
self.progress_bar.set_hexpand(True) # Horizontally expand
self.prog_bar_id: int = 0
# Create a temporary directory to store the logs
self.log_dir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory(
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
)
self._logs_id: int = 0
self._log_file: IO[str] | None = None
# To be able to set the switch state programmatically
# we need to store the handler id returned by the connect method
# and block the signal while we change the state. This is cursed.
self.switch: Gtk.Switch = Gtk.Switch()
self.switch_handler_id: int = self.switch.connect(
"notify::active", self._on_switch_toggle
)
self.connect("vm_status_changed", self._on_vm_status_changed)
# Make sure the VM is killed when the reference to this object is dropped
self._finalizer: weakref.finalize = weakref.finalize(self, self._kill_ref_drop)
def _vm_status_changed_task(self) -> bool:
self.emit("vm_status_changed")
return GLib.SOURCE_REMOVE
def update(self, data: HistoryEntry) -> None:
self.data = data
def _on_vm_status_changed(self, source: "VMObject") -> None:
# Signal may be emitted multiple times
self.emit("vm_build_notify", self.is_building(), self.is_running())
prev_state = self.switch.get_state()
next_state = self.is_running() and not self.is_building()
self.switch.set_state(next_state)
if prev_state is False and next_state is True:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"{source.data.flake.flake_attr} started").toast,
"success.vm.start",
)
if self.switch.get_sensitive() is False and not self.is_building():
self.switch.set_sensitive(True)
exit_vm = self.vm_process.proc.exitcode
exit_build = self.build_process.proc.exitcode
exitc = exit_vm or exit_build
if not self.is_running() and exitc != 0:
with self.switch.handler_block(self.switch_handler_id):
self.switch.set_active(False)
log.error(f"VM exited with error. Exitcode: {exitc}")
ToastOverlay.use().add_toast_unique(
WarningToast(f"VM exited with error. Exitcode: {exitc}").toast,
"warning.vm.exit",
)
def _on_switch_toggle(self, switch: Gtk.Switch, user_state: bool) -> None:
if switch.get_active():
switch.set_state(False)
switch.set_sensitive(False)
self.start()
else:
switch.set_state(True)
self.shutdown()
switch.set_sensitive(False)
# We use a context manager to create the machine object
# and make sure it is destroyed when the context is exited
@contextmanager
def _create_machine(self) -> Generator[Machine]:
uri = ClanURI.from_str(
url=str(self.data.flake.flake_url), machine_name=self.data.flake.flake_attr
)
self.machine = Machine(
name=self.data.flake.flake_attr,
flake=uri.flake,
)
assert self.machine is not None
state_dir = vm_state_dir(
flake_url=self.machine.flake.identifier, vm_name=self.machine.name
)
self.qmp_wrap = QMPWrapper(state_dir)
assert self.machine is not None
yield self.machine
self.machine = None
def _pulse_progress_bar_task(self) -> bool:
if self.progress_bar.is_visible():
self.progress_bar.pulse()
return GLib.SOURCE_CONTINUE
return GLib.SOURCE_REMOVE
def __start(self) -> None:
with self._create_machine() as machine:
# Start building VM
tstart = datetime.datetime.now(tz=datetime.UTC)
log.info(f"Building VM {self.get_id()}")
log_dir = Path(str(self.log_dir.name))
# Start the build process
self.build_process = spawn(
on_except=None,
out_file=log_dir / "build.log",
func=vms.run.build_vm,
machine=machine,
tmpdir=log_dir,
)
gfile = Gio.File.new_for_path(str(log_dir / "build.log"))
# Gio documentation:
# Obtains a file monitor for the given file.
# If no file notification mechanism exists, then regular polling of the file is used.
g_monitor = gfile.monitor_file(Gio.FileMonitorFlags.NONE, None)
g_monitor.connect("changed", self.on_logs_changed)
GLib.idle_add(self._vm_status_changed_task)
self.switch.set_sensitive(True)
# Start the logs watcher
self._logs_id = GLib.timeout_add(
50, self._get_logs_task, self.build_process
)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.build_process.out_file}")
# Start the progress bar and show it
self.progress_bar.show()
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar_task)
if self.prog_bar_id == 0:
log.error("Couldn't spawn a progress bar task")
# Wait for the build to finish then hide the progress bar
self.build_process.proc.join()
tend = datetime.datetime.now(tz=datetime.UTC)
log.info(f"VM {self.get_id()} build took {tend - tstart}s")
self.progress_bar.hide()
# Check if the VM was built successfully
if self.build_process.proc.exitcode != 0:
log.error(f"Failed to build VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
return
log.info(f"Successfully built VM {self.get_id()}")
vm_config = inspect_vm(machine)
# Start the VM
self.vm_process = spawn(
on_except=None,
out_file=Path(str(self.log_dir.name)) / "vm.log",
func=vms.run.run_vm,
vm_config=vm_config,
runtime_config=vms.run.RuntimeConfig(),
)
log.debug(f"Started VM {self.get_id()}")
GLib.idle_add(self._vm_status_changed_task)
# Start the logs watcher
self._logs_id = GLib.timeout_add(50, self._get_logs_task, self.vm_process)
if self._logs_id == 0:
log.error("Failed to start VM log watcher")
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
# Wait for the VM to stop
self.vm_process.proc.join()
log.debug(f"VM {self.get_id()} has stopped")
GLib.idle_add(self._vm_status_changed_task)
def on_logs_changed(
self,
monitor: Gio.FileMonitor,
file: Gio.File,
other_file: Gio.File,
event_type: Gio.FileMonitorEvent,
) -> None:
if event_type == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
# File was changed and the changes were written to disk
# wire up the callback for setting the logs
self.build_log_cb(file)
def start(self) -> None:
if self.is_running():
log.warning("VM is already running. Ignoring start request")
self.emit("vm_status_changed", self)
return
log.debug(f"VM state dir {self.log_dir.name}")
self._start_thread = threading.Thread(target=self.__start)
self._start_thread.start()
def _get_logs_task(self, proc: MPProcess) -> bool:
if not proc.out_file.exists():
return GLib.SOURCE_CONTINUE
if not self._log_file:
try:
self._log_file = Path(proc.out_file).open() # noqa: SIM115
except Exception:
log.exception(f"Failed to open log file {proc.out_file}")
self._log_file = None
return GLib.SOURCE_REMOVE
line = os.read(self._log_file.fileno(), 4096)
if len(line) != 0:
print(line.decode("utf-8"), end="", flush=True)
if not proc.proc.is_alive():
log.debug("Removing logs watcher")
self._log_file = None
return GLib.SOURCE_REMOVE
return GLib.SOURCE_CONTINUE
def is_running(self) -> bool:
return self._start_thread.is_alive()
def is_building(self) -> bool:
return self.build_process.proc.is_alive()
def is_shutting_down(self) -> bool:
return self._stop_thread.is_alive()
def get_id(self) -> str:
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
def __stop(self) -> None:
log.info(f"Stopping VM {self.get_id()}")
start_time = datetime.datetime.now(tz=datetime.UTC)
while self.is_running():
diff = datetime.datetime.now(tz=datetime.UTC) - start_time
if diff.seconds > self.KILL_TIMEOUT:
log.error(
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
)
self.vm_process.kill_group()
break
if self.is_building():
log.info(f"VM {self.get_id()} is still building. Killing it")
self.build_process.kill_group()
break
if not self.machine:
log.error(f"Machine object is None. Killing VM {self.get_id()}")
self.vm_process.kill_group()
break
# Try to shutdown the VM gracefully using QMP
try:
assert self.qmp_wrap is not None
with self.qmp_wrap.qmp_ctx() as qmp:
qmp.command("system_powerdown")
except Exception as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM
time.sleep(self.KILL_TIMEOUT / 20)
GLib.idle_add(self._vm_status_changed_task)
log.debug(f"VM {self.get_id()} has stopped")
ToastOverlay.use().add_toast_unique(
InfoToast(f"Stopped {self.get_id()}").toast, "info.vm.exit"
)
def shutdown(self) -> None:
if not self.is_running():
log.warning("VM not running. Ignoring shutdown request.")
self.emit("vm_status_changed", self)
return
if self.is_shutting_down():
log.warning("Shutdown already in progress")
self.emit("vm_status_changed", self)
return
self._stop_thread = threading.Thread(target=self.__stop)
self._stop_thread.start()
def _kill_ref_drop(self) -> None:
if self.is_running():
log.warning("Killing VM due to reference drop")
self.kill()
def kill(self) -> None:
if not self.is_running():
log.warning(f"Tried to kill VM {self.get_id()} is not running")
return
log.info(f"Killing VM {self.get_id()} now")
if self.vm_process.proc.is_alive():
self.vm_process.kill_group()
if self.build_process.proc.is_alive():
self.build_process.kill_group()
def read_whole_log(self) -> str:
if not self.vm_process.out_file.exists():
log.error(f"Log file {self.vm_process.out_file} does not exist")
return ""
return self.vm_process.out_file.read_text()
def __str__(self) -> str:
return f"VM({self.get_id()})"
def __repr__(self) -> str:
return self.__str__()

View File

@@ -1,159 +0,0 @@
import argparse
import dataclasses
import datetime
import json
import logging
from typing import Any
from clan_cli.clan.inspect import FlakeConfig, inspect_flake
from clan_lib.dirs import user_history_file
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.locked_open import read_history_file, write_history_file
from clan_lib.machines.list import list_machines
from clan_vm_manager.clan_uri import ClanURI
log = logging.getLogger(__name__)
@dataclasses.dataclass
class HistoryEntry:
last_used: str
flake: FlakeConfig
settings: dict[str, Any] = dataclasses.field(default_factory=dict)
@classmethod
def from_json(cls: type["HistoryEntry"], data: dict[str, Any]) -> "HistoryEntry":
return cls(
last_used=data["last_used"],
flake=FlakeConfig.from_json(data["flake"]),
settings=data.get("settings", {}),
)
def _merge_dicts(d1: dict, d2: dict) -> dict:
# create a new dictionary that copies d1
merged = dict(d1)
# iterate over the keys and values of d2
for key, value in d2.items():
# if the key is in d1 and both values are dictionaries, merge them recursively
if key in d1 and isinstance(d1[key], dict) and isinstance(value, dict):
merged[key] = _merge_dicts(d1[key], value)
# otherwise, update the value of the key in the merged dictionary
else:
merged[key] = value
# return the merged dictionary
return merged
def list_history() -> list[HistoryEntry]:
logs: list[HistoryEntry] = []
if not user_history_file().exists():
return []
try:
parsed = read_history_file()
for i, p in enumerate(parsed.copy()):
# Everything from the settings dict is merged into the flake dict, and can override existing values
parsed[i] = _merge_dicts(p, p.get("settings", {}))
logs = [HistoryEntry.from_json(p) for p in parsed]
except (json.JSONDecodeError, TypeError) as ex:
msg = f"History file at {user_history_file()} is corrupted"
raise ClanError(msg) from ex
return logs
def new_history_entry(url: str, machine: str) -> HistoryEntry:
flake = inspect_flake(url, machine)
return HistoryEntry(
flake=flake,
last_used=datetime.datetime.now(tz=datetime.UTC).isoformat(),
)
def add_all_to_history(uri: ClanURI) -> list[HistoryEntry]:
history = list_history()
new_entries: list[HistoryEntry] = []
for machine in list_machines(Flake(uri.get_url())):
new_entry = _add_maschine_to_history_list(uri.get_url(), machine, history)
new_entries.append(new_entry)
write_history_file(history)
return new_entries
def add_history(uri: ClanURI) -> HistoryEntry:
user_history_file().parent.mkdir(parents=True, exist_ok=True)
history = list_history()
new_entry = _add_maschine_to_history_list(uri.get_url(), uri.machine_name, history)
write_history_file(history)
return new_entry
def _add_maschine_to_history_list(
uri_path: str, uri_machine: str, entries: list[HistoryEntry]
) -> HistoryEntry:
for new_entry in entries:
if (
new_entry.flake.flake_url == str(uri_path)
and new_entry.flake.flake_attr == uri_machine
):
new_entry.last_used = datetime.datetime.now(tz=datetime.UTC).isoformat()
return new_entry
new_entry = new_history_entry(uri_path, uri_machine)
entries.append(new_entry)
return new_entry
def add_history_command(args: argparse.Namespace) -> None:
if args.all:
add_all_to_history(args.uri)
else:
add_history(args.uri)
def list_history_command(args: argparse.Namespace) -> None:
res: dict[str, list[HistoryEntry]] = {}
for history_entry in list_history():
url = str(history_entry.flake.flake_url)
if res.get(url) is None:
res[url] = []
res[url].append(history_entry)
for flake_url, entries in res.items():
print(flake_url)
for entry in entries:
d = datetime.datetime.fromisoformat(entry.last_used)
last_used = d.strftime("%d/%m/%Y %H:%M:%S")
print(f" {entry.flake.flake_attr} ({last_used})")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="clan history",
description="Manage clan history",
)
subparser = parser.add_subparsers(
title="command",
description="the command to run",
help="the command to run",
required=True,
)
add_parser = subparser.add_parser("add", help="Add a clan flake")
add_parser.add_argument(
"uri", type=ClanURI.from_str, help="Path to the flake", default="."
)
add_parser.add_argument(
"--all", help="Add all machines", default=False, action="store_true"
)
add_parser.set_defaults(func=add_history_command)
list_parser = subparser.add_parser("list", help="List recently used flakes")
list_parser.set_defaults(func=list_history_command)
return parser.parse_args()
def main() -> None:
args = parse_args()
args.func(args)

View File

@@ -1,151 +0,0 @@
import logging
from collections.abc import Callable
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
log = logging.getLogger(__name__)
class ToastOverlay:
"""
The ToastOverlay is a class that manages the display of toasts
It should be used as a singleton in your application to prevent duplicate toasts
Usage
"""
# For some reason, the adw toast overlay cannot be subclassed
# Thats why it is added as a class property
overlay: Adw.ToastOverlay
active_toasts: set[str]
_instance: "None | ToastOverlay" = None
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ToastOverlay":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.overlay = Adw.ToastOverlay()
cls.active_toasts = set()
return cls._instance
def add_toast_unique(self, toast: Adw.Toast, key: str) -> None:
if key not in self.active_toasts:
self.active_toasts.add(key)
self.overlay.add_toast(toast)
toast.connect("dismissed", lambda toast: self.active_toasts.remove(key))
class ErrorToast:
toast: Adw.Toast
def __init__(
self, message: str, persistent: bool = False, details: str = ""
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""<span foreground='red'>❌ Error </span> {message}"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.HIGH)
self.toast.set_button_label("Show more")
if persistent:
self.toast.set_timeout(0)
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view.set_message(details)
self.toast.connect(
"button-clicked",
lambda _: views.set_visible_child_name("logs"),
)
class WarningToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"<span foreground='orange'>⚠ Warning </span> {message}"
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class InfoToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span>❕</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class SuccessToast:
toast: Adw.Toast
def __init__(self, message: str, persistent: bool = False) -> None:
super().__init__()
self.toast = Adw.Toast.new(f"<span foreground='green'>✅</span> {message}")
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
class LogToast:
toast: Adw.Toast
def __init__(
self,
message: str,
on_button_click: Callable[[], None],
button_label: str = "More",
persistent: bool = False,
) -> None:
super().__init__()
self.toast = Adw.Toast.new(
f"""Logs are available <span weight="regular">{message}</span>"""
)
self.toast.set_use_markup(True)
self.toast.set_priority(Adw.ToastPriority.NORMAL)
if persistent:
self.toast.set_timeout(0)
self.toast.set_button_label(button_label)
self.toast.connect(
"button-clicked",
lambda _: on_button_click(),
)

View File

@@ -1,115 +0,0 @@
import logging
import threading
from collections.abc import Callable
from typing import Any, ClassVar, cast
import gi
from clan_lib.machines.machines import Machine
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.history import HistoryEntry, add_history
from clan_vm_manager.singletons.use_vms import ClanStore
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class JoinValue(GObject.Object):
__gsignals__: ClassVar = {
"join_finished": (GObject.SignalFlags.RUN_FIRST, None, []),
}
url: ClanURI
entry: HistoryEntry | None
def _join_finished_task(self) -> bool:
self.emit("join_finished")
return GLib.SOURCE_REMOVE
def __init__(self, url: ClanURI) -> None:
super().__init__()
self.url: ClanURI = url
self.entry: HistoryEntry | None = None
def __join(self) -> None:
new_entry = add_history(self.url)
self.entry = new_entry
GLib.idle_add(self._join_finished_task)
def join(self) -> None:
threading.Thread(target=self.__join).start()
class JoinList:
"""
This is a singleton.
It is initialized with the first call of use()
"""
_instance: "None | JoinList" = None
list_store: Gio.ListStore
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "JoinList":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.list_store = Gio.ListStore.new(JoinValue)
ClanStore.use().register_on_deep_change(cls._instance.rerender_join_list)
return cls._instance
def rerender_join_list(
self, source: GKVStore, position: int, removed: int, added: int
) -> None:
self.list_store.items_changed(
0, self.list_store.get_n_items(), self.list_store.get_n_items()
)
def is_empty(self) -> bool:
return self.list_store.get_n_items() == 0
def push(self, uri: ClanURI, after_join: Callable[[JoinValue], None]) -> None:
"""
Add a join request.
This method can add multiple join requests if called subsequently for each request.
"""
value = JoinValue(uri)
machine_id = Machine(uri.machine_name, uri.flake)
machine_id_list = []
for machine_obj in self.list_store:
mvalue: ClanURI = cast(JoinValue, machine_obj).url
machine = Machine(mvalue.machine_name, mvalue.flake)
machine_id_list.append(machine.get_id())
if machine_id in machine_id_list:
log.info(f"Join request already exists: {value.url}. Ignoring.")
return
value.connect("join_finished", self._on_join_finished)
value.connect("join_finished", after_join)
self.list_store.append(value)
def _on_join_finished(self, source: JoinValue) -> None:
log.info(f"Join finished: {source.url}")
self.discard(source)
assert source.entry is not None
ClanStore.use().push_history_entry(source.entry)
def discard(self, value: JoinValue) -> None:
(has, idx) = self.list_store.find(value)
if has:
self.list_store.remove(idx)

View File

@@ -1,37 +0,0 @@
from typing import Any
import gi
gi.require_version("Gtk", "4.0")
gi.require_version("Adw", "1")
from gi.repository import Adw
class ViewStack:
"""
This is a singleton.
It is initialized with the first call of use()
Usage:
ViewStack.use().set_visible()
ViewStack.use() can also be called before the data is needed. e.g. to eliminate/reduce waiting time.
"""
_instance: "None | ViewStack" = None
view: Adw.ViewStack
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ViewStack":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls.view = Adw.ViewStack()
return cls._instance

View File

@@ -1,187 +0,0 @@
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, ClassVar
import gi
from clan_lib.flake import Flake
from clan_lib.machines.machines import Machine
from clan_vm_manager import assets
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.history import HistoryEntry
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.views.logs import Logs
gi.require_version("GObject", "2.0")
gi.require_version("Gtk", "4.0")
from gi.repository import Gio, GLib, GObject
log = logging.getLogger(__name__)
class VMStore(GKVStore):
def __init__(self) -> None:
super().__init__(VMObject, lambda vm: vm.data.flake.flake_attr)
class Emitter(GObject.GObject):
__gsignals__: ClassVar = {
"is_ready": (GObject.SignalFlags.RUN_FIRST, None, []),
}
class ClanStore:
_instance: "None | ClanStore" = None
_clan_store: GKVStore[Flake, VMStore]
_emitter: Emitter
# set the vm that is outputting logs
# build logs are automatically streamed to the logs-view
_logging_vm: VMObject | None = None
# Make sure the VMS class is used as a singleton
def __init__(self) -> None:
msg = "Call use() instead"
raise RuntimeError(msg)
@classmethod
def use(cls: Any) -> "ClanStore":
if cls._instance is None:
cls._instance = cls.__new__(cls)
cls._clan_store = GKVStore(
VMStore, lambda store: store.first().data.flake.flake_url
)
cls._emitter = Emitter()
return cls._instance
def emit(self, signal: str) -> None:
self._emitter.emit(signal)
def connect(self, signal: str, cb: Callable[(...), Any]) -> None:
self._emitter.connect(signal, cb)
def set_logging_vm(self, ident: str) -> VMObject | None:
vm = self.get_vm(ClanURI.from_str(f"clan://{ident}"))
if vm is not None:
self._logging_vm = vm
return self._logging_vm
def register_on_deep_change(
self, callback: Callable[[GKVStore, int, int, int], None]
) -> None:
"""
Register a callback that is called when a clan_store or one of the included VMStores changes
"""
def on_vmstore_change(
store: VMStore, position: int, removed: int, added: int
) -> None:
callback(store, position, removed, added)
def on_clanstore_change(
store: "GKVStore", position: int, removed: int, added: int
) -> None:
if added > 0:
store.values()[position].register_on_change(on_vmstore_change)
callback(store, position, removed, added)
self.clan_store.register_on_change(on_clanstore_change)
@property
def clan_store(self) -> GKVStore[Flake, VMStore]:
return self._clan_store
def create_vm_task(self, vm: HistoryEntry) -> bool:
self.push_history_entry(vm)
return GLib.SOURCE_REMOVE
def push_history_entry(self, entry: HistoryEntry) -> None:
# TODO: We shouldn't do this here but in the list view
if entry.flake.icon is None:
icon: Path = assets.loc / "placeholder.jpeg"
else:
icon = Path(entry.flake.icon)
def log_details(gfile: Gio.File) -> None:
self.log_details(vm, gfile)
vm = VMObject(icon=icon, data=entry, build_log_cb=log_details)
self.push(vm)
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
def file_read_callback(
source_object: Gio.File, result: Gio.AsyncResult, _user_data: Any
) -> None:
try:
# Finish the asynchronous read operation
res = source_object.load_contents_finish(result)
_success, contents, _etag_out = res
# Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8"))
except Exception as e:
print(f"Error reading file: {e}")
# only one vm can output logs at a time
if vm == self._logging_vm:
gfile.load_contents_async(None, file_read_callback, None)
# we cannot check this type, python is not smart enough
def push(self, vm: VMObject) -> None:
url = vm.data.flake.flake_url
# Only write to the store if the Clan is not already in it
# Every write to the KVStore rerenders bound widgets to the clan_store
if url not in self.clan_store:
log.debug(f"Creating new VMStore for {url}")
vm_store = VMStore()
vm_store.append(vm)
self.clan_store[url] = vm_store
else:
vm_store = self.clan_store[url]
machine = vm.data.flake.flake_attr
old_vm = vm_store.get(machine)
if old_vm:
log.info(
f"VM {vm.data.flake.flake_attr} already exists in store. Updating data field."
)
old_vm.update(vm.data)
else:
log.debug(f"Appending VM {vm.data.flake.flake_attr} to store")
vm_store.append(vm)
def remove(self, vm: VMObject) -> None:
del self.clan_store[vm.data.flake.flake_url][vm.data.flake.flake_attr]
def get_vm(self, uri: ClanURI) -> None | VMObject:
machine = Machine(uri.machine_name, uri.flake)
vm_store = self.clan_store.get(machine.flake)
if vm_store is None:
return None
vm = vm_store.get(str(machine.name), None)
return vm
def get_running_vms(self) -> list[VMObject]:
return [
vm
for clan in self.clan_store.values()
for vm in clan.values()
if vm.is_running()
]
def kill_all(self) -> None:
for vm in self.get_running_vms():
vm.kill()

View File

@@ -1,61 +0,0 @@
import os
from collections.abc import Callable
from functools import partial
from typing import Any, Literal, TypeVar
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GObject, Gtk
# Define a TypeVar that is bound to GObject.Object
ListItem = TypeVar("ListItem", bound=GObject.Object)
def create_details_list[ListItem: GObject.Object](
model: Gio.ListStore, render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget]
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class PreferencesValue(GObject.Object):
variant: Literal["CPU", "MEMORY"]
editable: bool
data: Any
def __init__(
self, variant: Literal["CPU", "MEMORY"], editable: bool, data: Any
) -> None:
super().__init__()
self.variant = variant
self.editable = editable
self.data = data
class Details(Gtk.Box):
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
preferences_store = Gio.ListStore.new(PreferencesValue)
preferences_store.append(PreferencesValue("CPU", True, 1))
self.details_list = create_details_list(
model=preferences_store, render_row=self.render_entry_row
)
self.append(self.details_list)
def render_entry_row(
self, boxed_list: Gtk.ListBox, item: PreferencesValue
) -> Gtk.Widget:
cores: int | None = os.cpu_count()
fcores = float(cores) if cores else 1.0
row = Adw.SpinRow.new_with_range(0, fcores, 1)
row.set_value(item.data)
return row

View File

@@ -1,358 +0,0 @@
import base64
import logging
from collections.abc import Callable
from functools import partial
from typing import Any, TypeVar
import gi
from clan_lib.errors import ClanError
from clan_vm_manager.clan_uri import ClanURI
from clan_vm_manager.components.gkvstore import GKVStore
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.components.list_splash import EmptySplash
from clan_vm_manager.components.vmobj import VMObject
from clan_vm_manager.singletons.toast import (
LogToast,
SuccessToast,
ToastOverlay,
WarningToast,
)
from clan_vm_manager.singletons.use_join import JoinList, JoinValue
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore, VMStore
from clan_vm_manager.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gdk, Gio, GLib, GObject, Gtk
log = logging.getLogger(__name__)
ListItem = TypeVar("ListItem", bound=GObject.Object)
CustomStore = TypeVar("CustomStore", bound=Gio.ListModel)
def create_boxed_list[CustomStore: Gio.ListModel, ListItem: GObject.Object](
model: CustomStore,
render_row: Callable[[Gtk.ListBox, ListItem], Gtk.Widget],
) -> Gtk.ListBox:
boxed_list = Gtk.ListBox()
boxed_list.set_selection_mode(Gtk.SelectionMode.NONE)
boxed_list.add_css_class("boxed-list")
boxed_list.add_css_class("no-shadow")
boxed_list.bind_model(model, create_widget_func=partial(render_row, boxed_list))
return boxed_list
class ClanList(Gtk.Box):
"""
The ClanList
Is the composition of
the ClanListToolbar
the clanListView
# ------------------------ #
# - Tools <Start> <Stop> < Edit> #
# ------------------------ #
# - List Items
# - <...>
# ------------------------#
"""
def __init__(self, config: ClanConfig) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
app.connect("join_request", self.on_join_request)
self.log_label: Gtk.Label = Gtk.Label()
# Add join list
self.join_boxed_list = create_boxed_list(
model=JoinList.use().list_store, render_row=self.render_join_row
)
self.join_boxed_list.add_css_class("join-list")
self.append(self.join_boxed_list)
clan_store = ClanStore.use()
clan_store.connect("is_ready", self.display_splash)
self.group_list = create_boxed_list(
model=clan_store.clan_store, render_row=self.render_group_row
)
self.group_list.add_css_class("group-list")
self.append(self.group_list)
self.splash = EmptySplash(on_join=lambda x: self.on_join_request(x, x))
def display_splash(self, source: GKVStore) -> None:
print("Displaying splash")
if (
ClanStore.use().clan_store.get_n_items() == 0
and JoinList.use().list_store.get_n_items() == 0
):
self.append(self.splash)
def render_group_row(
self, boxed_list: Gtk.ListBox, vm_store: VMStore
) -> Gtk.Widget:
self.remove(self.splash)
vm = vm_store.first()
log.debug("Rendering group row for %s", vm.data.flake.flake_url)
grp = Adw.PreferencesGroup()
grp.set_title(vm.data.flake.clan_name)
grp.set_description(str(vm.data.flake.flake_url))
add_action = Gio.SimpleAction.new("add", GLib.VariantType.new("s"))
add_action.connect("activate", self.on_add)
app = Gio.Application.get_default()
assert app is not None
app.add_action(add_action)
# menu_model = Gio.Menu()
# TODO: Make this lazy, blocks UI startup for too long
# for vm in machines.list.list_nixos_machines(flake_url=vm.data.flake.flake_url):
# if vm not in vm_store:
# menu_model.append(vm, f"app.add::{vm}")
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
add_button = Gtk.Button()
add_button_content = Adw.ButtonContent.new()
add_button_content.set_label("Add machine")
add_button_content.set_icon_name("list-add-symbolic")
add_button.add_css_class("flat")
add_button.set_child(add_button_content)
# add_button.set_has_frame(False)
# add_button.set_menu_model(menu_model)
# add_button.set_label("Add machine")
box.append(add_button)
grp.set_header_suffix(box)
vm_list = create_boxed_list(model=vm_store, render_row=self.render_vm_row)
grp.add(vm_list)
return grp
def on_add(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Adding new machine", target)
def render_vm_row(self, boxed_list: Gtk.ListBox, vm: VMObject) -> Gtk.Widget:
# Remove no-shadow class if attached
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
flake = vm.data.flake
row = Adw.ActionRow()
# ====== Display Avatar ======
avatar = Adw.Avatar()
machine_icon = flake.vm.machine_icon
# If there is a machine icon, display it else
# display the clan icon
if machine_icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(machine_icon)))
elif flake.icon:
avatar.set_custom_image(Gdk.Texture.new_from_filename(str(flake.icon)))
else:
avatar.set_text(flake.clan_name + " " + flake.flake_attr)
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
# ====== Display Name And Url =====
row.set_title(flake.flake_attr)
row.set_title_lines(1)
row.set_title_selectable(True)
# If there is a machine description, display it else
# display the clan name
if flake.vm.machine_description:
row.set_subtitle(flake.vm.machine_description)
else:
row.set_subtitle(flake.clan_name)
row.set_subtitle_lines(1)
# ==== Display build progress bar ====
build_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
build_box.set_valign(Gtk.Align.CENTER)
build_box.append(vm.progress_bar)
build_box.set_homogeneous(False)
row.add_suffix(build_box) # This allows children to have different sizes
# ==== Action buttons ====
button_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
button_box.set_valign(Gtk.Align.CENTER)
## Drop down menu
open_action = Gio.SimpleAction.new("edit", GLib.VariantType.new("s"))
open_action.connect("activate", self.on_edit)
action_id = base64.b64encode(vm.get_id().encode("utf-8")).decode("utf-8")
build_logs_action = Gio.SimpleAction.new(
f"logs.{action_id}", GLib.VariantType.new("s")
)
build_logs_action.connect("activate", self.on_show_build_logs)
build_logs_action.set_enabled(False)
app = Gio.Application.get_default()
assert app is not None
app.add_action(open_action)
app.add_action(build_logs_action)
# set a callback function for conditionally enabling the build_logs action
def on_vm_build_notify(
vm: VMObject, is_building: bool, is_running: bool
) -> None:
build_logs_action.set_enabled(is_building or is_running)
app.add_action(build_logs_action)
if is_building:
ToastOverlay.use().add_toast_unique(
LogToast(
"""Build process running ...""",
on_button_click=lambda: self.show_vm_build_logs(vm.get_id()),
).toast,
f"info.build.running.{vm}",
)
vm.connect("vm_build_notify", on_vm_build_notify)
menu_model = Gio.Menu()
menu_model.append("Edit", f"app.edit::{vm.get_id()}")
menu_model.append("Show Logs", f"app.logs.{action_id}::{vm.get_id()}")
pref_button = Gtk.MenuButton()
pref_button.set_icon_name("open-menu-symbolic")
pref_button.set_menu_model(menu_model)
button_box.append(pref_button)
## VM switch button
switch_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
switch_box.set_valign(Gtk.Align.CENTER)
switch_box.append(vm.switch)
button_box.append(switch_box)
row.add_suffix(button_box)
return row
def on_edit(self, source: Any, parameter: Any) -> None:
target = parameter.get_string()
print("Editing settings for machine", target)
def on_show_build_logs(self, _: Any, parameter: Any) -> None:
target = parameter.get_string()
self.show_vm_build_logs(target)
def show_vm_build_logs(self, target: str) -> None:
vm = ClanStore.use().set_logging_vm(target)
if vm is None:
msg = f"VM {target} not found"
raise ClanError(msg)
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
if logs is None:
msg = "Logs view not found"
raise ClanError(msg)
name = vm.machine.name if vm.machine else "Unknown"
logs.set_title(f"""📄<span weight="normal"> {name}</span>""")
# initial message. Streaming happens automatically when the file is changed by the build process
logs.set_message(vm.build_process.out_file.read_text())
views.set_visible_child_name("logs")
def render_join_row(
self, boxed_list: Gtk.ListBox, join_val: JoinValue
) -> Gtk.Widget:
if boxed_list.has_css_class("no-shadow"):
boxed_list.remove_css_class("no-shadow")
log.debug("Rendering join row for %s", join_val.url)
row = Adw.ActionRow()
row.set_title(join_val.url.machine_name)
row.set_subtitle(str(join_val.url))
row.add_css_class("trust")
vm = ClanStore.use().get_vm(join_val.url)
# Can't do this here because clan store is empty at this point
if vm is not None:
sub = row.get_subtitle()
assert sub is not None
ToastOverlay.use().add_toast_unique(
WarningToast(
f"""<span weight="regular">{join_val.url.machine_name!s}</span> Already exists. Joining again will update it"""
).toast,
"warning.duplicate.join",
)
row.set_subtitle(
sub + "\nClan already exists. Joining again will update it"
)
avatar = Adw.Avatar()
avatar.set_text(str(join_val.url.machine_name))
avatar.set_show_initials(True)
avatar.set_size(50)
row.add_prefix(avatar)
cancel_button = Gtk.Button(label="Cancel")
cancel_button.add_css_class("error")
cancel_button.connect("clicked", partial(self.on_discard_clicked, join_val))
self.cancel_button = cancel_button
trust_button = Gtk.Button(label="Join")
trust_button.add_css_class("success")
trust_button.connect("clicked", partial(self.on_trust_clicked, join_val))
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=5)
box.set_valign(Gtk.Align.CENTER)
box.append(cancel_button)
box.append(trust_button)
row.add_suffix(box)
return row
def on_join_request(self, source: Any, url: str) -> None:
log.debug("Join request: %s", url)
clan_uri = ClanURI.from_str(url)
JoinList.use().push(clan_uri, self.on_after_join)
def on_after_join(self, source: JoinValue) -> None:
ToastOverlay.use().add_toast_unique(
SuccessToast(f"Updated {source.url.machine_name}").toast,
"success.join",
)
# If the join request list is empty disable the shadow artefact
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")
def on_trust_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
source.set_sensitive(False)
self.cancel_button.set_sensitive(False)
value.join()
def on_discard_clicked(self, value: JoinValue, source: Gtk.Widget) -> None:
JoinList.use().discard(value)
if JoinList.use().is_empty():
self.join_boxed_list.add_css_class("no-shadow")

View File

@@ -1,65 +0,0 @@
import logging
import gi
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, Gtk
from clan_vm_manager.singletons.use_views import ViewStack
log = logging.getLogger(__name__)
class Logs(Gtk.Box):
"""
Simple log view
This includes a banner and a text view and a button to close the log and navigate back to the overview
"""
def __init__(self) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL)
app = Gio.Application.get_default()
assert app is not None
self.banner = Adw.Banner.new("")
self.banner.set_use_markup(True)
self.banner.set_revealed(True)
self.banner.set_button_label("Close")
self.banner.connect(
"button-clicked",
lambda _: ViewStack.use().view.set_visible_child_name("list"),
)
self.text_view = Gtk.TextView()
self.text_view.set_editable(False)
self.text_view.set_wrap_mode(Gtk.WrapMode.WORD)
self.text_view.add_css_class("log-view")
self.append(self.banner)
self.append(self.text_view)
def set_title(self, title: str) -> None:
self.banner.set_title(title)
def set_message(self, message: str) -> None:
"""
Set the log message. This will delete any previous message
"""
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""
Append to the end of a potentially existent log message
"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)

View File

@@ -1,88 +0,0 @@
import logging
import threading
import gi
from clan_vm_manager.components.interfaces import ClanConfig
from clan_vm_manager.history import list_history
from clan_vm_manager.singletons.toast import ToastOverlay
from clan_vm_manager.singletons.use_views import ViewStack
from clan_vm_manager.singletons.use_vms import ClanStore
from clan_vm_manager.views.details import Details
from clan_vm_manager.views.list import ClanList
from clan_vm_manager.views.logs import Logs
gi.require_version("Adw", "1")
from gi.repository import Adw, Gio, GLib, Gtk
from clan_vm_manager.components.trayicon import TrayIcon
log = logging.getLogger(__name__)
class MainWindow(Adw.ApplicationWindow):
def __init__(self, config: ClanConfig) -> None:
super().__init__()
self.set_title("Clan Manager")
self.set_default_size(980, 850)
overlay = ToastOverlay.use().overlay
view = Adw.ToolbarView()
overlay.set_child(view)
self.set_content(overlay)
header = Adw.HeaderBar()
view.add_top_bar(header)
app = Gio.Application.get_default()
assert app is not None
self.tray_icon: TrayIcon = TrayIcon(app)
# Initialize all ClanStore
threading.Thread(target=self._populate_vms).start()
# Initialize all views
stack_view = ViewStack.use().view
# @hsjobeki: Do not remove clamp it is needed to limit the width
clamp = Adw.Clamp()
clamp.set_child(stack_view)
clamp.set_maximum_size(1000)
scroll = Gtk.ScrolledWindow()
scroll.set_propagate_natural_height(True)
scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
scroll.set_child(clamp)
stack_view.add_named(ClanList(config), "list")
stack_view.add_named(Details(), "details")
stack_view.add_named(Logs(), "logs")
stack_view.set_visible_child_name(config.initial_view)
view.set_content(scroll)
self.connect("destroy", self.on_destroy)
def _set_clan_store_ready(self) -> bool:
ClanStore.use().emit("is_ready")
return GLib.SOURCE_REMOVE
def _populate_vms(self) -> None:
# Execute `clan flakes add <path>` to democlan for this to work
# TODO: Make list_history a generator function
for entry in list_history():
GLib.idle_add(ClanStore.use().create_vm_task, entry)
GLib.idle_add(self._set_clan_store_ready)
def kill_vms(self) -> None:
log.debug("Killing all VMs")
ClanStore.use().kill_all()
def on_destroy(self, source: "Adw.ApplicationWindow") -> None:
log.info("====Destroying Adw.ApplicationWindow===")
ClanStore.use().kill_all()
self.tray_icon.destroy()

View File

@@ -1,164 +0,0 @@
{
adwaita-icon-theme,
clan-cli,
copyDesktopItems,
fontconfig,
gobject-introspection,
gtk4,
libadwaita,
makeDesktopItem,
pygobject-stubs,
pygobject3,
pytest, # Testing framework
pytest-subprocess, # fake the real subprocess behavior to make your tests more independent.
pytest-timeout, # Add timeouts to your tests
pytest-xdist, # Run tests in parallel on multiple cores
buildPythonApplication,
runCommand,
setuptools,
webkitgtk_6_0,
wrapGAppsHook,
python,
lib,
stdenv,
}:
let
source = ./.;
desktop-file = makeDesktopItem {
name = "org.clan.vm-manager";
exec = "clan-vm-manager %u";
icon = "clan-white";
desktopName = "Clan Manager";
startupWMClass = "clan";
mimeTypes = [ "x-scheme-handler/clan" ];
};
# Dependencies that are directly used in the project but nor from internal python packages
externalPythonDeps = [
pygobject3
pygobject-stubs
gtk4
libadwaita
adwaita-icon-theme
]
++ clan-cli.propagatedBuildInputs
++ lib.optionals (!stdenv.isDarwin) [
webkitgtk_6_0
];
# Deps including python packages from the local project
allPythonDeps = [ (python.pkgs.toPythonModule clan-cli) ] ++ externalPythonDeps;
# Runtime binary dependencies required by the application
runtimeDependencies = [
];
# Dependencies required for running tests
externalTestDeps =
externalPythonDeps
++ runtimeDependencies
++ [
pytest # Testing framework
pytest-subprocess # fake the real subprocess behavior to make your tests more independent.
pytest-xdist # Run tests in parallel on multiple cores
pytest-timeout # Add timeouts to your tests
];
# Dependencies required for running tests
testDependencies = runtimeDependencies ++ allPythonDeps ++ externalTestDeps;
# Setup Python environment with all dependencies for running tests
pythonWithTestDeps = python.withPackages (_ps: testDependencies);
in
buildPythonApplication rec {
name = "clan-vm-manager";
src = source;
format = "pyproject";
makeWrapperArgs = [
"--set FONTCONFIG_FILE ${fontconfig.out}/etc/fonts/fonts.conf"
# This prevents problems with mixed glibc versions that might occur when the
# cli is called through a browser built against another glibc
"--unset LD_LIBRARY_PATH"
];
# Deps needed only at build time
nativeBuildInputs = [
setuptools
copyDesktopItems
wrapGAppsHook
gobject-introspection
];
# The necessity of setting buildInputs and propagatedBuildInputs to the
# same values for your Python package within Nix largely stems from ensuring
# that all necessary dependencies are consistently available both
# at build time and runtime,
buildInputs = allPythonDeps ++ runtimeDependencies;
propagatedBuildInputs = allPythonDeps ++ runtimeDependencies ++ [ ];
# also re-expose dependencies so we test them in CI
passthru = {
tests = {
clan-vm-manager-pytest =
runCommand "clan-vm-manager-pytest" { inherit buildInputs propagatedBuildInputs nativeBuildInputs; }
''
cp -r ${source} ./src
chmod +w -R ./src
cd ./src
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
echo "STARTING ..."
export NIX_STATE_DIR=$TMPDIR/nix IN_NIX_SANDBOX=1
${pythonWithTestDeps}/bin/python -m pytest -s -m "not impure" ./tests
touch $out
'';
};
};
# Additional pass-through attributes
passthru.desktop-file = desktop-file;
passthru.externalPythonDeps = externalPythonDeps;
passthru.externalTestDeps = externalTestDeps;
passthru.runtimeDependencies = runtimeDependencies;
passthru.testDependencies = testDependencies;
postInstall = ''
mkdir -p $out/share/icons/hicolor
cp -r ./clan_vm_manager/assets/white-favicons/* $out/share/icons/hicolor
'';
# Don't leak python packages into a devshell.
# It can be very confusing if you `nix run` than load the cli from the devshell instead.
postFixup = ''
rm $out/nix-support/propagated-build-inputs
'';
checkPhase = ''
export FONTCONFIG_FILE=${fontconfig.out}/etc/fonts/fonts.conf
export FONTCONFIG_PATH=${fontconfig.out}/etc/fonts
mkdir -p .home/.local/share/fonts
export HOME=.home
fc-cache --verbose
# > fc-cache succeeded
echo "Loaded the following fonts ..."
fc-list
PYTHONPATH= $out/bin/clan-vm-manager --help
'';
desktopItems = [ desktop-file ];
}

View File

@@ -1,54 +0,0 @@
#!/usr/bin/env bash
set -e -o pipefail
check_git_tag() {
local repo_path="$1"
local target_tag="$2"
# Change directory to the specified Git repository
pushd "$repo_path" > /dev/null 2>&1
# shellcheck disable=SC2181
if [ $? -ne 0 ]; then
echo "Error: Failed to change directory to $repo_path"
return 1
fi
# Get the current Git tag
local current_tag
current_tag=$(git describe --tags --exact-match 2>/dev/null)
# Restore the original directory
popd > /dev/null 2>&1
# Check if the current tag is 2.0
if [ "$current_tag" = "$target_tag" ]; then
echo "Current Git tag in $repo_path is $target_tag"
else
echo "Error: Current Git tag in $repo_path is not $target_tag"
exit 1
fi
}
if [ -z "$1" ]; then
echo "Usage: $0 <democlan>"
exit 1
fi
democlan="$1"
check_git_tag "$democlan" "v2.2"
check_git_tag "." "demo-v2.3"
rm -rf ~/.config/clan
clan history add "clan://$democlan#localsend-wayland1"
clear
cat << EOF
Open up this link in a browser:
"clan://$democlan#localsend-wayland2"
EOF

View File

@@ -1,23 +0,0 @@
{ ... }:
{
perSystem =
{
config,
pkgs,
lib,
system,
...
}:
{
devShells.clan-vm-manager = pkgs.callPackage ./shell.nix {
inherit (config.packages) clan-vm-manager;
};
}
// lib.optionalAttrs (system != lib.platforms.darwin) {
packages.clan-vm-manager = pkgs.python3.pkgs.callPackage ./default.nix {
inherit (config.packages) clan-cli;
};
checks = config.packages.clan-vm-manager.tests;
};
}

View File

@@ -1,22 +0,0 @@
#!/usr/bin/env bash
if ! command -v xdg-mime &> /dev/null; then
echo "Warning: 'xdg-mime' is not available. The desktop file cannot be installed."
fi
ALREADY_INSTALLED=$(nix profile list --json | jq 'has("elements") and (.elements | has("clan-vm-manager"))')
if [ "$ALREADY_INSTALLED" = "true" ]; then
echo "Upgrading installed clan-vm-manager"
nix profile upgrade clan-vm-manager
else
nix profile install .#clan-vm-manager --priority 4
fi
# install desktop file
set -eou pipefail
DESKTOP_FILE_NAME=org.clan.vm-manager.desktop
xdg-mime default "$DESKTOP_FILE_NAME" x-scheme-handler/clan

View File

@@ -1,7 +0,0 @@
# Webkit GTK doesn't interop flawless with Solid.js build result
1. Webkit expects script tag to be in `body` only solid.js puts the in the head.
2. script and css files are loaded with type="module" and crossorigin tags being set. WebKit silently fails to load then.
3. Paths to resiources are not allowed to start with "/" because webkit interprets them relative to the system and not the base url.
4. webkit doesn't support native features such as directly handling external urls (i.e opening them in the default browser)
6. Other problems to be found?

View File

@@ -1,41 +0,0 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "clan-vm-manager"
description = "clan vm manager"
dynamic = ["version"]
scripts = { clan-vm-manager = "clan_vm_manager:main", clan-vm-manager-history = "clan_vm_manager.history:main" }
[project.urls]
Homepage = "https://clan.lol/"
Documentation = "https://docs.clan.lol/"
Repository = "https://git.clan.lol/clan/clan-core"
[tool.setuptools.packages.find]
exclude = ["result"]
[tool.setuptools.package-data]
clan_vm_manager = ["**/assets/*"]
[tool.pytest.ini_options]
testpaths = "tests"
faulthandler_timeout = 60
log_level = "DEBUG"
log_format = "%(levelname)s: %(message)s\n %(pathname)s:%(lineno)d::%(funcName)s"
addopts = "--durations 5 --color=yes --new-first" # Add --pdb for debugging
norecursedirs = "tests/helpers"
markers = ["impure"]
[tool.mypy]
python_version = "3.13"
warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true
no_implicit_optional = true
[[tool.mypy.overrides]]
module = "argcomplete.*"
ignore_missing_imports = true

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

View File

@@ -1,53 +0,0 @@
{
lib,
stdenv,
clan-vm-manager,
mkShell,
ruff,
desktop-file-utils,
xdg-utils,
mypy,
python3,
gtk4,
libadwaita,
}:
let
devshellTestDeps =
clan-vm-manager.externalTestDeps
++ (with python3.pkgs; [
rope
mypy
setuptools
wheel
pip
]);
in
mkShell {
inherit (clan-vm-manager) nativeBuildInputs;
name = "clan-vm-manager";
buildInputs = [
ruff
gtk4.dev # has the demo called 'gtk4-widget-factory'
libadwaita.devdoc # has the demo called 'adwaita-1-demo'
]
++ devshellTestDeps
# Dependencies for testing for linux hosts
++ (lib.optionals stdenv.isLinux [
xdg-utils # install desktop files
desktop-file-utils # verify desktop files
]);
shellHook = ''
export GIT_ROOT=$(git rev-parse --show-toplevel)
export PKG_ROOT=$GIT_ROOT/pkgs/clan-vm-manager
# Add clan-vm-manager command to PATH
export PATH="$PKG_ROOT/bin":"$PATH"
# Add clan-cli to the python path so that we can import it without building it in nix first
export PYTHONPATH="$GIT_ROOT/pkgs/clan-cli":"$PYTHONPATH"
'';
}

View File

@@ -1,65 +0,0 @@
import contextlib
import os
import signal
import subprocess
from collections.abc import Iterator
from pathlib import Path
from typing import IO, Any
import pytest
_FILE = None | int | IO[Any]
class Command:
def __init__(self) -> None:
self.processes: list[subprocess.Popen[str]] = []
def run(
self,
command: list[str],
extra_env: dict[str, str] | None = None,
stdin: _FILE = None,
stdout: _FILE = None,
stderr: _FILE = None,
workdir: Path | None = None,
) -> subprocess.Popen[str]:
if extra_env is None:
extra_env = {}
env = os.environ.copy()
env.update(extra_env)
# We start a new session here so that we can than more reliably kill all children as well
p = subprocess.Popen(
command,
env=env,
start_new_session=True,
stdout=stdout,
stderr=stderr,
stdin=stdin,
text=True,
cwd=workdir,
)
self.processes.append(p)
return p
def terminate(self) -> None:
# Stop in reverse order in case there are dependencies.
# We just kill all processes as quickly as possible because we don't
# care about corrupted state and want to make tests fasts.
for p in reversed(self.processes):
with contextlib.suppress(OSError):
os.killpg(os.getpgid(p.pid), signal.SIGKILL)
@pytest.fixture
def command() -> Iterator[Command]:
"""
Starts a background command. The process is automatically terminated in the end.
>>> p = command.run(["some", "daemon"])
>>> print(p.pid)
"""
c = Command()
try:
yield c
finally:
c.terminate()

View File

@@ -1,46 +0,0 @@
import logging
import subprocess
import sys
from pathlib import Path
import pytest
from clan_lib.custom_logger import setup_logging
from clan_lib.nix import nix_shell
sys.path.append(str(Path(__file__).parent / "helpers"))
sys.path.append(
str(Path(__file__).parent.parent)
) # Also add clan vm manager to PYTHONPATH
pytest_plugins = [
"temporary_dir",
"root",
"command",
"wayland",
"stdout",
]
# Executed on pytest session start
def pytest_sessionstart(session: pytest.Session) -> None:
# This function will be called once at the beginning of the test session
print("Starting pytest session")
# You can access the session config, items, testsfailed, etc.
print(f"Session config: {session.config}")
setup_logging(logging.DEBUG)
# fixture for git_repo
@pytest.fixture
def git_repo(tmp_path: Path) -> Path:
# initialize a git repository
cmd = nix_shell(["nixpkgs#git"], ["git", "init"])
subprocess.run(cmd, cwd=tmp_path, check=True)
# set user.name and user.email
cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "test"])
subprocess.run(cmd, cwd=tmp_path, check=True)
cmd = nix_shell(["nixpkgs#git"], ["git", "config", "user.email", "test@test.test"])
subprocess.run(cmd, cwd=tmp_path, check=True)
# return the path to the git repository
return tmp_path

View File

@@ -1,25 +0,0 @@
import logging
import os
import shlex
from clan_lib.custom_logger import get_callers
from clan_vm_manager import main
log = logging.getLogger(__name__)
def print_trace(msg: str) -> None:
trace_depth = int(os.environ.get("TRACE_DEPTH", "0"))
callers = get_callers(2, 2 + trace_depth)
if "run_no_stdout" in callers[0]:
callers = get_callers(3, 3 + trace_depth)
callers_str = "\n".join(f"{i + 1}: {caller}" for i, caller in enumerate(callers))
log.debug(f"{msg} \nCallers: \n{callers_str}")
class Cli:
def run(self, args: list[str]) -> None:
cmd = shlex.join(["clan", *args])
print_trace(f"$ {cmd}")
main(args)

View File

@@ -1,35 +0,0 @@
import os
from pathlib import Path
import pytest
TEST_ROOT = Path(__file__).parent.resolve()
PROJECT_ROOT = TEST_ROOT.parent
if CLAN_CORE_ := os.environ.get("CLAN_CORE_PATH"):
CLAN_CORE = Path(CLAN_CORE_)
else:
CLAN_CORE = PROJECT_ROOT.parent.parent
@pytest.fixture(scope="session")
def project_root() -> Path:
"""
Root directory the clan-cli
"""
return PROJECT_ROOT
@pytest.fixture(scope="session")
def test_root() -> Path:
"""
Root directory of the tests
"""
return TEST_ROOT
@pytest.fixture(scope="session")
def clan_core() -> Path:
"""
Directory of the clan-core flake
"""
return CLAN_CORE

View File

@@ -1,34 +0,0 @@
import types
import pytest
class CaptureOutput:
def __init__(self, capsys: pytest.CaptureFixture) -> None:
self.capsys = capsys
self.capsys_disabled = capsys.disabled()
self.capsys_disabled.__enter__()
def __enter__(self) -> "CaptureOutput":
self.capsys_disabled.__exit__(None, None, None)
self.capsys.readouterr()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
res = self.capsys.readouterr()
self.out = res.out
self.err = res.err
# Disable capsys again
self.capsys_disabled = self.capsys.disabled()
self.capsys_disabled.__enter__()
@pytest.fixture
def capture_output(capsys: pytest.CaptureFixture) -> CaptureOutput:
return CaptureOutput(capsys)

View File

@@ -1,27 +0,0 @@
import logging
import os
import tempfile
from collections.abc import Iterator
from pathlib import Path
import pytest
log = logging.getLogger(__name__)
@pytest.fixture
def temporary_home(monkeypatch: pytest.MonkeyPatch) -> Iterator[Path]:
env_dir = os.getenv("TEST_TEMPORARY_DIR")
if env_dir is not None:
path = Path(env_dir).resolve()
log.debug("Temp HOME directory: %s", str(path))
monkeypatch.setenv("HOME", str(path))
monkeypatch.chdir(str(path))
yield path
else:
with tempfile.TemporaryDirectory(prefix="pytest-") as dirpath:
monkeypatch.setenv("HOME", str(dirpath))
monkeypatch.setenv("XDG_CONFIG_HOME", str(Path(dirpath) / ".config"))
monkeypatch.chdir(str(dirpath))
log.debug("Temp HOME directory: %s", str(dirpath))
yield Path(dirpath)

View File

@@ -1,99 +0,0 @@
from pathlib import Path
import pytest
from clan_cli.tests.fixtures_flakes import ClanFlake
from clan_lib.flake import Flake
from clan_vm_manager.clan_uri import ClanURI
def test_get_url() -> None:
# Create a ClanURI object from a remote URI with parameters
uri = ClanURI.from_str("clan://https://example.com?password=1234#myVM")
assert uri.get_url() == "https://example.com?password=1234"
uri = ClanURI.from_str("clan://~/Downloads")
assert uri.get_url().endswith("/Downloads")
uri = ClanURI.from_str("clan:///home/user/Downloads")
assert uri.get_url() == "/home/user/Downloads"
uri = ClanURI.from_str("clan://file:///home/user/Downloads")
assert uri.get_url() == "file:///home/user/Downloads"
@pytest.mark.impure
def test_is_local(flake: ClanFlake) -> None:
uri = ClanURI.from_str(f"clan://git+file://{flake.path}")
assert uri.get_url() == str(flake.path)
assert uri.flake.is_local
myflake = Flake(f"git+file://{flake.path}")
assert myflake.is_local
def test_firefox_strip_uri() -> None:
uri = ClanURI.from_str("clan://git+https//git.clan.lol/clan/democlan.git")
assert uri.get_url() == "git+https://git.clan.lol/clan/democlan.git"
def test_local_uri(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
# Create a ClanURI object from a local URI
uri = ClanURI.from_str(f"clan://file://{temp_dir}")
assert uri.flake.path == temp_dir
def test_is_remote() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan://https://example.com")
assert uri.flake.identifier == "https://example.com"
def test_direct_local_path() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan://~/Downloads")
assert uri.get_url().endswith("/Downloads")
def test_direct_local_path2() -> None:
# Create a ClanURI object from a remote URI
uri = ClanURI.from_str("clan:///home/user/Downloads")
assert uri.get_url() == "/home/user/Downloads"
def test_remote_with_clanparams() -> None:
# Create a ClanURI object from a remote URI with parameters
uri = ClanURI.from_str("clan://https://example.com")
assert uri.machine_name == "defaultVM"
assert uri.flake.identifier == "https://example.com"
def test_from_str_remote() -> None:
uri = ClanURI.from_str(url="https://example.com", machine_name="myVM")
assert uri.get_url() == "https://example.com"
assert uri.machine_name == "myVM"
assert uri.flake.identifier == "https://example.com"
def test_from_str_local(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
uri = ClanURI.from_str(url=str(temp_dir), machine_name="myVM")
assert uri.get_url().endswith(str(temp_dir))
assert uri.machine_name == "myVM"
assert uri.flake.is_local
assert str(uri.flake).endswith(str(temp_dir))
def test_from_str_local_no_machine(temp_dir: Path) -> None:
flake_nix = temp_dir / "flake.nix"
flake_nix.write_text("outputs = _: {}")
uri = ClanURI.from_str(str(temp_dir))
assert uri.get_url().endswith(str(temp_dir))
assert uri.machine_name == "defaultVM"
assert uri.flake.is_local
assert str(uri.flake).endswith(str(temp_dir))

View File

@@ -1,8 +0,0 @@
import pytest
from cli import Cli
def test_help(capfd: pytest.CaptureFixture) -> None:
cli = Cli()
with pytest.raises(SystemExit):
cli.run(["clan-vm-manager", "--help"])

View File

@@ -1,8 +0,0 @@
import time
from wayland import GtkProc
def test_open(app: GtkProc) -> None:
time.sleep(0.5)
assert app.poll() is None

View File

@@ -1,27 +0,0 @@
import sys
from collections.abc import Generator
from subprocess import Popen
from typing import NewType
import pytest
@pytest.fixture(scope="session")
def wayland_compositor() -> Generator[Popen]:
# Start the Wayland compositor (e.g., Weston)
# compositor = Popen(["weston", "--backend=headless-backend.so"])
compositor = Popen(["weston"])
yield compositor
# Cleanup: Terminate the compositor
compositor.terminate()
GtkProc = NewType("GtkProc", Popen)
@pytest.fixture
def app() -> Generator[GtkProc]:
rapp = Popen([sys.executable, "-m", "clan_vm_manager"], text=True)
yield GtkProc(rapp)
# Cleanup: Terminate your application
rapp.terminate()

View File

@@ -3,7 +3,6 @@
{
imports = [
./clan-cli/flake-module.nix
./clan-vm-manager/flake-module.nix
./installer/flake-module.nix
./icon-update/flake-module.nix
./generate-test-vars/flake-module.nix