From b53df4f8964f9610ed230c15b3ecbac1ce3b25e0 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Sun, 17 Dec 2023 22:18:52 +0100 Subject: [PATCH 1/6] clan_vm_manager: Partially working process executor with killpg --- .../clan_vm_manager/executor.py | 102 ++++++++++++++++++ .../clan-vm-manager/clan_vm_manager/models.py | 39 ++++++- 2 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 pkgs/clan-vm-manager/clan_vm_manager/executor.py diff --git a/pkgs/clan-vm-manager/clan_vm_manager/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/executor.py new file mode 100644 index 000000000..d931b4061 --- /dev/null +++ b/pkgs/clan-vm-manager/clan_vm_manager/executor.py @@ -0,0 +1,102 @@ +import os +import signal +import sys +import traceback +from typing import Any + +import gi + +gi.require_version("GdkPixbuf", "2.0") + +import multiprocessing as mp +from collections.abc import Callable + + +class MPProcess: + def __init__( + self, *, name: str, proc: mp.Process, out_file_name: str, in_file_name: str + ) -> None: + self.name = name + self.proc = proc + self.out_file_name = out_file_name + self.in_file_name = in_file_name + + def kill_all(self) -> None: + pid = self.proc.pid + assert pid is not None + + # Get the process group ID of the new process + new_pgid = os.getpgid(pid) + # Kill the new process and all its children by sending a SIGTERM signal to the process group + os.killpg(new_pgid, signal.SIGTERM) + + # def get_all_output(self) -> str: + # os.lseek(self.out_fd, 0, os.SEEK_SET) + # return os.read(self.out_fd, 1024).decode("utf-8") + + # def write_all_input(self, input_str: str) -> None: + # os.lseek(self.in_fd, 0, os.SEEK_SET) + # os.write(self.in_fd, input_str.encode("utf-8")) + + +def init_proc( + func: Callable, out_file: str, in_file: str, wait_stdin_connect: bool, **kwargs: Any +) -> None: + os.setsid() + + out_fd = os.open(out_file, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) + os.dup2(out_fd, sys.stdout.fileno()) + os.dup2(out_fd, sys.stderr.fileno()) + + flags = None + if wait_stdin_connect: + print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr) + flags = os.O_RDONLY + else: + flags = os.O_RDONLY | os.O_NONBLOCK + + in_fd = os.open(in_file, flags=flags) + os.dup2(in_fd, sys.stdin.fileno()) + + print(f"Executing function {func.__name__} now", file=sys.stderr) + try: + func(**kwargs) + except Exception: + traceback.print_exc() + pid = os.getpid() + gpid = os.getpgid(pid=pid) + print(f"Killing process group pid={pid} gpid={gpid}") + os.killpg(gpid, signal.SIGKILL) + + +def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess: + if mp.get_start_method(allow_none=True) is None: + print("Setting start method to spawn") + mp.set_start_method(method="spawn") + # rand_name = str(uuid.uuid4()) + rand_name = "test" + proc_name = f"MPExecutor:{func.__name__}:{rand_name}" + out_file_name = f"{rand_name}_out.log" + in_file_name = f"{rand_name}_in.log" + + if os.path.exists(in_file_name): + os.unlink(in_file_name) + os.mkfifo(in_file_name) + + proc = mp.Process( + target=init_proc, + args=(func, out_file_name, in_file_name, wait_stdin_connect), + name=proc_name, + kwargs=kwargs, + ) + proc.start() + assert proc.pid is not None + print(f"Started process '{proc_name}'. pid={proc.pid} gpid={os.getpgid(proc.pid)}") + + mp_proc = MPProcess( + name=proc_name, + proc=proc, + out_file_name=out_file_name, + in_file_name=in_file_name, + ) + return mp_proc diff --git a/pkgs/clan-vm-manager/clan_vm_manager/models.py b/pkgs/clan-vm-manager/clan_vm_manager/models.py index 90967ce72..e2a62aa09 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/models.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/models.py @@ -4,14 +4,33 @@ from pathlib import Path from typing import Any import gi -from clan_cli import history, vms +from clan_cli import history gi.require_version("GdkPixbuf", "2.0") + from gi.repository import GdkPixbuf from clan_vm_manager import assets +# Define a function that writes to the memfd +def dummy_f() -> None: + import sys + import time + + c = 0 + while True: # Simulate a long running process + print(f"out: Hello from process c={c}", file=sys.stdout) + print(f"err: Hello from process c={c}", file=sys.stderr) + user = input("Enter to continue: \n") + if user == "q": + raise Exception("User quit") + print(f"User entered {user}", file=sys.stdout) + print(f"User entered {user}", file=sys.stderr) + time.sleep(1) # Wait for 1 second + c += 1 + + @dataclass(frozen=True) class VMBase: icon: Path | GdkPixbuf.Pixbuf @@ -46,11 +65,21 @@ class VMBase: def run(self) -> None: print(f"Running VM {self.name}") - import asyncio + # vm = vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM") + import os - # raise Exception("Cannot run VMs yet") - vm = asyncio.run(vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM")) - vms.run.run_vm(vm) + from .executor import spawn + + # proc = spawn(vms.run.run_vm, vm=vm) + proc = spawn(wait_stdin_connect=True, func=dummy_f) + + pid = os.getpid() + gpid = os.getpgid(pid) + print(f"Main pid={pid} gpid={gpid}") + assert proc.proc.pid is not None + gpid = os.getpgid(proc.proc.pid) + print(f"Child pid={proc.proc.pid} gpid={gpid}") + # os.killpg(gpid, signal.SIGKILL) @dataclass(frozen=True) From 62e8da8f416c03382a11a731712d8294d09cf367 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 13:40:17 +0100 Subject: [PATCH 2/6] merge --- pkgs/clan-vm-manager/test_out.log | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100755 pkgs/clan-vm-manager/test_out.log diff --git a/pkgs/clan-vm-manager/test_out.log b/pkgs/clan-vm-manager/test_out.log new file mode 100755 index 000000000..8ffb59639 --- /dev/null +++ b/pkgs/clan-vm-manager/test_out.log @@ -0,0 +1,12 @@ +Waiting for stdin connection on file test_in.log +Executing function dummy_f now +out: Hello from process c=0 +err: Hello from process c=0 +Enter to continue: +Traceback (most recent call last): + File "/home/lhebendanz/Projects/clan-core/pkgs/clan-vm-manager/clan_vm_manager/executor.py", line 64, in init_proc + func(**kwargs) + File "/home/lhebendanz/Projects/clan-core/pkgs/clan-vm-manager/clan_vm_manager/models.py", line 26, in dummy_f + raise Exception("User quit") +Exception: User quit +Killing process group pid=547301 gpid=547301 From c64420f1917dfd37e68b54832fc5f40ff72e0474 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 14:36:37 +0100 Subject: [PATCH 3/6] Added proc executor --- .../clan_vm_manager/__init__.py | 9 +++- pkgs/clan-vm-manager/clan_vm_manager/app.py | 45 +++++++++++++++++++ .../clan_vm_manager/executor.py | 5 +-- .../clan-vm-manager/clan_vm_manager/models.py | 32 ------------- pkgs/clan-vm-manager/test_out.log | 12 ----- 5 files changed, 55 insertions(+), 48 deletions(-) delete mode 100755 pkgs/clan-vm-manager/test_out.log diff --git a/pkgs/clan-vm-manager/clan_vm_manager/__init__.py b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py index 8802affd1..25c75b085 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/__init__.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/__init__.py @@ -1,6 +1,11 @@ import argparse -from .app import register_join_parser, register_overview_parser, show_overview +from .app import ( + register_join_parser, + register_overview_parser, + register_run_parser, + show_overview, +) def main() -> None: @@ -16,6 +21,8 @@ def main() -> None: register_overview_parser(subparser.add_parser("overview", help="overview screen")) + register_run_parser(subparser.add_parser("run", help="run a vm")) + # Executed when no command is given parser.set_defaults(func=show_overview) args = parser.parse_args() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index a1a787c00..22a8a86b7 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -103,3 +103,48 @@ def show_overview(args: argparse.Namespace) -> None: def register_overview_parser(parser: argparse.ArgumentParser) -> None: parser.set_defaults(func=show_overview) + + +# Define a function that writes to the memfd +def dummy_f() -> None: + import sys + import time + + c = 0 + while True: # Simulate a long running process + print(f"out: Hello from process c={c}", file=sys.stdout) + print(f"err: Hello from process c={c}", file=sys.stderr) + user = input("Enter to continue: \n") + if user == "q": + raise Exception("User quit") + print(f"User entered {user}", file=sys.stdout) + print(f"User entered {user}", file=sys.stderr) + time.sleep(1) # Wait for 1 second + c += 1 + + +def show_run_vm(parser: argparse.ArgumentParser) -> None: + import os + + from .executor import spawn + + print("Spawn process") + # proc = spawn(vms.run.run_vm, vm=vm) + proc = spawn(wait_stdin_connect=True, func=dummy_f) + + pid = os.getpid() + gpid = os.getpgid(pid) + print(f"Main pid={pid} gpid={gpid}") + assert proc.proc.pid is not None + gpid = os.getpgid(proc.proc.pid) + print(f"Child pid={proc.proc.pid} gpid={gpid}") + + +def register_run_parser(parser: argparse.ArgumentParser) -> None: + # parser.add_argument( + # "command", + # type=str, + # help="command to run", + # choices=["join", "overview"], + # ) + parser.set_defaults(func=show_run_vm) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/executor.py index d931b4061..3f18ec16d 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/executor.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/executor.py @@ -33,7 +33,6 @@ class MPProcess: # def get_all_output(self) -> str: # os.lseek(self.out_fd, 0, os.SEEK_SET) # return os.read(self.out_fd, 1024).decode("utf-8") - # def write_all_input(self, input_str: str) -> None: # os.lseek(self.in_fd, 0, os.SEEK_SET) # os.write(self.in_fd, input_str.encode("utf-8")) @@ -76,8 +75,8 @@ def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProce # rand_name = str(uuid.uuid4()) rand_name = "test" proc_name = f"MPExecutor:{func.__name__}:{rand_name}" - out_file_name = f"{rand_name}_out.log" - in_file_name = f"{rand_name}_in.log" + out_file_name = f"{rand_name}_out.txt" + in_file_name = f"{rand_name}_in.fifo" if os.path.exists(in_file_name): os.unlink(in_file_name) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/models.py b/pkgs/clan-vm-manager/clan_vm_manager/models.py index e2a62aa09..c1b726fda 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/models.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/models.py @@ -13,24 +13,6 @@ from gi.repository import GdkPixbuf from clan_vm_manager import assets -# Define a function that writes to the memfd -def dummy_f() -> None: - import sys - import time - - c = 0 - while True: # Simulate a long running process - print(f"out: Hello from process c={c}", file=sys.stdout) - print(f"err: Hello from process c={c}", file=sys.stderr) - user = input("Enter to continue: \n") - if user == "q": - raise Exception("User quit") - print(f"User entered {user}", file=sys.stdout) - print(f"User entered {user}", file=sys.stderr) - time.sleep(1) # Wait for 1 second - c += 1 - - @dataclass(frozen=True) class VMBase: icon: Path | GdkPixbuf.Pixbuf @@ -66,20 +48,6 @@ class VMBase: def run(self) -> None: print(f"Running VM {self.name}") # vm = vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM") - import os - - from .executor import spawn - - # proc = spawn(vms.run.run_vm, vm=vm) - proc = spawn(wait_stdin_connect=True, func=dummy_f) - - pid = os.getpid() - gpid = os.getpgid(pid) - print(f"Main pid={pid} gpid={gpid}") - assert proc.proc.pid is not None - gpid = os.getpgid(proc.proc.pid) - print(f"Child pid={proc.proc.pid} gpid={gpid}") - # os.killpg(gpid, signal.SIGKILL) @dataclass(frozen=True) diff --git a/pkgs/clan-vm-manager/test_out.log b/pkgs/clan-vm-manager/test_out.log deleted file mode 100755 index 8ffb59639..000000000 --- a/pkgs/clan-vm-manager/test_out.log +++ /dev/null @@ -1,12 +0,0 @@ -Waiting for stdin connection on file test_in.log -Executing function dummy_f now -out: Hello from process c=0 -err: Hello from process c=0 -Enter to continue: -Traceback (most recent call last): - File "/home/lhebendanz/Projects/clan-core/pkgs/clan-vm-manager/clan_vm_manager/executor.py", line 64, in init_proc - func(**kwargs) - File "/home/lhebendanz/Projects/clan-core/pkgs/clan-vm-manager/clan_vm_manager/models.py", line 26, in dummy_f - raise Exception("User quit") -Exception: User quit -Killing process group pid=547301 gpid=547301 From 646671a89fc8fcbe55b1ab952579446240503429 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 16:44:26 +0100 Subject: [PATCH 4/6] Fully working executor --- pkgs/clan-vm-manager/clan_vm_manager/app.py | 24 +++-------- .../clan_vm_manager/executor.py | 42 ++++++++++++------- 2 files changed, 32 insertions(+), 34 deletions(-) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index 22a8a86b7..63f1e65dd 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -106,10 +106,11 @@ def register_overview_parser(parser: argparse.ArgumentParser) -> None: # Define a function that writes to the memfd -def dummy_f() -> None: +def dummy_f(msg: str) -> None: import sys import time + print(f"Receeived message {msg}") c = 0 while True: # Simulate a long running process print(f"out: Hello from process c={c}", file=sys.stdout) @@ -124,27 +125,12 @@ def dummy_f() -> None: def show_run_vm(parser: argparse.ArgumentParser) -> None: - import os - from .executor import spawn - print("Spawn process") - # proc = spawn(vms.run.run_vm, vm=vm) - proc = spawn(wait_stdin_connect=True, func=dummy_f) - - pid = os.getpid() - gpid = os.getpgid(pid) - print(f"Main pid={pid} gpid={gpid}") - assert proc.proc.pid is not None - gpid = os.getpgid(proc.proc.pid) - print(f"Child pid={proc.proc.pid} gpid={gpid}") + proc = spawn(wait_stdin_connect=True, func=dummy_f, msg="Hello") + input("Press enter to kill process: ") + proc.kill_group() def register_run_parser(parser: argparse.ArgumentParser) -> None: - # parser.add_argument( - # "command", - # type=str, - # help="command to run", - # choices=["join", "overview"], - # ) parser.set_defaults(func=show_run_vm) diff --git a/pkgs/clan-vm-manager/clan_vm_manager/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/executor.py index 3f18ec16d..a0f181a22 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/executor.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/executor.py @@ -21,21 +21,22 @@ class MPProcess: self.out_file_name = out_file_name self.in_file_name = in_file_name - def kill_all(self) -> None: + # Kill the new process and all its children by sending a SIGTERM signal to the process group + def kill_group(self) -> None: pid = self.proc.pid assert pid is not None + os.killpg(pid, signal.SIGTERM) - # Get the process group ID of the new process - new_pgid = os.getpgid(pid) - # Kill the new process and all its children by sending a SIGTERM signal to the process group - os.killpg(new_pgid, signal.SIGTERM) - # def get_all_output(self) -> str: - # os.lseek(self.out_fd, 0, os.SEEK_SET) - # return os.read(self.out_fd, 1024).decode("utf-8") - # def write_all_input(self, input_str: str) -> None: - # os.lseek(self.in_fd, 0, os.SEEK_SET) - # os.write(self.in_fd, input_str.encode("utf-8")) +def signal_handler(signum: int, frame: Any) -> None: + signame = signal.strsignal(signum) + print("Signal received:", signame) + + # Restore the default handler + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + # Re-raise the signal + os.kill(os.getpid(), signum) def init_proc( @@ -47,6 +48,13 @@ def init_proc( os.dup2(out_fd, sys.stdout.fileno()) os.dup2(out_fd, sys.stderr.fileno()) + pid = os.getpid() + gpid = os.getpgid(pid=pid) + print(f"Started new process pid={pid} gpid={gpid}") + + # Register the signal handler for SIGINT + signal.signal(signal.SIGTERM, signal_handler) + flags = None if wait_stdin_connect: print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr) @@ -70,11 +78,10 @@ def init_proc( def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess: if mp.get_start_method(allow_none=True) is None: - print("Setting start method to spawn") mp.set_start_method(method="spawn") # rand_name = str(uuid.uuid4()) rand_name = "test" - proc_name = f"MPExecutor:{func.__name__}:{rand_name}" + proc_name = f"MPExecutor:{rand_name}:{func.__name__}" out_file_name = f"{rand_name}_out.txt" in_file_name = f"{rand_name}_in.fifo" @@ -90,8 +97,13 @@ def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProce ) proc.start() assert proc.pid is not None - print(f"Started process '{proc_name}'. pid={proc.pid} gpid={os.getpgid(proc.pid)}") - + print(f"Started process '{proc_name}'") + print(f"Arguments: {kwargs}") + if wait_stdin_connect: + cmd = f"cat - > {in_file_name}" + print(f"Connect to stdin with : {cmd}") + cmd = f"tail -f {out_file_name}" + print(f"Connect to stdout with: {cmd}") mp_proc = MPProcess( name=proc_name, proc=proc, From aaf2fd5569b0b753680f66b9767b2d0bd2edadba Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 17:51:43 +0100 Subject: [PATCH 5/6] Improved spawn interface. Added garbage collector --- pkgs/clan-cli/clan_cli/clan_uri.py | 23 ++++ pkgs/clan-cli/clan_cli/history/add.py | 1 + pkgs/clan-vm-manager/clan_vm_manager/app.py | 5 +- .../clan_vm_manager/executor.py | 109 +++++++++++++----- 4 files changed, 110 insertions(+), 28 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/clan_uri.py b/pkgs/clan-cli/clan_cli/clan_uri.py index d4c1be403..c799b3c7c 100644 --- a/pkgs/clan-cli/clan_cli/clan_uri.py +++ b/pkgs/clan-cli/clan_cli/clan_uri.py @@ -5,10 +5,25 @@ from dataclasses import dataclass from enum import Enum, member from pathlib import Path from typing import Self +import urllib.request + from .errors import ClanError +def url_ok(url: str): + # Create a request object with the URL and the HEAD method + req = urllib.request.Request(url, method="HEAD") + try: + # Open the URL and get the response object + res = urllib.request.urlopen(req) + # Return True if the status code is 200 (OK) + if not res.status_code == 200: + raise ClanError(f"URL has status code: {res.status_code}") + except urllib.error.URLError as ex: + raise ClanError(f"URL error: {ex}") + + # Define an enum with different members that have different values class ClanScheme(Enum): # Use the dataclass decorator to add fields and methods to the members @@ -88,6 +103,14 @@ class ClanURI: case _: raise ClanError(f"Unsupported uri components: {comb}") + def check_exits(self): + match self.scheme: + case ClanScheme.FILE.value(path): + if not path.exists(): + raise ClanError(f"File does not exist: {path}") + case ClanScheme.HTTP.value(url): + return url_ok(url) + def get_internal(self) -> str: match self.scheme: case ClanScheme.FILE.value(path): diff --git a/pkgs/clan-cli/clan_cli/history/add.py b/pkgs/clan-cli/clan_cli/history/add.py index de334314f..a2c0fabd7 100644 --- a/pkgs/clan-cli/clan_cli/history/add.py +++ b/pkgs/clan-cli/clan_cli/history/add.py @@ -47,6 +47,7 @@ def list_history() -> list[HistoryEntry]: def add_history(uri: ClanURI) -> list[HistoryEntry]: + uri.check_exits() user_history_file().parent.mkdir(parents=True, exist_ok=True) logs = list_history() found = False diff --git a/pkgs/clan-vm-manager/clan_vm_manager/app.py b/pkgs/clan-vm-manager/clan_vm_manager/app.py index 63f1e65dd..aecf04af2 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/app.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/app.py @@ -125,9 +125,12 @@ def dummy_f(msg: str) -> None: def show_run_vm(parser: argparse.ArgumentParser) -> None: + from pathlib import Path + from .executor import spawn - proc = spawn(wait_stdin_connect=True, func=dummy_f, msg="Hello") + log_path = Path(".").resolve() + proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello") input("Press enter to kill process: ") proc.kill_group() diff --git a/pkgs/clan-vm-manager/clan_vm_manager/executor.py b/pkgs/clan-vm-manager/clan_vm_manager/executor.py index a0f181a22..0c85ba0f1 100644 --- a/pkgs/clan-vm-manager/clan_vm_manager/executor.py +++ b/pkgs/clan-vm-manager/clan_vm_manager/executor.py @@ -2,6 +2,7 @@ import os import signal import sys import traceback +from pathlib import Path from typing import Any import gi @@ -11,15 +12,18 @@ gi.require_version("GdkPixbuf", "2.0") import multiprocessing as mp from collections.abc import Callable +OUT_FILE: Path | None = None +IN_FILE: Path | None = None + class MPProcess: def __init__( - self, *, name: str, proc: mp.Process, out_file_name: str, in_file_name: str + self, *, name: str, proc: mp.Process, out_file: Path, in_file: Path ) -> None: self.name = name self.proc = proc - self.out_file_name = out_file_name - self.in_file_name = in_file_name + self.out_file = out_file + self.in_file = in_file # Kill the new process and all its children by sending a SIGTERM signal to the process group def kill_group(self) -> None: @@ -28,10 +32,35 @@ class MPProcess: os.killpg(pid, signal.SIGTERM) -def signal_handler(signum: int, frame: Any) -> None: +def _set_proc_name(name: str) -> None: + import ctypes + + # Define the prctl function with the appropriate arguments and return type + libc = ctypes.CDLL("libc.so.6") + prctl = libc.prctl + prctl.argtypes = [ + ctypes.c_int, + ctypes.c_char_p, + ctypes.c_ulong, + ctypes.c_ulong, + ctypes.c_ulong, + ] + prctl.restype = ctypes.c_int + + # Set the process name to "my_process" + prctl(15, name.encode(), 0, 0, 0) + + +def _signal_handler(signum: int, frame: Any) -> None: signame = signal.strsignal(signum) print("Signal received:", signame) + # Delete files + if OUT_FILE is not None: + OUT_FILE.unlink() + if IN_FILE is not None: + IN_FILE.unlink() + # Restore the default handler signal.signal(signal.SIGTERM, signal.SIG_DFL) @@ -39,32 +68,49 @@ def signal_handler(signum: int, frame: Any) -> None: os.kill(os.getpid(), signum) -def init_proc( - func: Callable, out_file: str, in_file: str, wait_stdin_connect: bool, **kwargs: Any +def _init_proc( + func: Callable, + out_file: Path, + in_file: Path, + wait_stdin_connect: bool, + proc_name: str, + **kwargs: Any, ) -> None: + # Set the global variables + global OUT_FILE, IN_FILE + OUT_FILE = out_file + IN_FILE = in_file + + # Create a new process group os.setsid() - out_fd = os.open(out_file, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) + # Open stdout and stderr + out_fd = os.open(str(out_file), flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC) os.dup2(out_fd, sys.stdout.fileno()) os.dup2(out_fd, sys.stderr.fileno()) + # Print some information pid = os.getpid() gpid = os.getpgid(pid=pid) print(f"Started new process pid={pid} gpid={gpid}") # Register the signal handler for SIGINT - signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGTERM, _signal_handler) + # Set the process name + _set_proc_name(proc_name) + + # Open stdin flags = None if wait_stdin_connect: print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr) flags = os.O_RDONLY else: flags = os.O_RDONLY | os.O_NONBLOCK - - in_fd = os.open(in_file, flags=flags) + in_fd = os.open(str(in_file), flags=flags) os.dup2(in_fd, sys.stdin.fileno()) + # Execute the main function print(f"Executing function {func.__name__} now", file=sys.stderr) try: func(**kwargs) @@ -73,41 +119,50 @@ def init_proc( pid = os.getpid() gpid = os.getpgid(pid=pid) print(f"Killing process group pid={pid} gpid={gpid}") - os.killpg(gpid, signal.SIGKILL) + os.killpg(gpid, signal.SIGTERM) -def spawn(*, wait_stdin_connect: bool, func: Callable, **kwargs: Any) -> MPProcess: +def spawn( + *, wait_stdin_con: bool, log_path: Path, func: Callable, **kwargs: Any +) -> MPProcess: + # Decouple the process from the parent if mp.get_start_method(allow_none=True) is None: mp.set_start_method(method="spawn") - # rand_name = str(uuid.uuid4()) - rand_name = "test" - proc_name = f"MPExecutor:{rand_name}:{func.__name__}" - out_file_name = f"{rand_name}_out.txt" - in_file_name = f"{rand_name}_in.fifo" - if os.path.exists(in_file_name): - os.unlink(in_file_name) - os.mkfifo(in_file_name) + # Set names + proc_name = f"MPExec:{func.__name__}" + out_file = log_path / "out.log" + in_file = log_path / "in.fifo" + # Create stdin fifo + if in_file.exists(): + in_file.unlink() + os.mkfifo(in_file) + + # Start the process proc = mp.Process( - target=init_proc, - args=(func, out_file_name, in_file_name, wait_stdin_connect), + target=_init_proc, + args=(func, out_file, in_file, wait_stdin_con, proc_name), name=proc_name, kwargs=kwargs, ) proc.start() + + # Print some information assert proc.pid is not None print(f"Started process '{proc_name}'") print(f"Arguments: {kwargs}") - if wait_stdin_connect: - cmd = f"cat - > {in_file_name}" + if wait_stdin_con: + cmd = f"cat - > {in_file}" print(f"Connect to stdin with : {cmd}") - cmd = f"tail -f {out_file_name}" + cmd = f"tail -f {out_file}" print(f"Connect to stdout with: {cmd}") + + # Return the process mp_proc = MPProcess( name=proc_name, proc=proc, - out_file_name=out_file_name, - in_file_name=in_file_name, + out_file=out_file, + in_file=in_file, ) return mp_proc From 30d80a3b28ea853ecfc40f5968abadf441d9aeeb Mon Sep 17 00:00:00 2001 From: Qubasa Date: Tue, 19 Dec 2023 18:02:06 +0100 Subject: [PATCH 6/6] nix fmt --- pkgs/clan-cli/clan_cli/clan_uri.py | 13 ++++++------- pkgs/clan-cli/pyproject.toml | 1 + 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/clan_uri.py b/pkgs/clan-cli/clan_cli/clan_uri.py index c799b3c7c..f592b96d3 100644 --- a/pkgs/clan-cli/clan_cli/clan_uri.py +++ b/pkgs/clan-cli/clan_cli/clan_uri.py @@ -1,17 +1,16 @@ # Import the urllib.parse, enum and dataclasses modules import dataclasses import urllib.parse +import urllib.request from dataclasses import dataclass from enum import Enum, member from pathlib import Path from typing import Self -import urllib.request - from .errors import ClanError -def url_ok(url: str): +def url_ok(url: str) -> None: # Create a request object with the URL and the HEAD method req = urllib.request.Request(url, method="HEAD") try: @@ -99,11 +98,11 @@ class ClanURI: case ("http" | "https", _, _, _, _, _): self.scheme = ClanScheme.HTTP.value(self._components.geturl()) # type: ignore case ("file", "", path, "", "", "") | ("", "", path, "", "", ""): # type: ignore - self.scheme = ClanScheme.FILE.value(Path(path)) # type: ignore + self.scheme = ClanScheme.FILE.value(Path(path)) case _: raise ClanError(f"Unsupported uri components: {comb}") - def check_exits(self): + def check_exits(self) -> None: match self.scheme: case ClanScheme.FILE.value(path): if not path.exists(): @@ -114,9 +113,9 @@ class ClanURI: def get_internal(self) -> str: match self.scheme: case ClanScheme.FILE.value(path): - return str(path) # type: ignore + return str(path) case ClanScheme.HTTP.value(url): - return url # type: ignore + return url case _: raise ClanError(f"Unsupported uri components: {self.scheme}") diff --git a/pkgs/clan-cli/pyproject.toml b/pkgs/clan-cli/pyproject.toml index 05ce0a0a6..cd6fc12b2 100644 --- a/pkgs/clan-cli/pyproject.toml +++ b/pkgs/clan-cli/pyproject.toml @@ -30,6 +30,7 @@ warn_redundant_casts = true disallow_untyped_calls = true disallow_untyped_defs = true no_implicit_optional = true +disable_error_code = ["has-type"] exclude = "clan_cli.nixpkgs" [[tool.mypy.overrides]]