add sudo_askpass_proxy

This commit is contained in:
Jörg Thalheim
2025-05-05 12:20:18 +02:00
parent 6839b9616d
commit cedc5113ea
10 changed files with 345 additions and 85 deletions

View File

@@ -19,6 +19,7 @@ from clan_lib.colors import AnsiColor
from clan_lib.errors import ClanError # Assuming these are available
from clan_lib.nix import nix_shell
from clan_lib.ssh.parse import parse_deployment_address
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
cmdlog = logging.getLogger(__name__)
@@ -39,7 +40,9 @@ class Remote:
verbose_ssh: bool = False
ssh_options: dict[str, str] = field(default_factory=dict)
tor_socks: bool = False
_control_path_dir: Path | None = None
_askpass_path: str | None = None
def __str__(self) -> str:
return self.target
@@ -124,11 +127,61 @@ class Remote:
_askpass_path=self._askpass_path,
)
@contextmanager
def become_root(self) -> Iterator["Remote"]:
"""
Context manager to set up sudo askpass proxy.
This will set up a proxy for sudo password prompts.
"""
if self.user == "root":
yield self
return
if (
os.environ.get("DISPLAY")
or os.environ.get("WAYLAND_DISPLAY")
or sys.platform == "darwin"
):
command = ["zenity", "--password", "--title", "%title%"]
dependencies = ["zenity"]
else:
command = [
"dialog",
"--stdout",
"--insecure",
"--title",
"%title%",
"--passwordbox",
"",
"10",
"50",
]
dependencies = ["dialog"]
proxy = SudoAskpassProxy(self, nix_shell(dependencies, command))
try:
askpass_path = proxy.run()
yield Remote(
address=self.address,
user=self.user,
command_prefix=self.command_prefix,
port=self.port,
private_key=self.private_key,
password=self.password,
forward_agent=self.forward_agent,
host_key_check=self.host_key_check,
verbose_ssh=self.verbose_ssh,
ssh_options=self.ssh_options,
tor_socks=self.tor_socks,
_control_path_dir=self._control_path_dir,
_askpass_path=askpass_path,
)
finally:
proxy.cleanup()
def run(
self,
cmd: list[str],
opts: RunOpts | None = None,
become_root: bool = False,
extra_env: dict[str, str] | None = None,
tty: bool = False,
verbose_ssh: bool = False,
@@ -144,9 +197,14 @@ class Remote:
if opts is None:
opts = RunOpts()
sudo = ""
if become_root and self.user != "root":
sudo = "sudo -- "
sudo = []
if self._askpass_path is not None:
sudo = [
f"SUDO_ASKPASS={shlex.quote(self._askpass_path)}",
"sudo",
"-A",
"--",
]
env_vars = []
for k, v in extra_env.items():
@@ -182,14 +240,20 @@ class Remote:
else:
bash_cmd += 'exec "$@"'
ssh_cmd_list = self.ssh_cmd(
verbose_ssh=verbose_ssh, tty=tty, control_master=control_master
)
ssh_cmd_list.extend(
["--", f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, cmd))}"]
)
ssh_cmd = [
*self.ssh_cmd(
verbose_ssh=verbose_ssh, tty=tty, control_master=control_master
),
"--",
*sudo,
"bash",
"-c",
quote(bash_cmd),
"--",
" ".join(map(quote, cmd)),
]
return run(ssh_cmd_list, opts)
return run(ssh_cmd, opts)
def nix_ssh_env(
self,

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
import logging
import subprocess
import sys
# to get the the current tty state
import termios
import threading
from pathlib import Path
from typing import TYPE_CHECKING
from clan_lib.cmd import terminate_process
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_lib.ssh.remote import Remote
logger = logging.getLogger(__name__)
remote_script = (Path(__file__).parent / "sudo_askpass_proxy.sh").read_text()
class SudoAskpassProxy:
def __init__(self, host: "Remote", prompt_command: list[str]) -> None:
self.host = host
self.password_prompt_command = prompt_command
self.ssh_process: subprocess.Popen | None = None
self.thread: threading.Thread | None = None
def handle_password_request(self, prompt: str) -> str:
"""Get password from local user"""
try:
# Run the password prompt command
password_command = [
arg.replace("%title%", prompt) for arg in self.password_prompt_command
]
old_settings = None
if sys.stdin.isatty():
# If stdin is a tty, we can safely change terminal settings
old_settings = termios.tcgetattr(sys.stdin.fileno())
try:
logger.debug(
f"Running password prompt command: {' '.join(password_command)}"
)
password_process = subprocess.run(
password_command, text=True, check=False, stdout=subprocess.PIPE
)
finally: # dialog messes with the terminal settings, so we need to restore them
if old_settings is not None:
termios.tcsetattr(
sys.stdin.fileno(), termios.TCSADRAIN, old_settings
)
if password_process.returncode != 0:
return "CANCELED"
return password_process.stdout.strip()
except ClanError as e:
msg = f"Error running password prompt command: {e}"
raise ClanError(msg) from e
def _process(self, ssh_process: subprocess.Popen) -> None:
"""Execute the remote command with password proxying"""
# Monitor SSH output for password requests
assert ssh_process.stdout is not None, "SSH process stdout is None"
try:
for line in ssh_process.stdout:
line = line.strip()
if line.startswith("PASSWORD_REQUESTED:"):
prompt = line[len("PASSWORD_REQUESTED:") :].strip()
password = self.handle_password_request(prompt)
print(password, file=ssh_process.stdin)
assert ssh_process.stdin is not None, "SSH process stdin is None"
ssh_process.stdin.flush()
else:
print(line)
except Exception as e:
logger.error(f"Error processing passwords requests output: {e}")
def run(self) -> str:
"""Run the SSH command with password proxying. Returns the askpass script path."""
# Create a shell script to run on the remote host
# Start SSH process
cmd = [*self.host.ssh_cmd(), remote_script]
try:
self.ssh_process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
)
except OSError as e:
msg = f"error connecting to {self.host.target}: {e}"
raise ClanError(msg) from e
# Monitor SSH output for password requests
assert self.ssh_process.stdout is not None, "SSH process stdout is None"
for line in self.ssh_process.stdout:
line = line.strip()
if line.startswith("ASKPASS_SCRIPT:"):
askpass_script = line[len("ASKPASS_SCRIPT:") :].strip()
break
else:
msg = f"Failed to create askpass script on {self.host.target}. Did not receive ASKPASS_SCRIPT line."
raise ClanError(msg)
self.thread = threading.Thread(
target=self._process, name="SudoAskpassProxy", args=(self.ssh_process,)
)
self.thread.start()
return askpass_script
def cleanup(self) -> None:
"""Terminate SSH process if still running"""
if self.ssh_process:
with terminate_process(self.ssh_process):
pass
# Unclear why we have to close this manually, but pytest reports unclosed fd
assert self.ssh_process.stdout is not None
self.ssh_process.stdout.close()
assert self.ssh_process.stdin is not None
self.ssh_process.stdin.close()
self.ssh_process = None
if self.thread:
self.thread.join()

View File

@@ -0,0 +1,37 @@
# shellcheck shell=bash
set -euo pipefail
# Create temporary directory
tmpdir=$(mktemp -d)
trap 'rm -rf "$tmpdir"' EXIT
# Create FIFOs
prompt_fifo="$tmpdir/prompt_fifo"
password_fifo="$tmpdir/password_fifo"
mkfifo -m600 "$prompt_fifo"
mkfifo -m600 "$password_fifo"
# Create askpass script
askpass_script="$tmpdir/askpass.sh"
cat >"$askpass_script" <<EOF
#!/bin/sh
set -eu
prompt="\${1:-[sudo] password:}"
echo "\$prompt" > "$prompt_fifo"
password=\$(head -n 1 "$password_fifo")
if [ "\$password" = "CANCELED" ]; then
exit 1
fi
echo "\$password"
EOF
chmod +x "$askpass_script"
echo "ASKPASS_SCRIPT: $askpass_script"
while read -r PROMPT < "$prompt_fifo"; do
echo "PASSWORD_REQUESTED: $PROMPT"
read -r password
echo ####################################### >&2
echo "$password" >"$password_fifo"
done