From e0e732487634a5c61c91d2a2b6c5751f92bf601f Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 17:51:43 +0100 Subject: [PATCH] Improved spawn interface. Added garbage collector --- pkgs/clan-cli/clan_cli/clan_uri.py | 23 ++++ pkgs/clan-cli/clan_cli/history/add.py | 1 + pkgs/clan-vm-manager/clan_vm_manager/app.py | 5 +- .../clan_vm_manager/executor.py | 109 +++++++++++++----- 4 files changed, 110 insertions(+), 28 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/clan_uri.py b/pkgs/clan-cli/clan_cli/clan_uri.py index d4c1be403..c799b3c7c 100644 --- a/pkgs/clan-cli/clan_cli/clan_uri.py +++ b/pkgs/clan-cli/clan_cli/clan_uri.py @@ -5,10 +5,25 @@ from dataclasses import dataclass from enum import Enum, member from pathlib import Path from typing import Self +import urllib.request + from .errors import ClanError +def url_ok(url: str): + # Create a request object with the URL and the HEAD method + req = urllib.request.Request(url, method="HEAD") + try: + # Open the URL and get the response object + res = urllib.request.urlopen(req) + # Return True if the status code is 200 (OK) + if not res.status_code == 200: + raise ClanError(f"URL has status code: {res.status_code}") + except urllib.error.URLError as ex: + raise ClanError(f"URL error: {ex}") + + # Define an enum with different members that have different values class ClanScheme(Enum): # Use the dataclass decorator to add fields and methods to the members @@ -88,6 +103,14 @@ class ClanURI: case _: raise ClanError(f"Unsupported uri components: {comb}") + def check_exits(self): + match self.scheme: + case ClanScheme.FILE.value(path): + if not path.exists(): + raise ClanError(f"File does not exist: {path}") + case ClanScheme.HTTP.value(url): + return url_ok(url) + def get_internal(self) -> str: match self.scheme: case ClanScheme.FILE.value(path): diff --git a/pkgs/clan-cli/clan_cli/history/add.py b/pkgs/clan-cli/clan_cli/history/add.py index de334314f..a2c0fabd7 100644 --- a/pkgs/clan-cli/clan_cli/history/add.py +++ b/pkgs/clan-cli/clan_cli/history/add.py @@ -47,6 +47,7 @@ def list_history() -> list[HistoryEntry]: def add_history(uri: ClanURI) -> list[HistoryEntry]: + uri.check_exits() user_history_file().parent.mkdir(parents=True, exist_ok=True) logs = list_history() found = False diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index 63f1e65dd..aecf04af2 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -125,9 +125,12 @@ def dummy_f(msg: str) -> None: def show_run_vm(parser: argparse.ArgumentParser) -> None: + from pathlib import Path + from .executor import spawn - proc = spawn(wait_stdin_connect=True, func=dummy_f, msg="Hello") + log_path = Path(".").resolve() + proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello") input("Press enter to kill process: ") proc.kill_group() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/executor.py index a0f181a22..0c85ba0f1 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/executor.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/executor.py @@ -2,6 +2,7 @@ import os import signal import sys import traceback +from pathlib import Path from typing import Any import gi @@ -11,15 +12,18 @@ gi.require_version("GdkPixbuf", "2.0") import multiprocessing as mp from collections.abc import Callable +OUT_FILE: Path | None = None +IN_FILE: Path | None = None + class MPProcess: def __init__( - self, *, name: str, proc: mp.Process, out_file_name: str, in_file_name: str + self, *, name: str, proc: mp.Process, out_file: Path, in_file: Path ) -> None: self.name = name self.proc = proc - self.out_file_name = out_file_name - self.in_file_name = in_file_name + self.out_file = out_file + self.in_file = in_file # Kill the new process and all its children by sending a SIGTERM signal to the process group def kill_group(self) -> None: @@ -28,10 +32,35 @@ class MPProcess: os.killpg(pid, signal.SIGTERM) -def signal_handler(signum: int, frame: Any) -> None: +def _set_proc_name(name: str) -> None: + import ctypes + + # Define the prctl function with the appropriate arguments and return type + libc = ctypes.CDLL("libc.so.6") + prctl = libc.prctl + prctl.argtypes = [ + ctypes.c_int, + ctypes.c_char_p, + ctypes.c_ulong, + ctypes.c_ulong, + ctypes.c_ulong, + ] + prctl.restype = ctypes.c_int + + # Set the process name to "my_process" + prctl(15, name.encode(), 0, 0, 0) + + +def _signal_handler(signum: int, frame: Any) -> None: signame = signal.strsignal(signum) print("Signal received:", signame) + # Delete files + if OUT_FILE is not None: + OUT_FILE.unlink() + if IN_FILE is not None: + IN_FILE.unlink() + # Restore the default handler signal.signal(signal.SIGTERM, signal.SIG_DFL) @@ -39,32 +68,49 @@ def signal_handler(signum: int, frame: Any) -> None: os.kill(os.getpid(), signum) -def init_proc( - func: Callable, out_file: str, in_file: str, wait_stdin_connect: bool, **kwargs: Any +def _init_proc( + func: Callable, + out_file: Path, + in_file: Path, + wait_stdin_connect: bool, + proc_name: str, + **kwargs: Any, ) -> None: + # Set the global variables + global OUT_FILE, IN_FILE + OUT_FILE = out_file + IN_FILE = in_file + + # Create a new process group os.setsid() - out_fd = os.open(out_file, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) + # Open stdout and stderr + out_fd = os.open(str(out_file), flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) os.dup2(out_fd, sys.stdout.fileno()) os.dup2(out_fd, sys.stderr.fileno()) + # Print some information pid = os.getpid() gpid = os.getpgid(pid=pid) print(f"Started new process pid={pid} gpid={gpid}") # Register the signal handler for SIGINT - signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGTERM, _signal_handler) + # Set the process name + _set_proc_name(proc_name) + + # Open stdin flags = None if wait_stdin_connect: print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr) flags = os.O_RDONLY else: flags = os.O_RDONLY | os.O_NONBLOCK - - in_fd = os.open(in_file, flags=flags) + in_fd = os.open(str(in_file), flags=flags) os.dup2(in_fd, sys.stdin.fileno()) + # Execute the main function print(f"Executing function {func.__name__} now", file=sys.stderr) try: func(**kwargs) @@ -73,41 +119,50 @@ def init_proc( pid = os.getpid() gpid = os.getpgid(pid=pid) print(f"Killing process group pid={pid} gpid={gpid}") - os.killpg(gpid, signal.SIGKILL) + os.killpg(gpid, signal.SIGTERM) -def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess: +def spawn( + *, wait_stdin_con: bool, log_path: Path, func: Callable, **kwargs: Any +) -> MPProcess: + # Decouple the process from the parent if mp.get_start_method(allow_none=True) is None: mp.set_start_method(method="spawn") - # rand_name = str(uuid.uuid4()) - rand_name = "test" - proc_name = f"MPExecutor:{rand_name}:{func.__name__}" - out_file_name = f"{rand_name}_out.txt" - in_file_name = f"{rand_name}_in.fifo" - if os.path.exists(in_file_name): - os.unlink(in_file_name) - os.mkfifo(in_file_name) + # Set names + proc_name = f"MPExec:{func.__name__}" + out_file = log_path / "out.log" + in_file = log_path / "in.fifo" + # Create stdin fifo + if in_file.exists(): + in_file.unlink() + os.mkfifo(in_file) + + # Start the process proc = mp.Process( - target=init_proc, - args=(func, out_file_name, in_file_name, wait_stdin_connect), + target=_init_proc, + args=(func, out_file, in_file, wait_stdin_con, proc_name), name=proc_name, kwargs=kwargs, ) proc.start() + + # Print some information assert proc.pid is not None print(f"Started process '{proc_name}'") print(f"Arguments: {kwargs}") - if wait_stdin_connect: - cmd = f"cat - > {in_file_name}" + if wait_stdin_con: + cmd = f"cat - > {in_file}" print(f"Connect to stdin with : {cmd}") - cmd = f"tail -f {out_file_name}" + cmd = f"tail -f {out_file}" print(f"Connect to stdout with: {cmd}") + + # Return the process mp_proc = MPProcess( name=proc_name, proc=proc, - out_file_name=out_file_name, - in_file_name=in_file_name, + out_file=out_file, + in_file=in_file, ) return mp_proc