add ssh askpass implementation
This commit is contained in:
51
pkgs/clan-cli/clan_lib/ssh/password_prompt.py
Normal file
51
pkgs/clan-cli/clan_lib/ssh/password_prompt.py
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
"""
|
||||||
|
Password prompt utilities for SSH and sudo operations.
|
||||||
|
|
||||||
|
This module provides functions to create password prompts using either
|
||||||
|
GUI (zenity) or terminal (dialog) interfaces based on the environment.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from clan_lib.nix import nix_shell
|
||||||
|
|
||||||
|
|
||||||
|
def get_password_command(title: str = "SSH Password", message: str = "") -> list[str]:
|
||||||
|
"""
|
||||||
|
Determine the appropriate password prompt command based on environment.
|
||||||
|
|
||||||
|
This function checks if a GUI environment is available and selects either zenity (for GUI)
|
||||||
|
or dialog (for terminal) to create a password prompt. It then returns a command that will
|
||||||
|
execute the selected tool within a Nix shell with the necessary dependencies.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
title: Title for the password dialog
|
||||||
|
message: Optional message for the dialog (only used by dialog, not zenity)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A list of strings representing the shell command to execute the password prompt
|
||||||
|
"""
|
||||||
|
if (
|
||||||
|
os.environ.get("DISPLAY")
|
||||||
|
or os.environ.get("WAYLAND_DISPLAY")
|
||||||
|
or sys.platform == "darwin"
|
||||||
|
):
|
||||||
|
# GUI environment - use zenity
|
||||||
|
cmd = ["zenity", "--password", "--title", title]
|
||||||
|
dependencies = ["zenity"]
|
||||||
|
else:
|
||||||
|
# Terminal environment - use dialog
|
||||||
|
cmd = [
|
||||||
|
"dialog",
|
||||||
|
"--stdout",
|
||||||
|
"--insecure",
|
||||||
|
"--title",
|
||||||
|
title,
|
||||||
|
"--passwordbox",
|
||||||
|
message or "Enter password:",
|
||||||
|
"10",
|
||||||
|
"50",
|
||||||
|
]
|
||||||
|
dependencies = ["dialog"]
|
||||||
|
return nix_shell(dependencies, cmd)
|
||||||
@@ -20,6 +20,7 @@ from clan_lib.errors import ClanError # Assuming these are available
|
|||||||
from clan_lib.nix import nix_shell
|
from clan_lib.nix import nix_shell
|
||||||
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
|
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
|
||||||
from clan_lib.ssh.parse import parse_ssh_uri
|
from clan_lib.ssh.parse import parse_ssh_uri
|
||||||
|
from clan_lib.ssh.password_prompt import get_password_command
|
||||||
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
||||||
|
|
||||||
cmdlog = logging.getLogger(__name__)
|
cmdlog = logging.getLogger(__name__)
|
||||||
@@ -27,6 +28,9 @@ cmdlog = logging.getLogger(__name__)
|
|||||||
# Seconds until a message is printed when _run produces no output.
|
# Seconds until a message is printed when _run produces no output.
|
||||||
NO_OUTPUT_TIMEOUT = 20
|
NO_OUTPUT_TIMEOUT = 20
|
||||||
|
|
||||||
|
# Path to the SSH_ASKPASS script (direct path to the script)
|
||||||
|
SSH_ASKPASS_PATH = str(Path(__file__).parent / "ssh_askpass")
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True)
|
@dataclass(frozen=True)
|
||||||
class Remote:
|
class Remote:
|
||||||
@@ -183,27 +187,8 @@ class Remote:
|
|||||||
yield self
|
yield self
|
||||||
return
|
return
|
||||||
|
|
||||||
if (
|
cmd = get_password_command(title="%title%", message="")
|
||||||
os.environ.get("DISPLAY")
|
proxy = SudoAskpassProxy(self, cmd)
|
||||||
or os.environ.get("WAYLAND_DISPLAY")
|
|
||||||
or sys.platform == "darwin"
|
|
||||||
):
|
|
||||||
command = ["zenity", "--password", "--title", "%title%"]
|
|
||||||
dependencies = ["zenity"]
|
|
||||||
else:
|
|
||||||
command = [
|
|
||||||
"dialog",
|
|
||||||
"--stdout",
|
|
||||||
"--insecure",
|
|
||||||
"--title",
|
|
||||||
"%title%",
|
|
||||||
"--passwordbox",
|
|
||||||
"",
|
|
||||||
"10",
|
|
||||||
"50",
|
|
||||||
]
|
|
||||||
dependencies = ["dialog"]
|
|
||||||
proxy = SudoAskpassProxy(self, nix_shell(dependencies, command))
|
|
||||||
try:
|
try:
|
||||||
askpass_path = proxy.run()
|
askpass_path = proxy.run()
|
||||||
yield Remote(
|
yield Remote(
|
||||||
@@ -243,6 +228,12 @@ class Remote:
|
|||||||
if opts is None:
|
if opts is None:
|
||||||
opts = RunOpts()
|
opts = RunOpts()
|
||||||
|
|
||||||
|
if not self.password:
|
||||||
|
# Set up environment for SSH_ASKPASS if we don't have a password
|
||||||
|
opts.env = opts.env or os.environ.copy()
|
||||||
|
opts.env["SSH_ASKPASS"] = SSH_ASKPASS_PATH
|
||||||
|
opts.env["SSH_ASKPASS_REQUIRE"] = "force"
|
||||||
|
|
||||||
sudo = []
|
sudo = []
|
||||||
if self._askpass_path is not None:
|
if self._askpass_path is not None:
|
||||||
sudo = [
|
sudo = [
|
||||||
@@ -306,11 +297,14 @@ class Remote:
|
|||||||
env: dict[str, str] | None = None,
|
env: dict[str, str] | None = None,
|
||||||
control_master: bool = True,
|
control_master: bool = True,
|
||||||
) -> dict[str, str]:
|
) -> dict[str, str]:
|
||||||
|
"""Return environment variables for nix commands that use SSH."""
|
||||||
if env is None:
|
if env is None:
|
||||||
env = {}
|
env = {}
|
||||||
env["NIX_SSHOPTS"] = " ".join(
|
env["NIX_SSHOPTS"] = " ".join(self.ssh_cmd_opts(control_master=control_master))
|
||||||
self.ssh_cmd_opts(control_master=control_master) # Renamed
|
# Set SSH_ASKPASS environment variable if not using password directly
|
||||||
)
|
if not self.password:
|
||||||
|
env["SSH_ASKPASS"] = SSH_ASKPASS_PATH
|
||||||
|
env["SSH_ASKPASS_REQUIRE"] = "force"
|
||||||
return env
|
return env
|
||||||
|
|
||||||
def ssh_cmd_opts(
|
def ssh_cmd_opts(
|
||||||
@@ -360,6 +354,8 @@ class Remote:
|
|||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
packages = []
|
packages = []
|
||||||
password_args = []
|
password_args = []
|
||||||
|
|
||||||
|
# If we have a password, use sshpass for direct entry
|
||||||
if self.password:
|
if self.password:
|
||||||
packages.append("sshpass")
|
packages.append("sshpass")
|
||||||
password_args = ["sshpass", "-p", self.password]
|
password_args = ["sshpass", "-p", self.password]
|
||||||
@@ -419,8 +415,16 @@ class Remote:
|
|||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
def interactive_ssh(self) -> None:
|
def interactive_ssh(self) -> None:
|
||||||
|
"""Run an interactive SSH session to the remote host."""
|
||||||
cmd_list = self.ssh_cmd(tty=True, control_master=False)
|
cmd_list = self.ssh_cmd(tty=True, control_master=False)
|
||||||
res = subprocess.run(cmd_list, check=False)
|
|
||||||
|
# Set SSH_ASKPASS environment variable if no password is provided
|
||||||
|
env = os.environ.copy()
|
||||||
|
if not self.password:
|
||||||
|
env["SSH_ASKPASS"] = SSH_ASKPASS_PATH
|
||||||
|
env["SSH_ASKPASS_REQUIRE"] = "force"
|
||||||
|
|
||||||
|
res = subprocess.run(cmd_list, check=False, env=env)
|
||||||
|
|
||||||
self.check_sshpass_errorcode(res)
|
self.check_sshpass_errorcode(res)
|
||||||
|
|
||||||
|
|||||||
61
pkgs/clan-cli/clan_lib/ssh/ssh_askpass
Executable file
61
pkgs/clan-cli/clan_lib/ssh/ssh_askpass
Executable file
@@ -0,0 +1,61 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Clan SSH Askpass - Password prompt utility for SSH
|
||||||
|
|
||||||
|
This script can be used as SSH_ASKPASS for SSH password prompts when a terminal
|
||||||
|
is not available. It automatically selects the appropriate GUI or terminal-based
|
||||||
|
password prompt based on the environment.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
1. As SSH_ASKPASS environment variable:
|
||||||
|
$ export SSH_ASKPASS=/path/to/this/script
|
||||||
|
$ ssh-add
|
||||||
|
|
||||||
|
2. Directly (only for testing):
|
||||||
|
$ ./ssh_askpass
|
||||||
|
|
||||||
|
The script will prompt for a password using:
|
||||||
|
- Zenity for GUI environments (DISPLAY/WAYLAND_DISPLAY set or on macOS)
|
||||||
|
- Dialog for terminal environments (no display detected)
|
||||||
|
|
||||||
|
Dependencies are automatically installed via nix-shell as needed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Add parent directory to path when script is executed directly
|
||||||
|
script_dir = Path(__file__).parent
|
||||||
|
clan_cli_dir = script_dir.parent.parent # pkgs/clan-cli
|
||||||
|
if str(clan_cli_dir) not in sys.path:
|
||||||
|
sys.path.insert(0, str(clan_cli_dir))
|
||||||
|
|
||||||
|
# Import the actual implementation
|
||||||
|
from clan_lib.ssh.password_prompt import get_password_command
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
"""Execute password prompt and print password to stdout."""
|
||||||
|
try:
|
||||||
|
# Get the appropriate command for the environment
|
||||||
|
cmd = get_password_command()
|
||||||
|
|
||||||
|
# Run the command and capture the password
|
||||||
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, text=True, check=True)
|
||||||
|
|
||||||
|
# Print the password to stdout (will be read by SSH)
|
||||||
|
print(result.stdout.strip())
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
# User canceled the password dialog
|
||||||
|
return 1
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
@@ -7,7 +7,7 @@ name = "clan"
|
|||||||
description = "clan cli tool"
|
description = "clan cli tool"
|
||||||
dynamic = ["version"]
|
dynamic = ["version"]
|
||||||
scripts = { clan = "clan_cli:main" }
|
scripts = { clan = "clan_cli:main" }
|
||||||
license = { text = "MIT" }
|
license = "MIT"
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
Homepage = "https://clan.lol/"
|
Homepage = "https://clan.lol/"
|
||||||
@@ -29,6 +29,7 @@ clan_lib = [
|
|||||||
"clan_core_templates/**/*",
|
"clan_core_templates/**/*",
|
||||||
"**/allowed-packages.json",
|
"**/allowed-packages.json",
|
||||||
"ssh/*.sh",
|
"ssh/*.sh",
|
||||||
|
"ssh/ssh_askpass",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
|
|||||||
Reference in New Issue
Block a user