Improved spawn interface. Added garbage collector

This commit is contained in:
Qubasa
2023-12-19 17:51:43 +01:00
parent 9d877969c9
commit e0e7324876
4 changed files with 110 additions and 28 deletions

View File

@@ -5,10 +5,25 @@ from dataclasses import dataclass
from enum import Enum, member from enum import Enum, member
from pathlib import Path from pathlib import Path
from typing import Self from typing import Self
import urllib.request
from .errors import ClanError from .errors import ClanError
def url_ok(url: str):
# Create a request object with the URL and the HEAD method
req = urllib.request.Request(url, method="HEAD")
try:
# Open the URL and get the response object
res = urllib.request.urlopen(req)
# Return True if the status code is 200 (OK)
if not res.status_code == 200:
raise ClanError(f"URL has status code: {res.status_code}")
except urllib.error.URLError as ex:
raise ClanError(f"URL error: {ex}")
# Define an enum with different members that have different values # Define an enum with different members that have different values
class ClanScheme(Enum): class ClanScheme(Enum):
# Use the dataclass decorator to add fields and methods to the members # Use the dataclass decorator to add fields and methods to the members
@@ -88,6 +103,14 @@ class ClanURI:
case _: case _:
raise ClanError(f"Unsupported uri components: {comb}") raise ClanError(f"Unsupported uri components: {comb}")
def check_exits(self):
match self.scheme:
case ClanScheme.FILE.value(path):
if not path.exists():
raise ClanError(f"File does not exist: {path}")
case ClanScheme.HTTP.value(url):
return url_ok(url)
def get_internal(self) -> str: def get_internal(self) -> str:
match self.scheme: match self.scheme:
case ClanScheme.FILE.value(path): case ClanScheme.FILE.value(path):

View File

@@ -47,6 +47,7 @@ def list_history() -> list[HistoryEntry]:
def add_history(uri: ClanURI) -> list[HistoryEntry]: def add_history(uri: ClanURI) -> list[HistoryEntry]:
uri.check_exits()
user_history_file().parent.mkdir(parents=True, exist_ok=True) user_history_file().parent.mkdir(parents=True, exist_ok=True)
logs = list_history() logs = list_history()
found = False found = False

View File

@@ -125,9 +125,12 @@ def dummy_f(msg: str) -> None:
def show_run_vm(parser: argparse.ArgumentParser) -> None: def show_run_vm(parser: argparse.ArgumentParser) -> None:
from pathlib import Path
from .executor import spawn from .executor import spawn
proc = spawn(wait_stdin_connect=True, func=dummy_f, msg="Hello") log_path = Path(".").resolve()
proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello")
input("Press enter to kill process: ") input("Press enter to kill process: ")
proc.kill_group() proc.kill_group()

View File

