From 616ddca734b43e1c1e7f81230618ab0cd3673d33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Wed, 13 Nov 2024 12:25:20 +0100 Subject: [PATCH] cmd: also process stdin --- pkgs/clan-cli/clan_cli/cmd.py | 40 ++++++++++++++++++++++++------- pkgs/clan-cli/clan_cli/vms/run.py | 4 ++-- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/pkgs/clan-cli/clan_cli/cmd.py b/pkgs/clan-cli/clan_cli/cmd.py index a50e217ec..064e754d8 100644 --- a/pkgs/clan-cli/clan_cli/cmd.py +++ b/pkgs/clan-cli/clan_cli/cmd.py @@ -29,14 +29,17 @@ class Log(Enum): NONE = 4 -def handle_output(process: subprocess.Popen, log: Log) -> tuple[str, str]: +def handle_io( + process: subprocess.Popen, input_bytes: bytes | None, log: Log +) -> tuple[str, str]: rlist = [process.stdout, process.stderr] + wlist = [process.stdin] if input_bytes is not None else [] stdout_buf = b"" stderr_buf = b"" - while len(rlist) != 0: - readlist, _, _ = select.select(rlist, [], [], 0.1) - if len(readlist) == 0: # timeout in select + while len(rlist) != 0 or len(wlist) != 0: + readlist, writelist, _ = select.select(rlist, wlist, [], 0.1) + if len(readlist) == 0 and len(writelist) == 0: if process.poll() is None: continue # Process has exited @@ -62,12 +65,31 @@ def handle_output(process: subprocess.Popen, log: Log) -> tuple[str, str]: sys.stderr.buffer.write(ret) sys.stderr.flush() stderr_buf += ret + + if process.stdin in writelist: + if input_bytes: + try: + assert process.stdin is not None + written = os.write(process.stdin.fileno(), input_bytes) + except BrokenPipeError: + wlist.remove(process.stdin) + else: + input_bytes = input_bytes[written:] + if len(input_bytes) == 0: + wlist.remove(process.stdin) + process.stdin.flush() + process.stdin.close() + else: + wlist.remove(process.stdin) return stdout_buf.decode("utf-8", "replace"), stderr_buf.decode("utf-8", "replace") @contextmanager def terminate_process_group(process: subprocess.Popen) -> Iterator[None]: - process_group = os.getpgid(process.pid) + try: + process_group = os.getpgid(process.pid) + except ProcessLookupError: + return if process_group == os.getpgid(os.getpid()): msg = "Bug! Refusing to terminate the current process group" raise ClanError(msg) @@ -183,11 +205,13 @@ def run( start = timeit.default_timer() with ExitStack() as stack: + stdin = subprocess.PIPE if input is not None else None process = stack.enter_context( subprocess.Popen( cmd, cwd=str(cwd), env=env, + stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=not needs_user_terminal, @@ -200,10 +224,8 @@ def run( else: stack.enter_context(terminate_process_group(process)) - stdout_buf, stderr_buf = handle_output(process, log) - - if input: - process.communicate(input) + stdout_buf, stderr_buf = handle_io(process, input, log) + process.wait() global TIME_TABLE if TIME_TABLE: diff --git a/pkgs/clan-cli/clan_cli/vms/run.py b/pkgs/clan-cli/clan_cli/vms/run.py index f43613349..1af362be5 100644 --- a/pkgs/clan-cli/clan_cli/vms/run.py +++ b/pkgs/clan-cli/clan_cli/vms/run.py @@ -12,7 +12,7 @@ from dataclasses import dataclass from pathlib import Path from tempfile import TemporaryDirectory -from clan_cli.cmd import CmdOut, Log, handle_output, run +from clan_cli.cmd import CmdOut, Log, handle_io, run from clan_cli.completions import add_dynamic_completer, complete_machines from clan_cli.dirs import module_root, user_cache_dir, vm_state_dir from clan_cli.errors import ClanCmdError, ClanError @@ -339,7 +339,7 @@ def run_vm( ) as vm, ThreadPoolExecutor(max_workers=1) as executor, ): - future = executor.submit(handle_output, vm.process, Log.BOTH) + future = executor.submit(handle_io, vm.process, input_bytes=None, log=Log.BOTH) args: list[str] = vm.process.args # type: ignore[assignment] if runtime_config.command is not None: