Merge pull request 'clan_vm_manager: Partially working process executor with killpg' (#656) from Qubasa-main into main

This commit is contained in:
clan-bot
2023-12-19 17:05:21 +00:00
7 changed files with 240 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
# Import the urllib.parse, enum and dataclasses modules # Import the urllib.parse, enum and dataclasses modules
import dataclasses import dataclasses
import urllib.parse import urllib.parse
import urllib.request
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, member from enum import Enum, member
from pathlib import Path from pathlib import Path
@@ -9,6 +10,19 @@ from typing import Self
from .errors import ClanError from .errors import ClanError
def url_ok(url: str) -> None:
# Create a request object with the URL and the HEAD method
req = urllib.request.Request(url, method="HEAD")
try:
# Open the URL and get the response object
res = urllib.request.urlopen(req)
# Return True if the status code is 200 (OK)
if not res.status_code == 200:
raise ClanError(f"URL has status code: {res.status_code}")
except urllib.error.URLError as ex:
raise ClanError(f"URL error: {ex}")
# Define an enum with different members that have different values # Define an enum with different members that have different values
class ClanScheme(Enum): class ClanScheme(Enum):
# Use the dataclass decorator to add fields and methods to the members # Use the dataclass decorator to add fields and methods to the members
@@ -84,16 +98,24 @@ class ClanURI:
case ("http" | "https", _, _, _, _, _): case ("http" | "https", _, _, _, _, _):
self.scheme = ClanScheme.HTTP.value(self._components.geturl()) # type: ignore self.scheme = ClanScheme.HTTP.value(self._components.geturl()) # type: ignore
case ("file", "", path, "", "", "") | ("", "", path, "", "", ""): # type: ignore case ("file", "", path, "", "", "") | ("", "", path, "", "", ""): # type: ignore
self.scheme = ClanScheme.FILE.value(Path(path)) # type: ignore self.scheme = ClanScheme.FILE.value(Path(path))
case _: case _:
raise ClanError(f"Unsupported uri components: {comb}") raise ClanError(f"Unsupported uri components: {comb}")
def check_exits(self) -> None:
match self.scheme:
case ClanScheme.FILE.value(path):
if not path.exists():
raise ClanError(f"File does not exist: {path}")
case ClanScheme.HTTP.value(url):
return url_ok(url)
def get_internal(self) -> str: def get_internal(self) -> str:
match self.scheme: match self.scheme:
case ClanScheme.FILE.value(path): case ClanScheme.FILE.value(path):
return str(path) # type: ignore return str(path)
case ClanScheme.HTTP.value(url): case ClanScheme.HTTP.value(url):
return url # type: ignore return url
case _: case _:
raise ClanError(f"Unsupported uri components: {self.scheme}") raise ClanError(f"Unsupported uri components: {self.scheme}")

View File

@@ -47,6 +47,7 @@ def list_history() -> list[HistoryEntry]:
def add_history(uri: ClanURI) -> list[HistoryEntry]: def add_history(uri: ClanURI) -> list[HistoryEntry]:
uri.check_exits()
user_history_file().parent.mkdir(parents=True, exist_ok=True) user_history_file().parent.mkdir(parents=True, exist_ok=True)
logs = list_history() logs = list_history()
found = False found = False

View File

@@ -30,6 +30,7 @@ warn_redundant_casts = true
disallow_untyped_calls = true disallow_untyped_calls = true
disallow_untyped_defs = true disallow_untyped_defs = true
no_implicit_optional = true no_implicit_optional = true
disable_error_code = ["has-type"]
exclude = "clan_cli.nixpkgs" exclude = "clan_cli.nixpkgs"
[[tool.mypy.overrides]] [[tool.mypy.overrides]]

View File

@@ -1,6 +1,11 @@
import argparse import argparse
from .app import register_join_parser, register_overview_parser, show_overview from .app import (
register_join_parser,
register_overview_parser,
register_run_parser,
show_overview,
)
def main() -> None: def main() -> None:
@@ -16,6 +21,8 @@ def main() -> None:
register_overview_parser(subparser.add_parser("overview", help="overview screen")) register_overview_parser(subparser.add_parser("overview", help="overview screen"))
register_run_parser(subparser.add_parser("run", help="run a vm"))
# Executed when no command is given # Executed when no command is given
parser.set_defaults(func=show_overview) parser.set_defaults(func=show_overview)
args = parser.parse_args() args = parser.parse_args()

View File

@@ -103,3 +103,37 @@ def show_overview(args: argparse.Namespace) -> None:
def register_overview_parser(parser: argparse.ArgumentParser) -> None: def register_overview_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=show_overview) parser.set_defaults(func=show_overview)
# Define a function that writes to the memfd
def dummy_f(msg: str) -> None:
import sys
import time
print(f"Receeived message {msg}")
c = 0
while True: # Simulate a long running process
print(f"out: Hello from process c={c}", file=sys.stdout)
print(f"err: Hello from process c={c}", file=sys.stderr)
user = input("Enter to continue: \n")
if user == "q":
raise Exception("User quit")
print(f"User entered {user}", file=sys.stdout)
print(f"User entered {user}", file=sys.stderr)
time.sleep(1) # Wait for 1 second
c += 1
def show_run_vm(parser: argparse.ArgumentParser) -> None:
from pathlib import Path
from .executor import spawn
log_path = Path(".").resolve()
proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello")
input("Press enter to kill process: ")
proc.kill_group()
def register_run_parser(parser: argparse.ArgumentParser) -> None:
parser.set_defaults(func=show_run_vm)

