Removed unecessary debug.py and test code
This commit is contained in:
@@ -1,90 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import ipdb
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def block_for_input() -> None:
|
||||
log = logging.getLogger(__name__)
|
||||
procid = os.getpid()
|
||||
f"echo 'continue' > /sys/proc/{procid}/fd/{sys.stdin.fileno()}"
|
||||
|
||||
while True:
|
||||
log.warning("Use sudo cntr attach <pid> to attach to the container.")
|
||||
# log.warning("Resume execution by executing '%s' in cntr shell", command)
|
||||
time.sleep(1)
|
||||
log.info("Resuming execution.")
|
||||
|
||||
|
||||
def breakpoint_container(
|
||||
work_dir: Path,
|
||||
env: dict[str, str] | None = None,
|
||||
cmd: list[str] | None = None,
|
||||
) -> None:
|
||||
if env is None:
|
||||
env = os.environ.copy()
|
||||
else:
|
||||
env = env.copy()
|
||||
|
||||
dump_env(env, work_dir / "env.sh")
|
||||
|
||||
if cmd is not None:
|
||||
log.debug("Command: %s", shlex.join(cmd))
|
||||
mycommand = shlex.join(cmd)
|
||||
write_command(mycommand, work_dir / "cmd.sh")
|
||||
|
||||
block_for_input()
|
||||
|
||||
|
||||
def breakpoint_shell(
|
||||
work_dir: Path = Path(os.getcwd()),
|
||||
env: dict[str, str] | None = None,
|
||||
cmd: list[str] | None = None,
|
||||
) -> None:
|
||||
if env is None:
|
||||
env = os.environ.copy()
|
||||
else:
|
||||
env = env.copy()
|
||||
|
||||
# Cmd appending
|
||||
args = ["xterm", "-e", "zsh", "-df"]
|
||||
if cmd is not None:
|
||||
mycommand = shlex.join(cmd)
|
||||
write_command(mycommand, work_dir / "cmd.sh")
|
||||
proc = subprocess.Popen(args, env=env, cwd=work_dir.resolve())
|
||||
|
||||
with proc:
|
||||
try:
|
||||
ipdb.set_trace()
|
||||
finally:
|
||||
proc.terminate()
|
||||
|
||||
|
||||
def write_command(command: str, loc: Path) -> None:
|
||||
log.info("Dumping command to %s", loc)
|
||||
with open(loc, "w") as f:
|
||||
f.write("#!/usr/bin/env bash\n")
|
||||
f.write(command)
|
||||
st = os.stat(loc)
|
||||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||
|
||||
|
||||
def dump_env(env: dict[str, str], loc: Path) -> None:
|
||||
cenv = env.copy()
|
||||
log.info("Dumping environment variables to %s", loc)
|
||||
with open(loc, "w") as f:
|
||||
f.write("#!/usr/bin/env bash\n")
|
||||
for k, v in cenv.items():
|
||||
if v.count("\n") > 0 or v.count('"') > 0 or v.count("'") > 0:
|
||||
continue
|
||||
f.write(f"export {k}='{v}'\n")
|
||||
st = os.stat(loc)
|
||||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||
@@ -3,7 +3,6 @@ import argparse
|
||||
from .app import (
|
||||
register_join_parser,
|
||||
register_overview_parser,
|
||||
register_run_parser,
|
||||
show_overview,
|
||||
)
|
||||
|
||||
@@ -21,8 +20,6 @@ def main() -> None:
|
||||
|
||||
register_overview_parser(subparser.add_parser("overview", help="overview screen"))
|
||||
|
||||
register_run_parser(subparser.add_parser("run", help="run a vm"))
|
||||
|
||||
# Executed when no command is given
|
||||
parser.set_defaults(func=show_overview)
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -14,7 +14,7 @@ from clan_cli.clan_uri import ClanURI
|
||||
from gi.repository import Gio, Gtk
|
||||
|
||||
from .constants import constants
|
||||
from .executor import ProcessManager, spawn
|
||||
from .executor import ProcessManager
|
||||
from .interfaces import Callbacks, InitialFlashValues, InitialJoinValues
|
||||
from .windows.join import JoinWindow
|
||||
from .windows.overview import OverviewWindow
|
||||
@@ -163,33 +163,3 @@ def show_overview(args: argparse.Namespace) -> None:
|
||||
|
||||
def register_overview_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.set_defaults(func=show_overview)
|
||||
|
||||
|
||||
# Define a function that writes to the memfd
|
||||
def dummy_f(msg: str) -> None:
|
||||
import sys
|
||||
import time
|
||||
|
||||
print(f"Receeived message {msg}")
|
||||
c = 0
|
||||
while True: # Simulate a long running process
|
||||
print(f"out: Hello from process c={c}", file=sys.stdout)
|
||||
print(f"err: Hello from process c={c}", file=sys.stderr)
|
||||
user = input("Enter to continue: \n")
|
||||
if user == "q":
|
||||
raise Exception("User quit")
|
||||
print(f"User entered {user}", file=sys.stdout)
|
||||
print(f"User entered {user}", file=sys.stderr)
|
||||
time.sleep(1) # Wait for 1 second
|
||||
c += 1
|
||||
|
||||
|
||||
def show_run_vm(parser: argparse.ArgumentParser) -> None:
|
||||
log_path = Path(".").resolve()
|
||||
proc = spawn(wait_stdin_con=True, log_path=log_path, func=dummy_f, msg="Hello")
|
||||
input("Press enter to kill process: ")
|
||||
proc.kill_group()
|
||||
|
||||
|
||||
def register_run_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.set_defaults(func=show_run_vm)
|
||||
|
||||
Reference in New Issue
Block a user