clan-app: Initial move to process based api calling instead of thread based
This commit is contained in:
@@ -60,6 +60,10 @@ class ImplFunc(GObject.Object, Generic[P, B]):
|
||||
return result
|
||||
|
||||
|
||||
# TODO: Reimplement this such that it uses a multiprocessing.Array of type ctypes.c_char
|
||||
# all fn arguments are serialized to json and passed to the new process over the Array
|
||||
# the new process deserializes the json and calls the function
|
||||
# the result is serialized to json and passed back to the main process over another Array
|
||||
class MethodExecutor(threading.Thread):
|
||||
def __init__(
|
||||
self, function: Callable[..., Any], *args: Any, **kwargs: dict[str, Any]
|
||||
|
||||
@@ -54,6 +54,10 @@ class open_file(
|
||||
op_key=op_key, data=selected_paths, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected files: {e}")
|
||||
|
||||
@@ -67,6 +71,10 @@ class open_file(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected directory: {e}")
|
||||
|
||||
@@ -80,6 +88,10 @@ class open_file(
|
||||
op_key=op_key, data=selected_path, status="success"
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.returns(
|
||||
SuccessDataClass(op_key=op_key, data=None, status="success")
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"Error getting selected file: {e}")
|
||||
|
||||
|
||||
127
pkgs/clan-app/clan_app/components/executor.py
Normal file
127
pkgs/clan-app/clan_app/components/executor.py
Normal file
@@ -0,0 +1,127 @@
|
||||
import dataclasses
|
||||
import logging
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import traceback
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||
def _kill_group(proc: mp.Process) -> None:
|
||||
pid = proc.pid
|
||||
if proc.is_alive() and pid:
|
||||
os.killpg(pid, signal.SIGTERM)
|
||||
else:
|
||||
log.warning(f"Process '{proc.name}' with pid '{pid}' is already dead")
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MPProcess:
|
||||
name: str
|
||||
proc: mp.Process
|
||||
out_file: Path
|
||||
|
||||
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||
def kill_group(self) -> None:
|
||||
_kill_group(proc=self.proc)
|
||||
|
||||
|
||||
def _set_proc_name(name: str) -> None:
|
||||
if sys.platform != "linux":
|
||||
return
|
||||
import ctypes
|
||||
|
||||
# Define the prctl function with the appropriate arguments and return type
|
||||
libc = ctypes.CDLL("libc.so.6")
|
||||
prctl = libc.prctl
|
||||
prctl.argtypes = [
|
||||
ctypes.c_int,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_ulong,
|
||||
ctypes.c_ulong,
|
||||
ctypes.c_ulong,
|
||||
]
|
||||
prctl.restype = ctypes.c_int
|
||||
|
||||
# Set the process name to "my_process"
|
||||
prctl(15, name.encode(), 0, 0, 0)
|
||||
|
||||
|
||||
def _init_proc(
|
||||
func: Callable,
|
||||
out_file: Path,
|
||||
proc_name: str,
|
||||
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
# Create a new process group
|
||||
os.setsid()
|
||||
|
||||
# Open stdout and stderr
|
||||
with open(out_file, "w") as out_fd:
|
||||
os.dup2(out_fd.fileno(), sys.stdout.fileno())
|
||||
os.dup2(out_fd.fileno(), sys.stderr.fileno())
|
||||
|
||||
# Print some information
|
||||
pid = os.getpid()
|
||||
gpid = os.getpgid(pid=pid)
|
||||
|
||||
# Set the process name
|
||||
_set_proc_name(proc_name)
|
||||
|
||||
# Close stdin
|
||||
sys.stdin.close()
|
||||
|
||||
linebreak = "=" * 5
|
||||
# Execute the main function
|
||||
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
|
||||
try:
|
||||
func(**kwargs)
|
||||
except Exception as ex:
|
||||
traceback.print_exc()
|
||||
if on_except is not None:
|
||||
on_except(ex, mp.current_process())
|
||||
|
||||
# Kill the new process and all its children by sending a SIGTERM signal to the process group
|
||||
pid = os.getpid()
|
||||
gpid = os.getpgid(pid=pid)
|
||||
print(f"Killing process group pid={pid} gpid={gpid}", file=sys.stderr)
|
||||
os.killpg(gpid, signal.SIGTERM)
|
||||
sys.exit(1)
|
||||
# Don't use a finally block here, because we want the exitcode to be set to
|
||||
# 0 if the function returns normally
|
||||
|
||||
|
||||
def spawn(
|
||||
*,
|
||||
out_file: Path,
|
||||
on_except: Callable[[Exception, mp.process.BaseProcess], None] | None,
|
||||
func: Callable,
|
||||
**kwargs: Any,
|
||||
) -> MPProcess:
|
||||
# Decouple the process from the parent
|
||||
if mp.get_start_method(allow_none=True) is None:
|
||||
mp.set_start_method(method="forkserver")
|
||||
|
||||
# Set names
|
||||
proc_name = f"MPExec:{func.__name__}"
|
||||
|
||||
# Start the process
|
||||
proc = mp.Process(
|
||||
target=_init_proc,
|
||||
args=(func, out_file, proc_name, on_except),
|
||||
name=proc_name,
|
||||
kwargs=kwargs,
|
||||
)
|
||||
proc.start()
|
||||
|
||||
# Return the process
|
||||
mp_proc = MPProcess(name=proc_name, proc=proc, out_file=out_file)
|
||||
|
||||
return mp_proc
|
||||
@@ -36,10 +36,6 @@ disallow_untyped_calls = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = "argcomplete.*"
|
||||
ignore_missing_imports = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = "clan_cli.*"
|
||||
ignore_missing_imports = true
|
||||
|
||||
Reference in New Issue
Block a user