clan_vm_manager: Partially working process executor with killpg
This commit is contained in:
102
pkgs/clan-vm-manager/clan_vm_manager/executor.py
Normal file
102
pkgs/clan-vm-manager/clan_vm_manager/executor.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import gi
|
||||||
|
|
||||||
|
gi.require_version("GdkPixbuf", "2.0")
|
||||||
|
|
||||||
|
import multiprocessing as mp
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
|
||||||
|
class MPProcess:
|
||||||
|
def __init__(
|
||||||
|
self, *, name: str, proc: mp.Process, out_file_name: str, in_file_name: str
|
||||||
|
) -> None:
|
||||||
|
self.name = name
|
||||||
|
self.proc = proc
|
||||||
|
self.out_file_name = out_file_name
|
||||||
|
self.in_file_name = in_file_name
|
||||||
|
|
||||||
|
def kill_all(self) -> None:
|
||||||
|
pid = self.proc.pid
|
||||||
|
assert pid is not None
|
||||||
|
|
||||||
|
# Get the process group ID of the new process
|
||||||
|
new_pgid = os.getpgid(pid)
|
||||||
|
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||||
|
os.killpg(new_pgid, signal.SIGTERM)
|
||||||
|
|
||||||
|
# def get_all_output(self) -> str:
|
||||||
|
# os.lseek(self.out_fd, 0, os.SEEK_SET)
|
||||||
|
# return os.read(self.out_fd, 1024).decode("utf-8")
|
||||||
|
|
||||||
|
# def write_all_input(self, input_str: str) -> None:
|
||||||
|
# os.lseek(self.in_fd, 0, os.SEEK_SET)
|
||||||
|
# os.write(self.in_fd, input_str.encode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
def init_proc(
|
||||||
|
func: Callable, out_file: str, in_file: str, wait_stdin_connect: bool, **kwargs: Any
|
||||||
|
) -> None:
|
||||||
|
os.setsid()
|
||||||
|
|
||||||
|
out_fd = os.open(out_file, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC)
|
||||||
|
os.dup2(out_fd, sys.stdout.fileno())
|
||||||
|
os.dup2(out_fd, sys.stderr.fileno())
|
||||||
|
|
||||||
|
flags = None
|
||||||
|
if wait_stdin_connect:
|
||||||
|
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
|
||||||
|
flags = os.O_RDONLY
|
||||||
|
else:
|
||||||
|
flags = os.O_RDONLY | os.O_NONBLOCK
|
||||||
|
|
||||||
|
in_fd = os.open(in_file, flags=flags)
|
||||||
|
os.dup2(in_fd, sys.stdin.fileno())
|
||||||
|
|
||||||
|
print(f"Executing function {func.__name__} now", file=sys.stderr)
|
||||||
|
try:
|
||||||
|
func(**kwargs)
|
||||||
|
except Exception:
|
||||||
|
traceback.print_exc()
|
||||||
|
pid = os.getpid()
|
||||||
|
gpid = os.getpgid(pid=pid)
|
||||||
|
print(f"Killing process group pid={pid} gpid={gpid}")
|
||||||
|
os.killpg(gpid, signal.SIGKILL)
|
||||||
|
|
||||||
|
|
||||||
|
def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess:
|
||||||
|
if mp.get_start_method(allow_none=True) is None:
|
||||||
|
print("Setting start method to spawn")
|
||||||
|
mp.set_start_method(method="spawn")
|
||||||
|
# rand_name = str(uuid.uuid4())
|
||||||
|
rand_name = "test"
|
||||||
|
proc_name = f"MPExecutor:{func.__name__}:{rand_name}"
|
||||||
|
out_file_name = f"{rand_name}_out.log"
|
||||||
|
in_file_name = f"{rand_name}_in.log"
|
||||||
|
|
||||||
|
if os.path.exists(in_file_name):
|
||||||
|
os.unlink(in_file_name)
|
||||||
|
os.mkfifo(in_file_name)
|
||||||
|
|
||||||
|
proc = mp.Process(
|
||||||
|
target=init_proc,
|
||||||
|
args=(func, out_file_name, in_file_name, wait_stdin_connect),
|
||||||
|
name=proc_name,
|
||||||
|
kwargs=kwargs,
|
||||||
|
)
|
||||||
|
proc.start()
|
||||||
|
assert proc.pid is not None
|
||||||
|
print(f"Started process '{proc_name}'. pid={proc.pid} gpid={os.getpgid(proc.pid)}")
|
||||||
|
|
||||||
|
mp_proc = MPProcess(
|
||||||
|
name=proc_name,
|
||||||
|
proc=proc,
|
||||||
|
out_file_name=out_file_name,
|
||||||
|
in_file_name=in_file_name,
|
||||||
|
)
|
||||||
|
return mp_proc
|
||||||
@@ -4,14 +4,33 @@ from pathlib import Path
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import gi
|
import gi
|
||||||
from clan_cli import history, vms
|
from clan_cli import history
|
||||||
|
|
||||||
gi.require_version("GdkPixbuf", "2.0")
|
gi.require_version("GdkPixbuf", "2.0")
|
||||||
|
|
||||||
from gi.repository import GdkPixbuf
|
from gi.repository import GdkPixbuf
|
||||||
|
|
||||||
from clan_vm_manager import assets
|
from clan_vm_manager import assets
|
||||||
|
|
||||||
|
|
||||||
|
# Define a function that writes to the memfd
|
||||||
|
def dummy_f() -> None:
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
c = 0
|
||||||
|
while True: # Simulate a long running process
|
||||||
|
print(f"out: Hello from process c={c}", file=sys.stdout)
|
||||||
|
print(f"err: Hello from process c={c}", file=sys.stderr)
|
||||||
|
user = input("Enter to continue: \n")
|
||||||
|
if user == "q":
|
||||||
|
raise Exception("User quit")
|
||||||
|
print(f"User entered {user}", file=sys.stdout)
|
||||||
|
print(f"User entered {user}", file=sys.stderr)
|
||||||
|
time.sleep(1) # Wait for 1 second
|
||||||
|
c += 1
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class VMBase:
|
class VMBase:
|
||||||
icon: Path | GdkPixbuf.Pixbuf
|
icon: Path | GdkPixbuf.Pixbuf
|
||||||
@@ -46,11 +65,21 @@ class VMBase:
|
|||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
print(f"Running VM {self.name}")
|
print(f"Running VM {self.name}")
|
||||||
import asyncio
|
# vm = vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM")
|
||||||
|
import os
|
||||||
|
|
||||||
# raise Exception("Cannot run VMs yet")
|
from .executor import spawn
|
||||||
vm = asyncio.run(vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM"))
|
|
||||||
vms.run.run_vm(vm)
|
# proc = spawn(vms.run.run_vm, vm=vm)
|
||||||
|
proc = spawn(wait_stdin_connect=True, func=dummy_f)
|
||||||
|
|
||||||
|
pid = os.getpid()
|
||||||
|
gpid = os.getpgid(pid)
|
||||||
|
print(f"Main pid={pid} gpid={gpid}")
|
||||||
|
assert proc.proc.pid is not None
|
||||||
|
gpid = os.getpgid(proc.proc.pid)
|
||||||
|
print(f"Child pid={proc.proc.pid} gpid={gpid}")
|
||||||
|
# os.killpg(gpid, signal.SIGKILL)
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user