View File

@@ -0,0 +1,168 @@
import os
import signal
import sys
import traceback
from pathlib import Path
from typing import Any
import gi
gi.require_version("GdkPixbuf", "2.0")
import multiprocessing as mp
from collections.abc import Callable
OUT_FILE: Path | None = None
IN_FILE: Path | None = None
class MPProcess:
def __init__(
self, *, name: str, proc: mp.Process, out_file: Path, in_file: Path
) -> None:
self.name = name
self.proc = proc
self.out_file = out_file
self.in_file = in_file
# Kill the new process and all its children by sending a SIGTERM signal to the process group
def kill_group(self) -> None:
pid = self.proc.pid
assert pid is not None
os.killpg(pid, signal.SIGTERM)
def _set_proc_name(name: str) -> None:
import ctypes
# Define the prctl function with the appropriate arguments and return type
libc = ctypes.CDLL("libc.so.6")
prctl = libc.prctl
prctl.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_ulong,
ctypes.c_ulong,
ctypes.c_ulong,
]
prctl.restype = ctypes.c_int
# Set the process name to "my_process"
prctl(15, name.encode(), 0, 0, 0)
def _signal_handler(signum: int, frame: Any) -> None:
signame = signal.strsignal(signum)
print("Signal received:", signame)
# Delete files
if OUT_FILE is not None:
OUT_FILE.unlink()
if IN_FILE is not None:
IN_FILE.unlink()
# Restore the default handler
signal.signal(signal.SIGTERM, signal.SIG_DFL)
# Re-raise the signal
os.kill(os.getpid(), signum)
def _init_proc(
func: Callable,
out_file: Path,
in_file: Path,
wait_stdin_connect: bool,
proc_name: str,
**kwargs: Any,
) -> None:
# Set the global variables
global OUT_FILE, IN_FILE
OUT_FILE = out_file
IN_FILE = in_file
# Create a new process group
os.setsid()
# Open stdout and stderr
out_fd = os.open(str(out_file), flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC)
os.dup2(out_fd, sys.stdout.fileno())
os.dup2(out_fd, sys.stderr.fileno())
# Print some information
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Started new process pid={pid} gpid={gpid}")
# Register the signal handler for SIGINT
signal.signal(signal.SIGTERM, _signal_handler)
# Set the process name
_set_proc_name(proc_name)
# Open stdin
flags = None
if wait_stdin_connect:
print(f"Waiting for stdin connection on file {in_file}", file=sys.stderr)
flags = os.O_RDONLY
else:
flags = os.O_RDONLY | os.O_NONBLOCK
in_fd = os.open(str(in_file), flags=flags)
os.dup2(in_fd, sys.stdin.fileno())
# Execute the main function
print(f"Executing function {func.__name__} now", file=sys.stderr)
try:
func(**kwargs)
except Exception:
traceback.print_exc()
pid = os.getpid()
gpid = os.getpgid(pid=pid)
print(f"Killing process group pid={pid} gpid={gpid}")
os.killpg(gpid, signal.SIGTERM)
def spawn(
*, wait_stdin_con: bool, log_path: Path, func: Callable, **kwargs: Any
) -> MPProcess:
# Decouple the process from the parent
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="spawn")
# Set names
proc_name = f"MPExec:{func.__name__}"
out_file = log_path / "out.log"
in_file = log_path / "in.fifo"
# Create stdin fifo
if in_file.exists():
in_file.unlink()
os.mkfifo(in_file)
# Start the process
proc = mp.Process(
target=_init_proc,
args=(func, out_file, in_file, wait_stdin_con, proc_name),
name=proc_name,
kwargs=kwargs,
)
proc.start()
# Print some information
assert proc.pid is not None
print(f"Started process '{proc_name}'")
print(f"Arguments: {kwargs}")
if wait_stdin_con:
cmd = f"cat - > {in_file}"
print(f"Connect to stdin with : {cmd}")
cmd = f"tail -f {out_file}"
print(f"Connect to stdout with: {cmd}")
# Return the process
mp_proc = MPProcess(
name=proc_name,
proc=proc,
out_file=out_file,
in_file=in_file,
)
return mp_proc

View File

@@ -4,9 +4,10 @@ from pathlib import Path
from typing import Any from typing import Any
import gi import gi
from clan_cli import history, vms from clan_cli import history
gi.require_version("GdkPixbuf", "2.0") gi.require_version("GdkPixbuf", "2.0")
from gi.repository import GdkPixbuf from gi.repository import GdkPixbuf
from clan_vm_manager import assets from clan_vm_manager import assets
@@ -46,11 +47,7 @@ class VMBase:
def run(self) -> None: def run(self) -> None:
print(f"Running VM {self.name}") print(f"Running VM {self.name}")
import asyncio # vm = vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM")
# raise Exception("Cannot run VMs yet")
vm = asyncio.run(vms.run.inspect_vm(flake_url=self.url, flake_attr="defaultVM"))
vms.run.run_vm(vm)
@dataclass(frozen=True) @dataclass(frozen=True)