clan_vm_manager: New directory structure

This commit is contained in:
Qubasa
2024-03-03 16:38:16 +07:00
parent 754e0ca9e8
commit 359ad22c90
13 changed files with 13 additions and 37 deletions

View File

@@ -0,0 +1,137 @@
import logging
import os
import signal
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import dataclasses
import multiprocessing as mp
from collections.abc import Callable
log = logging.getLogger(__name__)
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def _kill_group(proc: mp.Process) -> None:
pid = proc.pid
if proc.is_alive() and pid:
os.killpg(pid, signal.SIGTERM)
else:
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
@dataclasses.dataclass(frozen=True)
class MPProcess:
name: str
proc: mp.Process
out_file: Path
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
_kill_group(proc=self.proc)
def _set_proc_name(name: str) -> None:
if sys.platform != "linux":
return
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _init_proc(
func: Callable,
out_file: Path,
proc_name: str,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
tstart: datetime,
**kwargs: Any,
) -> None:
# Create a new process group
os.setsid()
# Open stdout and stderr
with open(out_file, "w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
# Set the process name
_set_proc_name(proc_name)
# Close stdin
sys.stdin.close()
linebreak = "=" * 5
# Execute the main function
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
print(f"Spawn overhead time: {datetime.now() - tstart}s", file=sys.stderr)
try:
func(**kwargs)
except Exception as ex:
traceback.print_exc()
if on_except is not None:
on_except(ex, mp.current_process())
# Kill the new process and all its children by sending a SIGTERM signal to the process group
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
os.killpg(gpid, signal.SIGTERM)
sys.exit(1)
# Don't use a finally block here, because we want the exitcode to be set to
# 0 if the function returns normally
def spawn(
*,
out_file: Path,
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
func: Callable,
**kwargs: Any,
) -> MPProcess:
tstart = datetime.now()
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="forkserver")
# Set names
proc_name = f"MPExec:{func.__name__}"
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, proc_name, on_except, tstart),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Return the process
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
return mp_proc

View File