@@ -2,6 +2,7 @@ import os
import signal import signal
import sys import sys
import traceback import traceback
from pathlib import Path
from typing import Any from typing import Any
import gi import gi
@@ -11,15 +12,18 @@ gi.require_version("GdkPixbuf", "2.0")
import multiprocessing as mp import multiprocessing as mp
from collections.abc import Callable from collections.abc import Callable
OUT_FILE: Path | None = None
IN_FILE: Path | None = None
class MPProcess: class MPProcess:
def __init__( def __init__(
self, *, name: str, proc: mp.Process, out_file_name: str, in_file_name: str self, *, name: str, proc: mp.Process, out_file: Path, in_file: Path
) -> None: ) -> None:
self.name = name self.name = name
self.proc = proc self.proc = proc
self.out_file_name = out_file_name self.out_file = out_file
self.in_file_name = in_file_name self.in_file = in_file
# Kill the new process and all its children by sending a SIGTERM signal to the process group # Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None: def kill_group(self) -> None:
@@ -28,10 +32,35 @@ class MPProcess:
os.killpg(pid, signal.SIGTERM) os.killpg(pid, signal.SIGTERM)
def signal_handler(signum: int, frame: Any) -> None: def _set_proc_name(name: str) -> None:
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _signal_handler(signum: int, frame: Any) -> None:
signame = signal.strsignal(signum) signame = signal.strsignal(signum)
print("Signal received:", signame) print("Signal received:", signame)
# Delete files
if OUT_FILE is not None:
OUT_FILE.unlink()
if IN_FILE is not None:
IN_FILE.unlink()
# Restore the default handler # Restore the default handler
signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGTERM, signal.SIG_DFL)
@@ -39,32 +68,49 @@ def signal_handler(signum: int, frame: Any) -> None:
os.kill(os.getpid(), signum) os.kill(os.getpid(), signum)
def init_proc( def _init_proc(
func: Callable, out_file: str, in_file: str, wait_stdin_connect: bool, **kwargs: Any func: Callable,
out_file: Path,
in_file: Path,
wait_stdin_connect: bool,
proc_name: str,
**kwargs: Any,
) -> None: ) -> None:
# Set the global variables
global OUT_FILE, IN_FILE
OUT_FILE = out_file
IN_FILE = in_file
# Create a new process group
os.setsid() os.setsid()
out_fd = os.open(out_file, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) # Open stdout and stderr
out_fd = os.open(str(out_file), flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC)
os.dup2(out_fd, sys.stdout.fileno()) os.dup2(out_fd, sys.stdout.fileno())
os.dup2(out_fd, sys.stderr.fileno()) os.dup2(out_fd, sys.stderr.fileno())
# Print some information
pid = os.getpid() pid = os.getpid()
gpid = os.getpgid(pid=pid) gpid = os.getpgid(pid=pid)
print(f"Started new process pid={pid} gpid={gpid}") print(f"Started new process pid={pid} gpid={gpid}")
# Register the signal handler for SIGINT # Register the signal handler for SIGINT
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, _signal_handler)
# Set the process name
_set_proc_name(proc_name)
# Open stdin
flags = None flags = None
if wait_stdin_connect: if wait_stdin_connect:
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr) print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
flags = os.O_RDONLY flags = os.O_RDONLY
else: else:
flags = os.O_RDONLY | os.O_NONBLOCK flags = os.O_RDONLY | os.O_NONBLOCK
in_fd = os.open(str(in_file), flags=flags)
in_fd = os.open(in_file, flags=flags)
os.dup2(in_fd, sys.stdin.fileno()) os.dup2(in_fd, sys.stdin.fileno())
# Execute the main function
print(f"Executing function {func.__name__} now", file=sys.stderr) print(f"Executing function {func.__name__} now", file=sys.stderr)
try: try:
func(**kwargs) func(**kwargs)
@@ -73,41 +119,50 @@ def init_proc(
pid = os.getpid() pid = os.getpid()
gpid = os.getpgid(pid=pid) gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}") print(f"Killing process group pid={pid} gpid={gpid}")
os.killpg(gpid, signal.SIGKILL) os.killpg(gpid, signal.SIGTERM)
def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess: def spawn(
*, wait_stdin_con: bool, log_path: Path, func: Callable, **kwargs: Any
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None: if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="spawn") mp.set_start_method(method="spawn")
# rand_name = str(uuid.uuid4())
rand_name = "test"
proc_name = f"MPExecutor:{rand_name}:{func.__name__}"
out_file_name = f"{rand_name}_out.txt"
in_file_name = f"{rand_name}_in.fifo"
if os.path.exists(in_file_name): # Set names
os.unlink(in_file_name) proc_name = f"MPExec:{func.__name__}"
os.mkfifo(in_file_name) out_file = log_path / "out.log"
in_file = log_path / "in.fifo"
# Create stdin fifo
if in_file.exists():
in_file.unlink()
os.mkfifo(in_file)
# Start the process
proc = mp.Process( proc = mp.Process(
target=init_proc, target=_init_proc,
args=(func, out_file_name, in_file_name, wait_stdin_connect), args=(func, out_file, in_file, wait_stdin_con, proc_name),
name=proc_name, name=proc_name,
kwargs=kwargs, kwargs=kwargs,
) )
proc.start() proc.start()
# Print some information
assert proc.pid is not None assert proc.pid is not None
print(f"Started process '{proc_name}'") print(f"Started process '{proc_name}'")
print(f"Arguments: {kwargs}") print(f"Arguments: {kwargs}")
if wait_stdin_connect: if wait_stdin_con:
cmd = f"cat - > {in_file_name}" cmd = f"cat - > {in_file}"
print(f"Connect to stdin with : {cmd}") print(f"Connect to stdin with : {cmd}")
cmd = f"tail -f {out_file_name}" cmd = f"tail -f {out_file}"
print(f"Connect to stdout with: {cmd}") print(f"Connect to stdout with: {cmd}")
# Return the process
mp_proc = MPProcess( mp_proc = MPProcess(
name=proc_name, name=proc_name,
proc=proc, proc=proc,
out_file_name=out_file_name, out_file=out_file,
in_file_name=in_file_name, in_file=in_file,
) )
return mp_proc return mp_proc