Merge pull request 'clan_vm_manager: Improve VM start and stop switch. Switch will be disabled while stopping vm' (#877) from Qubasa-main into main
This commit is contained in:
@@ -30,7 +30,6 @@ class VMAttr:
|
|||||||
@contextmanager
|
@contextmanager
|
||||||
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
|
def qmp_ctx(self) -> Generator[QEMUMonitorProtocol, None, None]:
|
||||||
if self._qmp is None:
|
if self._qmp is None:
|
||||||
log.debug(f"qmp_socket: {self._qmp_socket}")
|
|
||||||
rpath = self._qmp_socket.resolve()
|
rpath = self._qmp_socket.resolve()
|
||||||
if not rpath.exists():
|
if not rpath.exists():
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
|
|||||||
@@ -76,7 +76,6 @@ def _init_proc(
|
|||||||
# Print some information
|
# Print some information
|
||||||
pid = os.getpid()
|
pid = os.getpid()
|
||||||
gpid = os.getpgid(pid=pid)
|
gpid = os.getpgid(pid=pid)
|
||||||
print(f"Started new process pid={pid} gpid={gpid}", file=sys.stderr)
|
|
||||||
|
|
||||||
# Set the process name
|
# Set the process name
|
||||||
_set_proc_name(proc_name)
|
_set_proc_name(proc_name)
|
||||||
@@ -84,20 +83,26 @@ def _init_proc(
|
|||||||
# Close stdin
|
# Close stdin
|
||||||
sys.stdin.close()
|
sys.stdin.close()
|
||||||
|
|
||||||
|
linebreak = "=" * 5
|
||||||
# Execute the main function
|
# Execute the main function
|
||||||
print(f"Executing function {func.__name__} now", file=sys.stderr)
|
print(linebreak + f"{func.__name__}:{pid}" + linebreak, file=sys.stderr)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
func(**kwargs)
|
func(**kwargs)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
if on_except is not None:
|
if on_except is not None:
|
||||||
on_except(ex, mp.current_process())
|
on_except(ex, mp.current_process())
|
||||||
finally:
|
|
||||||
|
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||||
pid = os.getpid()
|
pid = os.getpid()
|
||||||
gpid = os.getpgid(pid=pid)
|
gpid = os.getpgid(pid=pid)
|
||||||
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
|
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
|
||||||
os.killpg(gpid, signal.SIGTERM)
|
os.killpg(gpid, signal.SIGTERM)
|
||||||
|
|
||||||
|
# Don't use a finally block here, because we want the exitcode to be set to
|
||||||
|
# 0 if the function returns normally
|
||||||
|
|
||||||
|
|
||||||
def spawn(
|
def spawn(
|
||||||
*,
|
*,
|
||||||
@@ -122,10 +127,6 @@ def spawn(
|
|||||||
)
|
)
|
||||||
proc.start()
|
proc.start()
|
||||||
|
|
||||||
# Print some information
|
|
||||||
cmd = f"tail -f {out_file}"
|
|
||||||
log.info(f"Connect to stdout with: {cmd}")
|
|
||||||
|
|
||||||
# Return the process
|
# Return the process
|
||||||
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
|
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import time
|
||||||
import weakref
|
import weakref
|
||||||
|
from collections.abc import Generator
|
||||||
|
from contextlib import contextmanager
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import IO, Any, ClassVar
|
from typing import IO, Any, ClassVar
|
||||||
@@ -97,8 +100,7 @@ class Clans:
|
|||||||
class VM(GObject.Object):
|
class VM(GObject.Object):
|
||||||
# Define custom signals; "vm_status_changed" carries a GObject.Object argument
|
# Define custom signals; "vm_status_changed" carries a GObject.Object argument
|
||||||
__gsignals__: ClassVar = {
|
__gsignals__: ClassVar = {
|
||||||
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object]),
|
"vm_status_changed": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object])
|
||||||
"build_vm": (GObject.SignalFlags.RUN_FIRST, None, [GObject.Object, bool]),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -107,23 +109,40 @@ class VM(GObject.Object):
|
|||||||
data: HistoryEntry,
|
data: HistoryEntry,
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.KILL_TIMEOUT = 6 # seconds
|
|
||||||
|
# Store the data from the history entry
|
||||||
self.data = data
|
self.data = data
|
||||||
self.process = MPProcess("dummy", mp.Process(), Path("./dummy"))
|
|
||||||
self._watcher_id: int = 0
|
# Create a process object to store the VM process
|
||||||
self._stop_watcher_id: int = 0
|
self.vm_process = MPProcess("vm_dummy", mp.Process(), Path("./dummy"))
|
||||||
self._stop_timer_init: datetime | None = None
|
self.build_process = MPProcess("build_dummy", mp.Process(), Path("./dummy"))
|
||||||
self._logs_id: int = 0
|
self._start_thread: threading.Thread = threading.Thread()
|
||||||
self._log_file: IO[str] | None = None
|
self.machine: Machine | None = None
|
||||||
|
|
||||||
|
# Watcher to stop the VM
|
||||||
|
self.KILL_TIMEOUT = 20 # seconds
|
||||||
|
self._stop_thread: threading.Thread = threading.Thread()
|
||||||
|
|
||||||
|
# Build progress bar vars
|
||||||
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
|
self.progress_bar: Gtk.ProgressBar = Gtk.ProgressBar()
|
||||||
self.progress_bar.hide()
|
self.progress_bar.hide()
|
||||||
self.progress_bar.set_hexpand(True) # Horizontally expand
|
self.progress_bar.set_hexpand(True) # Horizontally expand
|
||||||
self.prog_bar_id: int = 0
|
self.prog_bar_id: int = 0
|
||||||
|
|
||||||
|
# Create a temporary directory to store the logs
|
||||||
self.log_dir = tempfile.TemporaryDirectory(
|
self.log_dir = tempfile.TemporaryDirectory(
|
||||||
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
|
prefix="clan_vm-", suffix=f"-{self.data.flake.flake_attr}"
|
||||||
)
|
)
|
||||||
|
self._logs_id: int = 0
|
||||||
|
self._log_file: IO[str] | None = None
|
||||||
|
|
||||||
|
# Make sure the VM is killed when the reference to this object is dropped
|
||||||
self._finalizer = weakref.finalize(self, self.kill_ref_drop)
|
self._finalizer = weakref.finalize(self, self.kill_ref_drop)
|
||||||
self.connect("build_vm", self.build_vm)
|
|
||||||
|
# We use a context manager to create the machine object
|
||||||
|
# and make sure it is destroyed when the context is exited
|
||||||
|
@contextmanager
|
||||||
|
def create_machine(self) -> Generator[Machine, None, None]:
|
||||||
uri = ClanURI.from_str(
|
uri = ClanURI.from_str(
|
||||||
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
|
url=self.data.flake.flake_url, flake_attr=self.data.flake.flake_attr
|
||||||
)
|
)
|
||||||
@@ -138,83 +157,82 @@ class VM(GObject.Object):
|
|||||||
name=self.data.flake.flake_attr,
|
name=self.data.flake.flake_attr,
|
||||||
flake=url, # type: ignore
|
flake=url, # type: ignore
|
||||||
)
|
)
|
||||||
|
yield self.machine
|
||||||
|
self.machine = None
|
||||||
|
|
||||||
def _pulse_progress_bar(self) -> bool:
|
def _pulse_progress_bar(self) -> bool:
|
||||||
|
if self.progress_bar.is_visible():
|
||||||
self.progress_bar.pulse()
|
self.progress_bar.pulse()
|
||||||
return GLib.SOURCE_CONTINUE
|
return GLib.SOURCE_CONTINUE
|
||||||
|
else:
|
||||||
|
return GLib.SOURCE_REMOVE
|
||||||
|
|
||||||
def build_vm(self, vm: "VM", _vm: "VM", building: bool) -> None:
|
def __start(self) -> None:
|
||||||
if building:
|
with self.create_machine() as machine:
|
||||||
log.info("Building VM")
|
# Start building VM
|
||||||
|
log.info(f"Building VM {self.get_id()}")
|
||||||
|
self.build_process = spawn(
|
||||||
|
on_except=None,
|
||||||
|
out_file=Path(str(self.log_dir.name)) / "build.log",
|
||||||
|
func=vms.run.build_vm,
|
||||||
|
machine=machine,
|
||||||
|
vm=self.data.flake.vm,
|
||||||
|
)
|
||||||
|
GLib.idle_add(self.emit, "vm_status_changed", self)
|
||||||
|
|
||||||
|
# Start the progress bar and show it
|
||||||
self.progress_bar.show()
|
self.progress_bar.show()
|
||||||
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar)
|
self.prog_bar_id = GLib.timeout_add(100, self._pulse_progress_bar)
|
||||||
if self.prog_bar_id == 0:
|
if self.prog_bar_id == 0:
|
||||||
raise ClanError("Couldn't spawn a progess bar task")
|
log.error("Couldn't spawn a progess bar task")
|
||||||
else:
|
|
||||||
|
# Wait for the build to finish then hide the progress bar
|
||||||
|
self.build_process.proc.join()
|
||||||
self.progress_bar.hide()
|
self.progress_bar.hide()
|
||||||
if not GLib.Source.remove(self.prog_bar_id):
|
|
||||||
log.error("Failed to remove progress bar task")
|
|
||||||
log.info("VM built")
|
|
||||||
|
|
||||||
def __start(self) -> None:
|
# Check if the VM was built successfully
|
||||||
log.info(f"Starting VM {self.get_id()}")
|
if self.build_process.proc.exitcode != 0:
|
||||||
vm = vms.run.inspect_vm(self.machine)
|
log.error(f"Failed to build VM {self.get_id()}")
|
||||||
|
GLib.idle_add(self.emit, "vm_status_changed", self)
|
||||||
|
return
|
||||||
|
log.info(f"Successfully built VM {self.get_id()}")
|
||||||
|
|
||||||
# GLib.idle_add(self.emit, "build_vm", self, True)
|
# Start the VM
|
||||||
# self.process = spawn(
|
self.vm_process = spawn(
|
||||||
# on_except=None,
|
|
||||||
# log_dir=Path(str(self.log_dir.name)),
|
|
||||||
# func=vms.run.build_vm,
|
|
||||||
# machine=self.machine,
|
|
||||||
# vm=vm,
|
|
||||||
# )
|
|
||||||
# self.process.proc.join()
|
|
||||||
|
|
||||||
# GLib.idle_add(self.emit, "build_vm", self, False)
|
|
||||||
|
|
||||||
# if self.process.proc.exitcode != 0:
|
|
||||||
# log.error(f"Failed to build VM {self.get_id()}")
|
|
||||||
# return
|
|
||||||
|
|
||||||
self.process = spawn(
|
|
||||||
on_except=None,
|
on_except=None,
|
||||||
out_file=Path(str(self.log_dir.name)) / "vm.log",
|
out_file=Path(str(self.log_dir.name)) / "vm.log",
|
||||||
func=vms.run.run_vm,
|
func=vms.run.run_vm,
|
||||||
vm=vm,
|
vm=self.data.flake.vm,
|
||||||
)
|
)
|
||||||
log.debug(f"Started VM {self.get_id()}")
|
log.debug(f"Started VM {self.get_id()}")
|
||||||
GLib.idle_add(self.emit, "vm_status_changed", self)
|
GLib.idle_add(self.emit, "vm_status_changed", self)
|
||||||
log.debug(f"Starting logs watcher on file: {self.process.out_file}")
|
|
||||||
|
# Start the logs watcher
|
||||||
self._logs_id = GLib.timeout_add(50, self._get_logs_task)
|
self._logs_id = GLib.timeout_add(50, self._get_logs_task)
|
||||||
if self._logs_id == 0:
|
if self._logs_id == 0:
|
||||||
raise ClanError("Failed to add logs watcher")
|
log.error("Failed to start VM log watcher")
|
||||||
|
log.debug(f"Starting logs watcher on file: {self.vm_process.out_file}")
|
||||||
|
|
||||||
log.debug(f"Starting VM watcher for: {self.machine.name}")
|
# Wait for the VM to stop
|
||||||
self._watcher_id = GLib.timeout_add(50, self._vm_watcher_task)
|
self.vm_process.proc.join()
|
||||||
if self._watcher_id == 0:
|
log.debug(f"VM {self.get_id()} has stopped")
|
||||||
raise ClanError("Failed to add watcher")
|
GLib.idle_add(self.emit, "vm_status_changed", self)
|
||||||
|
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
if self.is_running():
|
if self.is_running():
|
||||||
log.warn("VM is already running")
|
log.warn("VM is already running. Ignoring start request")
|
||||||
return
|
|
||||||
threading.Thread(target=self.__start).start()
|
|
||||||
|
|
||||||
def _vm_watcher_task(self) -> bool:
|
|
||||||
if not self.is_running():
|
|
||||||
self.emit("vm_status_changed", self)
|
self.emit("vm_status_changed", self)
|
||||||
log.debug("Removing VM watcher")
|
return
|
||||||
return GLib.SOURCE_REMOVE
|
self._start_thread = threading.Thread(target=self.__start)
|
||||||
|
self._start_thread.start()
|
||||||
return GLib.SOURCE_CONTINUE
|
|
||||||
|
|
||||||
def _get_logs_task(self) -> bool:
|
def _get_logs_task(self) -> bool:
|
||||||
if not self.process.out_file.exists():
|
if not self.vm_process.out_file.exists():
|
||||||
return GLib.SOURCE_CONTINUE
|
return GLib.SOURCE_CONTINUE
|
||||||
|
|
||||||
if not self._log_file:
|
if not self._log_file:
|
||||||
try:
|
try:
|
||||||
self._log_file = open(self.process.out_file)
|
self._log_file = open(self.vm_process.out_file)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
log.exception(ex)
|
log.exception(ex)
|
||||||
self._log_file = None
|
self._log_file = None
|
||||||
@@ -232,42 +250,60 @@ class VM(GObject.Object):
|
|||||||
return GLib.SOURCE_CONTINUE
|
return GLib.SOURCE_CONTINUE
|
||||||
|
|
||||||
def is_running(self) -> bool:
|
def is_running(self) -> bool:
|
||||||
return self.process.proc.is_alive()
|
return self._start_thread.is_alive()
|
||||||
|
|
||||||
|
def is_building(self) -> bool:
|
||||||
|
return self.build_process.proc.is_alive()
|
||||||
|
|
||||||
|
def is_shutting_down(self) -> bool:
|
||||||
|
return self._stop_thread.is_alive()
|
||||||
|
|
||||||
def get_id(self) -> str:
|
def get_id(self) -> str:
|
||||||
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
|
return f"{self.data.flake.flake_url}#{self.data.flake.flake_attr}"
|
||||||
|
|
||||||
def __shutdown_watchdog(self) -> None:
|
|
||||||
if self.is_running():
|
|
||||||
assert self._stop_timer_init is not None
|
|
||||||
diff = datetime.now() - self._stop_timer_init
|
|
||||||
if diff.seconds > self.KILL_TIMEOUT:
|
|
||||||
log.error(f"VM {self.get_id()} has not stopped. Killing it")
|
|
||||||
self.process.kill_group()
|
|
||||||
return GLib.SOURCE_CONTINUE
|
|
||||||
else:
|
|
||||||
log.info(f"VM {self.get_id()} has stopped")
|
|
||||||
return GLib.SOURCE_REMOVE
|
|
||||||
|
|
||||||
def __stop(self) -> None:
|
def __stop(self) -> None:
|
||||||
log.info(f"Stopping VM {self.get_id()}")
|
log.info(f"Stopping VM {self.get_id()}")
|
||||||
|
|
||||||
|
start_time = datetime.now()
|
||||||
|
while self.is_running():
|
||||||
|
diff = datetime.now() - start_time
|
||||||
|
if diff.seconds > self.KILL_TIMEOUT:
|
||||||
|
log.error(
|
||||||
|
f"VM {self.get_id()} has not stopped after {self.KILL_TIMEOUT}s. Killing it"
|
||||||
|
)
|
||||||
|
self.vm_process.kill_group()
|
||||||
|
return
|
||||||
|
if self.is_building():
|
||||||
|
log.info(f"VM {self.get_id()} is still building. Killing it")
|
||||||
|
self.build_process.kill_group()
|
||||||
|
return
|
||||||
|
if not self.machine:
|
||||||
|
log.error(f"Machine object is None. Killing VM {self.get_id()}")
|
||||||
|
self.vm_process.kill_group()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Try to shutdown the VM gracefully using QMP
|
||||||
try:
|
try:
|
||||||
with self.machine.vm.qmp_ctx() as qmp:
|
with self.machine.vm.qmp_ctx() as qmp:
|
||||||
qmp.command("system_powerdown")
|
qmp.command("system_powerdown")
|
||||||
except ClanError as e:
|
except (OSError, ClanError):
|
||||||
log.debug(e)
|
# log.debug(f"QMP command 'system_powerdown' ignored. Error: {e}")
|
||||||
|
pass
|
||||||
|
|
||||||
self._stop_timer_init = datetime.now()
|
# Try 2 times a second
|
||||||
self._stop_watcher_id = GLib.timeout_add(100, self.__shutdown_watchdog)
|
time.sleep(0.5)
|
||||||
if self._stop_watcher_id == 0:
|
GLib.idle_add(self.emit, "vm_status_changed", self)
|
||||||
raise ClanError("Failed to add stop watcher")
|
log.debug(f"VM {self.get_id()} has stopped")
|
||||||
|
|
||||||
def shutdown(self) -> None:
|
def shutdown(self) -> None:
|
||||||
if not self.is_running():
|
if not self.is_running():
|
||||||
|
log.warning("VM not running. Ignoring shutdown request.")
|
||||||
return
|
return
|
||||||
log.info(f"Stopping VM {self.get_id()}")
|
if self.is_shutting_down():
|
||||||
threading.Thread(target=self.__stop).start()
|
log.warning("Shutdown already in progress")
|
||||||
|
return
|
||||||
|
self._stop_thread = threading.Thread(target=self.__stop)
|
||||||
|
self._stop_thread.start()
|
||||||
|
|
||||||
def kill_ref_drop(self) -> None:
|
def kill_ref_drop(self) -> None:
|
||||||
if self.is_running():
|
if self.is_running():
|
||||||
@@ -279,13 +315,13 @@ class VM(GObject.Object):
|
|||||||
log.warning(f"Tried to kill VM {self.get_id()} is not running")
|
log.warning(f"Tried to kill VM {self.get_id()} is not running")
|
||||||
return
|
return
|
||||||
log.info(f"Killing VM {self.get_id()} now")
|
log.info(f"Killing VM {self.get_id()} now")
|
||||||
self.process.kill_group()
|
self.vm_process.kill_group()
|
||||||
|
|
||||||
def read_whole_log(self) -> str:
|
def read_whole_log(self) -> str:
|
||||||
if not self.process.out_file.exists():
|
if not self.vm_process.out_file.exists():
|
||||||
log.error(f"Log file {self.process.out_file} does not exist")
|
log.error(f"Log file {self.vm_process.out_file} does not exist")
|
||||||
return ""
|
return ""
|
||||||
return self.process.out_file.read_text()
|
return self.vm_process.out_file.read_text()
|
||||||
|
|
||||||
|
|
||||||
class VMs:
|
class VMs:
|
||||||
|
|||||||
@@ -194,6 +194,10 @@ class ClanList(Gtk.Box):
|
|||||||
box.append(pref_button)
|
box.append(pref_button)
|
||||||
|
|
||||||
switch.connect("notify::active", partial(self.on_row_toggle, vm))
|
switch.connect("notify::active", partial(self.on_row_toggle, vm))
|
||||||
|
# def on_switch_state_set(switch: Any, state: bool) -> bool:
|
||||||
|
# return True
|
||||||
|
# switch.connect("state-set", on_switch_state_set)
|
||||||
|
|
||||||
vm.connect("vm_status_changed", partial(self.vm_status_changed, switch))
|
vm.connect("vm_status_changed", partial(self.vm_status_changed, switch))
|
||||||
|
|
||||||
# suffix.append(box)
|
# suffix.append(box)
|
||||||
@@ -286,18 +290,20 @@ class ClanList(Gtk.Box):
|
|||||||
if not Join.use().list_store.get_n_items():
|
if not Join.use().list_store.get_n_items():
|
||||||
self.join_boxed_list.add_css_class("no-shadow")
|
self.join_boxed_list.add_css_class("no-shadow")
|
||||||
|
|
||||||
def on_row_toggle(self, vm: VM, row: Adw.SwitchRow, state: bool) -> None:
|
def on_row_toggle(self, vm: VM, switch: Gtk.Switch, user_state: bool) -> None:
|
||||||
if row.get_active():
|
if switch.get_active():
|
||||||
row.set_state(False)
|
switch.set_state(False)
|
||||||
vm.start()
|
vm.start()
|
||||||
|
else:
|
||||||
if not row.get_active():
|
switch.set_state(True)
|
||||||
row.set_state(True)
|
|
||||||
vm.shutdown()
|
vm.shutdown()
|
||||||
|
switch.set_sensitive(False)
|
||||||
|
|
||||||
def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None:
|
def vm_status_changed(self, switch: Gtk.Switch, vm: VM, _vm: VM) -> None:
|
||||||
switch.set_active(vm.is_running())
|
switch.set_state(vm.is_running() and not vm.is_building())
|
||||||
switch.set_state(vm.is_running())
|
if switch.get_sensitive() is False and not vm.is_building():
|
||||||
exitc = vm.process.proc.exitcode
|
switch.set_sensitive(True)
|
||||||
|
|
||||||
|
exitc = vm.vm_process.proc.exitcode
|
||||||
if not vm.is_running() and exitc != 0:
|
if not vm.is_running() and exitc != 0:
|
||||||
log.error(f"VM exited with error. Exitcode: {exitc}")
|
log.error(f"VM exited with error. Exitcode: {exitc}")
|
||||||
|
|||||||
Reference in New Issue
Block a user