@@ -0,0 +1,218 @@
import logging
from collections import OrderedDict
from collections.abc import Callable
from typing import Any, Generic, TypeVar
import gi
gi.require_version("Gio", "2.0")
from gi.repository import Gio, GObject
log = logging.getLogger(__name__)
# Define type variables for key and value types
K = TypeVar("K") # Key type
V = TypeVar(
"V", bound=GObject.Object
) # Value type, bound to GObject.GObject or its subclasses
class GKVStore(GObject.GObject, Gio.ListModel, Generic[K, V]):
__gtype_name__ = "MyGKVStore"
"""
A simple key-value store that implements the Gio.ListModel interface, with generic types for keys and values.
Only use self[key] and del self[key] for accessing the items for better performance.
This class could be optimized by having the objects remember their position in the list.
"""
def __init__(self, gtype: type[V], key_gen: Callable[[V], K]) -> None:
super().__init__()
self.gtype = gtype
self.key_gen = key_gen
self._items: "OrderedDict[K, V]" = OrderedDict()
##################################
# #
# Gio.ListStore Interface #
# #
##################################
@classmethod
def new(cls: Any, gtype: type[V]) -> "GKVStore":
return cls.__new__(cls, gtype)
def append(self, item: V) -> None:
key = self.key_gen(item)
self[key] = item
def find(self, item: V) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if v == item:
return True, i
return False, -1
def find_with_equal_func(
self, item: V, equal_func: Callable[[V, V], bool]
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item):
return True, i
return False, -1
def find_with_equal_func_full(
self, item: V, equal_func: Callable[[V, V, Any], bool], user_data: Any
) -> tuple[bool, int]:
log.warning("Finding is O(n) in GKVStore. Better use indexing")
for i, v in enumerate(self.values()):
if equal_func(v, item, user_data):
return True, i
return False, -1
def insert(self, position: int, item: V) -> None:
log.warning("Inserting is O(n) in GKVStore. Better use append")
log.warning(
"This functions may have incorrect items_changed signal behavior. Please test it"
)
key = self.key_gen(item)
if key in self._items:
raise ValueError("Key already exists in the dictionary")
if position < 0 or position > len(self._items):
raise IndexError("Index out of range")
# Temporary storage for items to be reinserted
temp_list = [(k, self._items[k]) for k in list(self.keys())[position:]]
# Delete items from the original dict
for k in list(self.keys())[position:]:
del self._items[k]
# Insert the new key-value pair
self._items[key] = item
# Reinsert the items
for i, (k, v) in enumerate(temp_list):
self._items[k] = v
# Notify the model of the changes
self.items_changed(position, 0, 1)
def insert_sorted(
self, item: V, compare_func: Callable[[V, V, Any], int], user_data: Any
) -> None:
raise NotImplementedError("insert_sorted is not implemented in GKVStore")
def remove(self, position: int) -> None:
if position < 0 or position >= self.get_n_items():
return
key = self.keys()[position]
del self[key]
self.items_changed(position, 1, 0)
def remove_all(self) -> None:
self._items.clear()
self.items_changed(0, len(self._items), 0)
def sort(self, compare_func: Callable[[V, V, Any], int], user_data: Any) -> None:
raise NotImplementedError("sort is not implemented in GKVStore")
def splice(self, position: int, n_removals: int, additions: list[V]) -> None:
raise NotImplementedError("splice is not implemented in GKVStore")
##################################
# #
# Gio.ListModel Interface #
# #
##################################
def get_item(self, position: int) -> V | None:
if position < 0 or position >= self.get_n_items():
return None
# Access items by index since OrderedDict does not support direct indexing
key = list(self._items.keys())[position]
return self._items[key]
def do_get_item(self, position: int) -> V | None:
return self.get_item(position)
def get_item_type(self) -> GObject.GType:
return self.gtype.__gtype__
def do_get_item_type(self) -> GObject.GType:
return self.get_item_type()
def get_n_items(self) -> int:
return len(self._items)
def do_get_n_items(self) -> int:
return self.get_n_items()
##################################
# #
# Dict Interface #
# #
##################################
def keys(self) -> list[K]:
return list(self._items.keys())
def values(self) -> list[V]:
return list(self._items.values())
def items(self) -> list[tuple[K, V]]:
return list(self._items.items())
def get(self, key: K, default: V | None = None) -> V | None:
return self._items.get(key, default)
# O(1) operation if the key does not exist, O(n) if it does
def __setitem__(self, key: K, value: V) -> None:
# If the key already exists, remove it O(n)
# TODO: We have to check if updating an existing key is working correctly
if key in self._items:
log.warning("Updating an existing key in GKVStore is O(n)")
position = self.keys().index(key)
self._items[key] = value
self.items_changed(position, 1, 1)
else:
# Add the new key-value pair
self._items[key] = value
self._items.move_to_end(key)
position = max(len(self._items) - 1, 0)
self.items_changed(position, 0, 1)
# O(n) operation
def __delitem__(self, key: K) -> None:
position = self.keys().index(key)
del self._items[key]
self.items_changed(position, 1, 0)
def __len__(self) -> int:
return len(self._items)
# O(1) operation
def __getitem__(self, key: K) -> V:
return self._items[key]
def __contains__(self, key: K) -> bool:
return key in self._items
def __str__(self) -> str:
resp = "GKVStore(\n"
for k, v in self._items.items():
resp += f"{k}: {v}\n"
resp += ")"
return resp
def __repr__(self) -> str:
return self._items.__str__()
##################################
# #
# Custom Methods #
# #
##################################
def first(self) -> V:
return self.values()[0]
def last(self) -> V:
return self.values()[-1]

View File

@@ -0,0 +1,10 @@
from dataclasses import dataclass
import gi
gi.require_version("Gtk", "4.0")
@dataclass
class ClanConfig:
initial_view: str

File diff suppressed because it is too large Load Diff