Fully working executor
This commit is contained in:
@@ -106,10 +106,11 @@ def register_overview_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
|
|
||||||
|
|
||||||
# Define a function that writes to the memfd
|
# Define a function that writes to the memfd
|
||||||
def dummy_f() -> None:
|
def dummy_f(msg: str) -> None:
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
print(f"Receeived message {msg}")
|
||||||
c = 0
|
c = 0
|
||||||
while True: # Simulate a long running process
|
while True: # Simulate a long running process
|
||||||
print(f"out: Hello from process c={c}", file=sys.stdout)
|
print(f"out: Hello from process c={c}", file=sys.stdout)
|
||||||
@@ -124,27 +125,12 @@ def dummy_f() -> None:
|
|||||||
|
|
||||||
|
|
||||||
def show_run_vm(parser: argparse.ArgumentParser) -> None:
|
def show_run_vm(parser: argparse.ArgumentParser) -> None:
|
||||||
import os
|
|
||||||
|
|
||||||
from .executor import spawn
|
from .executor import spawn
|
||||||
|
|
||||||
print("Spawn process")
|
proc = spawn(wait_stdin_connect=True, func=dummy_f, msg="Hello")
|
||||||
# proc = spawn(vms.run.run_vm, vm=vm)
|
input("Press enter to kill process: ")
|
||||||
proc = spawn(wait_stdin_connect=True, func=dummy_f)
|
proc.kill_group()
|
||||||
|
|
||||||
pid = os.getpid()
|
|
||||||
gpid = os.getpgid(pid)
|
|
||||||
print(f"Main pid={pid} gpid={gpid}")
|
|
||||||
assert proc.proc.pid is not None
|
|
||||||
gpid = os.getpgid(proc.proc.pid)
|
|
||||||
print(f"Child pid={proc.proc.pid} gpid={gpid}")
|
|
||||||
|
|
||||||
|
|
||||||
def register_run_parser(parser: argparse.ArgumentParser) -> None:
|
def register_run_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
# parser.add_argument(
|
|
||||||
# "command",
|
|
||||||
# type=str,
|
|
||||||
# help="command to run",
|
|
||||||
# choices=["join", "overview"],
|
|
||||||
# )
|
|
||||||
parser.set_defaults(func=show_run_vm)
|
parser.set_defaults(func=show_run_vm)
|
||||||
|
|||||||
@@ -21,21 +21,22 @@ class MPProcess:
|
|||||||
self.out_file_name = out_file_name
|
self.out_file_name = out_file_name
|
||||||
self.in_file_name = in_file_name
|
self.in_file_name = in_file_name
|
||||||
|
|
||||||
def kill_all(self) -> None:
|
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||||
|
def kill_group(self) -> None:
|
||||||
pid = self.proc.pid
|
pid = self.proc.pid
|
||||||
assert pid is not None
|
assert pid is not None
|
||||||
|
os.killpg(pid, signal.SIGTERM)
|
||||||
|
|
||||||
# Get the process group ID of the new process
|
|
||||||
new_pgid = os.getpgid(pid)
|
|
||||||
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
|
||||||
os.killpg(new_pgid, signal.SIGTERM)
|
|
||||||
|
|
||||||
# def get_all_output(self) -> str:
|
def signal_handler(signum: int, frame: Any) -> None:
|
||||||
# os.lseek(self.out_fd, 0, os.SEEK_SET)
|
signame = signal.strsignal(signum)
|
||||||
# return os.read(self.out_fd, 1024).decode("utf-8")
|
print("Signal received:", signame)
|
||||||
# def write_all_input(self, input_str: str) -> None:
|
|
||||||
# os.lseek(self.in_fd, 0, os.SEEK_SET)
|
# Restore the default handler
|
||||||
# os.write(self.in_fd, input_str.encode("utf-8"))
|
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||||
|
|
||||||
|
# Re-raise the signal
|
||||||
|
os.kill(os.getpid(), signum)
|
||||||
|
|
||||||
|
|
||||||
def init_proc(
|
def init_proc(
|
||||||
@@ -47,6 +48,13 @@ def init_proc(
|
|||||||
os.dup2(out_fd, sys.stdout.fileno())
|
os.dup2(out_fd, sys.stdout.fileno())
|
||||||
os.dup2(out_fd, sys.stderr.fileno())
|
os.dup2(out_fd, sys.stderr.fileno())
|
||||||
|
|
||||||
|
pid = os.getpid()
|
||||||
|
gpid = os.getpgid(pid=pid)
|
||||||
|
print(f"Started new process pid={pid} gpid={gpid}")
|
||||||
|
|
||||||
|
# Register the signal handler for SIGINT
|
||||||
|
signal.signal(signal.SIGTERM, signal_handler)
|
||||||
|
|
||||||
flags = None
|
flags = None
|
||||||
if wait_stdin_connect:
|
if wait_stdin_connect:
|
||||||
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
|
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
|
||||||
@@ -70,11 +78,10 @@ def init_proc(
|
|||||||
|
|
||||||
def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess:
|
def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess:
|
||||||
if mp.get_start_method(allow_none=True) is None:
|
if mp.get_start_method(allow_none=True) is None:
|
||||||
print("Setting start method to spawn")
|
|
||||||
mp.set_start_method(method="spawn")
|
mp.set_start_method(method="spawn")
|
||||||
# rand_name = str(uuid.uuid4())
|
# rand_name = str(uuid.uuid4())
|
||||||
rand_name = "test"
|
rand_name = "test"
|
||||||
proc_name = f"MPExecutor:{func.__name__}:{rand_name}"
|
proc_name = f"MPExecutor:{rand_name}:{func.__name__}"
|
||||||
out_file_name = f"{rand_name}_out.txt"
|
out_file_name = f"{rand_name}_out.txt"
|
||||||
in_file_name = f"{rand_name}_in.fifo"
|
in_file_name = f"{rand_name}_in.fifo"
|
||||||
|
|
||||||
@@ -90,8 +97,13 @@ def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProce
|
|||||||
)
|
)
|
||||||
proc.start()
|
proc.start()
|
||||||
assert proc.pid is not None
|
assert proc.pid is not None
|
||||||
print(f"Started process '{proc_name}'. pid={proc.pid} gpid={os.getpgid(proc.pid)}")
|
print(f"Started process '{proc_name}'")
|
||||||
|
print(f"Arguments: {kwargs}")
|
||||||
|
if wait_stdin_connect:
|
||||||
|
cmd = f"cat - > {in_file_name}"
|
||||||
|
print(f"Connect to stdin with : {cmd}")
|
||||||
|
cmd = f"tail -f {out_file_name}"
|
||||||
|
print(f"Connect to stdout with: {cmd}")
|
||||||
mp_proc = MPProcess(
|
mp_proc = MPProcess(
|
||||||
name=proc_name,
|
name=proc_name,
|
||||||
proc=proc,
|
proc=proc,
|
||||||
|
|||||||
Reference in New Issue
Block a user