Fix the ssh logging level. Currently the ssh commands is printed every time on an ssh connection. While seeing the command is useful, we should print this when running clan with the `--debug` flag.
547 lines
18 KiB
Python
547 lines
18 KiB
Python
import ipaddress
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from collections.abc import Iterator
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from shlex import quote
|
|
from tempfile import TemporaryDirectory
|
|
|
|
from clan_lib.api import API
|
|
from clan_lib.cmd import ClanCmdError, ClanCmdTimeoutError, CmdOut, RunOpts, run
|
|
from clan_lib.colors import AnsiColor
|
|
from clan_lib.errors import ClanError, indent_command # Assuming these are available
|
|
from clan_lib.nix import nix_shell
|
|
from clan_lib.ssh.host_key import HostKeyCheck, hostkey_to_ssh_opts
|
|
from clan_lib.ssh.parse import parse_ssh_uri
|
|
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
|
|
|
cmdlog = logging.getLogger(__name__)
|
|
|
|
# Seconds until a message is printed when _run produces no output.
|
|
NO_OUTPUT_TIMEOUT = 20
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Remote:
|
|
address: str
|
|
command_prefix: str
|
|
user: str = "root"
|
|
port: int | None = None
|
|
private_key: Path | None = None
|
|
password: str | None = None
|
|
forward_agent: bool = True
|
|
host_key_check: HostKeyCheck = "ask"
|
|
verbose_ssh: bool = False
|
|
ssh_options: dict[str, str] = field(default_factory=dict)
|
|
tor_socks: bool = False
|
|
|
|
_control_path_dir: Path | None = None
|
|
_askpass_path: str | None = None
|
|
|
|
def __str__(self) -> str:
|
|
return self.target
|
|
|
|
def is_ipv6(self) -> bool:
|
|
try:
|
|
return isinstance(ipaddress.ip_address(self.address), ipaddress.IPv6Address)
|
|
except ValueError:
|
|
return False
|
|
|
|
def override(
|
|
self,
|
|
*,
|
|
host_key_check: HostKeyCheck | None = None,
|
|
private_key: Path | None = None,
|
|
password: str | None = None,
|
|
tor_socks: bool | None = None,
|
|
command_prefix: str | None = None,
|
|
port: int | None = None,
|
|
ssh_options: dict[str, str] | None = None,
|
|
) -> "Remote":
|
|
"""
|
|
Returns a new Remote instance with the same data but with a different host_key_check.
|
|
"""
|
|
return Remote(
|
|
address=self.address,
|
|
user=self.user,
|
|
command_prefix=command_prefix or self.command_prefix,
|
|
port=port or self.port,
|
|
private_key=private_key if private_key is not None else self.private_key,
|
|
password=password if password is not None else self.password,
|
|
forward_agent=self.forward_agent,
|
|
host_key_check=(
|
|
host_key_check if host_key_check is not None else self.host_key_check
|
|
),
|
|
verbose_ssh=self.verbose_ssh,
|
|
ssh_options=ssh_options or self.ssh_options,
|
|
tor_socks=tor_socks if tor_socks is not None else self.tor_socks,
|
|
_control_path_dir=self._control_path_dir,
|
|
_askpass_path=self._askpass_path,
|
|
)
|
|
|
|
@property
|
|
def target(self) -> str:
|
|
return f"{self.user}@{self.address}"
|
|
|
|
@classmethod
|
|
def from_ssh_uri(
|
|
cls,
|
|
*,
|
|
machine_name: str,
|
|
address: str,
|
|
) -> "Remote":
|
|
"""
|
|
Parse a deployment address and return a Host object.
|
|
"""
|
|
|
|
return parse_ssh_uri(machine_name=machine_name, address=address)
|
|
|
|
def run_local(
|
|
self,
|
|
cmd: list[str],
|
|
opts: RunOpts | None = None,
|
|
extra_env: dict[str, str] | None = None,
|
|
) -> CmdOut:
|
|
"""
|
|
Command to run locally for the host
|
|
"""
|
|
if opts is None:
|
|
opts = RunOpts()
|
|
env = opts.env or os.environ.copy()
|
|
if extra_env:
|
|
env.update(extra_env)
|
|
displayed_cmd = " ".join(cmd)
|
|
cmdlog.info(
|
|
f"$ {displayed_cmd}",
|
|
extra={
|
|
"command_prefix": self.command_prefix,
|
|
"color": AnsiColor.GREEN.value,
|
|
},
|
|
)
|
|
opts.env = env
|
|
opts.prefix = self.command_prefix
|
|
return run(cmd, opts)
|
|
|
|
@contextmanager
|
|
def ssh_control_master(self) -> Iterator["Remote"]:
|
|
"""
|
|
Context manager to manage SSH ControlMaster connections.
|
|
This will create a temporary directory for the control socket.
|
|
"""
|
|
directory = None
|
|
if sys.platform == "darwin" and os.environ.get("TMPDIR", "").startswith(
|
|
"/var/folders/"
|
|
):
|
|
directory = "/tmp/"
|
|
with TemporaryDirectory(prefix="clan-ssh", dir=directory) as temp_dir:
|
|
remote = Remote(
|
|
address=self.address,
|
|
user=self.user,
|
|
command_prefix=self.command_prefix,
|
|
port=self.port,
|
|
private_key=self.private_key,
|
|
password=self.password,
|
|
forward_agent=self.forward_agent,
|
|
host_key_check=self.host_key_check,
|
|
verbose_ssh=self.verbose_ssh,
|
|
ssh_options=self.ssh_options,
|
|
tor_socks=self.tor_socks,
|
|
_control_path_dir=Path(temp_dir),
|
|
_askpass_path=self._askpass_path,
|
|
)
|
|
try:
|
|
yield remote
|
|
finally:
|
|
# Terminate the SSH master connection
|
|
socket_path = Path(temp_dir) / "socket"
|
|
if socket_path.exists():
|
|
try:
|
|
exit_cmd = [
|
|
"ssh",
|
|
"-o",
|
|
f"ControlPath={socket_path}",
|
|
"-O",
|
|
"exit",
|
|
]
|
|
exit_cmd.append(remote.target)
|
|
subprocess.run(exit_cmd, capture_output=True, timeout=5)
|
|
except (subprocess.TimeoutExpired, subprocess.CalledProcessError):
|
|
# If exit fails still try to stop the master connection
|
|
pass
|
|
|
|
@contextmanager
|
|
def become_root(self) -> Iterator["Remote"]:
|
|
"""
|
|
Context manager to set up sudo askpass proxy.
|
|
This will set up a proxy for sudo password prompts.
|
|
"""
|
|
if self.user == "root":
|
|
yield self
|
|
return
|
|
|
|
if (
|
|
os.environ.get("DISPLAY")
|
|
or os.environ.get("WAYLAND_DISPLAY")
|
|
or sys.platform == "darwin"
|
|
):
|
|
command = ["zenity", "--password", "--title", "%title%"]
|
|
dependencies = ["zenity"]
|
|
else:
|
|
command = [
|
|
"dialog",
|
|
"--stdout",
|
|
"--insecure",
|
|
"--title",
|
|
"%title%",
|
|
"--passwordbox",
|
|
"",
|
|
"10",
|
|
"50",
|
|
]
|
|
dependencies = ["dialog"]
|
|
proxy = SudoAskpassProxy(self, nix_shell(dependencies, command))
|
|
try:
|
|
askpass_path = proxy.run()
|
|
yield Remote(
|
|
address=self.address,
|
|
user=self.user,
|
|
command_prefix=self.command_prefix,
|
|
port=self.port,
|
|
private_key=self.private_key,
|
|
password=self.password,
|
|
forward_agent=self.forward_agent,
|
|
host_key_check=self.host_key_check,
|
|
verbose_ssh=self.verbose_ssh,
|
|
ssh_options=self.ssh_options,
|
|
tor_socks=self.tor_socks,
|
|
_control_path_dir=self._control_path_dir,
|
|
_askpass_path=askpass_path,
|
|
)
|
|
finally:
|
|
proxy.cleanup()
|
|
|
|
def run(
|
|
self,
|
|
cmd: list[str],
|
|
opts: RunOpts | None = None,
|
|
extra_env: dict[str, str] | None = None,
|
|
tty: bool = False,
|
|
verbose_ssh: bool = False,
|
|
quiet: bool = False,
|
|
control_master: bool = True,
|
|
) -> CmdOut:
|
|
"""
|
|
Internal method to run a command on the host via ssh.
|
|
`control_path_dir`: If provided, SSH ControlMaster options will be used.
|
|
"""
|
|
if extra_env is None:
|
|
extra_env = {}
|
|
if opts is None:
|
|
opts = RunOpts()
|
|
|
|
sudo = []
|
|
if self._askpass_path is not None:
|
|
sudo = [
|
|
f"SUDO_ASKPASS={shlex.quote(self._askpass_path)}",
|
|
"sudo",
|
|
"-A",
|
|
"--",
|
|
]
|
|
|
|
env_vars = []
|
|
for k, v in extra_env.items():
|
|
env_vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
|
|
|
if opts.prefix is None:
|
|
opts.prefix = self.command_prefix
|
|
opts.needs_user_terminal = True
|
|
if opts.cwd is not None:
|
|
msg = "cwd is not supported for remote commands"
|
|
raise ClanError(msg)
|
|
|
|
displayed_cmd = ""
|
|
export_cmd = ""
|
|
if env_vars:
|
|
export_cmd = f"export {' '.join(env_vars)}; "
|
|
displayed_cmd += export_cmd
|
|
displayed_cmd += " ".join(cmd)
|
|
|
|
if not quiet:
|
|
cmdlog.info(
|
|
f"$ {displayed_cmd}",
|
|
extra={
|
|
"command_prefix": self.command_prefix,
|
|
"color": AnsiColor.GREEN.value,
|
|
},
|
|
)
|
|
|
|
bash_cmd = export_cmd
|
|
if opts.shell:
|
|
bash_cmd += " ".join(cmd)
|
|
opts.shell = False
|
|
else:
|
|
bash_cmd += 'exec "$@"'
|
|
|
|
ssh_cmd = [
|
|
*self.ssh_cmd(
|
|
verbose_ssh=verbose_ssh, tty=tty, control_master=control_master
|
|
),
|
|
"--",
|
|
*sudo,
|
|
"bash",
|
|
"-c",
|
|
quote(bash_cmd),
|
|
"--",
|
|
" ".join(map(quote, cmd)),
|
|
]
|
|
|
|
return run(ssh_cmd, opts)
|
|
|
|
def nix_ssh_env(
|
|
self,
|
|
env: dict[str, str] | None = None,
|
|
control_master: bool = True,
|
|
) -> dict[str, str]:
|
|
if env is None:
|
|
env = {}
|
|
env["NIX_SSHOPTS"] = " ".join(
|
|
self.ssh_cmd_opts(control_master=control_master) # Renamed
|
|
)
|
|
return env
|
|
|
|
def ssh_cmd_opts(
|
|
self,
|
|
control_master: bool = True,
|
|
) -> list[str]:
|
|
ssh_opts = ["-A"] if self.forward_agent else []
|
|
if self.port:
|
|
ssh_opts.extend(["-p", str(self.port)])
|
|
for k, v in self.ssh_options.items():
|
|
ssh_opts.extend(["-o", f"{k}={shlex.quote(v)}"])
|
|
ssh_opts.extend(hostkey_to_ssh_opts(self.host_key_check))
|
|
if self.private_key:
|
|
ssh_opts.extend(["-i", str(self.private_key)])
|
|
|
|
if control_master:
|
|
if self._control_path_dir is None:
|
|
msg = "Bug! Control path directory is not set. Please use Remote.ssh_control_master() or set control_master to False."
|
|
raise ClanError(msg)
|
|
socket_path = self._control_path_dir / "socket"
|
|
ssh_opts.extend(
|
|
[
|
|
"-o",
|
|
"ControlMaster=auto",
|
|
"-o",
|
|
"ControlPersist=1m",
|
|
"-o",
|
|
f"ControlPath={socket_path}",
|
|
]
|
|
)
|
|
return ssh_opts
|
|
|
|
def ssh_url(self) -> str:
|
|
"""
|
|
Generates a standard SSH URL (ssh://[user@]host[:port]).
|
|
"""
|
|
url = "ssh://"
|
|
if self.user:
|
|
url += f"{self.user}@"
|
|
url += self.address
|
|
if self.port:
|
|
url += f":{self.port}"
|
|
return url
|
|
|
|
def ssh_cmd(
|
|
self, verbose_ssh: bool = False, tty: bool = False, control_master: bool = True
|
|
) -> list[str]:
|
|
packages = []
|
|
password_args = []
|
|
if self.password:
|
|
packages.append("sshpass")
|
|
password_args = ["sshpass", "-p", self.password]
|
|
|
|
current_ssh_opts = self.ssh_cmd_opts(control_master=control_master)
|
|
if verbose_ssh or self.verbose_ssh:
|
|
current_ssh_opts.extend(["-v"])
|
|
if tty:
|
|
current_ssh_opts.extend(["-t"])
|
|
|
|
if self.tor_socks:
|
|
packages.append("netcat")
|
|
current_ssh_opts.extend(
|
|
["-o", "ProxyCommand=nc -x 127.0.0.1:9050 -X 5 %h %p"]
|
|
)
|
|
|
|
cmd = [
|
|
*password_args,
|
|
"ssh",
|
|
self.target,
|
|
*current_ssh_opts,
|
|
]
|
|
return nix_shell(packages, cmd)
|
|
|
|
def check_sshpass_errorcode(self, res: subprocess.CompletedProcess) -> None:
|
|
"""
|
|
Check the return code of the sshpass command and raise an error if it indicates a failure.
|
|
Error codes are based on man sshpass(1) and may vary by version.
|
|
"""
|
|
if res.returncode == 0:
|
|
return
|
|
|
|
match res.returncode:
|
|
case 1:
|
|
msg = "Invalid command line argument"
|
|
raise ClanError(msg)
|
|
case 2:
|
|
msg = "Conflicting arguments given"
|
|
raise ClanError(msg)
|
|
case 3:
|
|
msg = "General runtime error"
|
|
raise ClanError(msg)
|
|
case 4:
|
|
msg = "Unrecognized response from ssh (parse error)"
|
|
raise ClanError(msg)
|
|
case 5:
|
|
msg = "Invalid/incorrect password"
|
|
raise ClanError(msg)
|
|
case 6:
|
|
msg = "Host public key is unknown. sshpass exits without confirming the new key. Try using --host-key-heck none"
|
|
raise ClanError(msg)
|
|
case 7:
|
|
msg = "IP public key changed. sshpass exits without confirming the new key."
|
|
raise ClanError(msg)
|
|
case _:
|
|
msg = f"SSH command failed with return code {res.returncode}"
|
|
raise ClanError(msg)
|
|
|
|
def interactive_ssh(self, command: list[str] | None = None) -> None:
|
|
ssh_cmd = self.ssh_cmd(tty=True, control_master=False)
|
|
if command:
|
|
ssh_cmd = [
|
|
*self.ssh_cmd(tty=True, control_master=False),
|
|
"--",
|
|
"bash",
|
|
"-c",
|
|
quote('exec "$@"'),
|
|
"--",
|
|
" ".join(map(quote, command)),
|
|
]
|
|
cmdlog.debug(
|
|
f"{indent_command(ssh_cmd)}",
|
|
extra={
|
|
"command_prefix": self.command_prefix,
|
|
"color": AnsiColor.GREEN.value,
|
|
},
|
|
)
|
|
res = subprocess.run(ssh_cmd, check=False)
|
|
|
|
# We only check the error code if a password is set, as sshpass is used.
|
|
# AS sshpass swallows all output.
|
|
if self.password:
|
|
self.check_sshpass_errorcode(res)
|
|
|
|
def check_machine_ssh_reachable(self) -> bool:
|
|
return check_machine_ssh_reachable(self).ok
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ConnectionOptions:
|
|
timeout: int = 2
|
|
retries: int = 5
|
|
|
|
|
|
@dataclass
|
|
class CheckResult:
|
|
ok: bool
|
|
reason: str | None = None
|
|
|
|
|
|
@API.register
|
|
def check_machine_ssh_login(
|
|
remote: Remote, opts: ConnectionOptions | None = None
|
|
) -> CheckResult:
|
|
"""Checks if a remote machine is reachable via SSH by attempting to run a simple command.
|
|
Args:
|
|
remote (Remote): The remote host to check for SSH login.
|
|
opts (ConnectionOptions, optional): Connection options such as timeout and number of retries.
|
|
If not provided, default values are used.
|
|
Returns:
|
|
CheckResult: An object indicating whether the SSH login is successful (`ok=True`) or not (`ok=False`),
|
|
and a reason if the check failed.
|
|
Usage:
|
|
result = check_machine_ssh_login(remote)
|
|
if result.ok:
|
|
print("SSH login successful")
|
|
else:
|
|
print(f"SSH login failed: {result.reason}")
|
|
"""
|
|
if opts is None:
|
|
opts = ConnectionOptions()
|
|
|
|
for _ in range(opts.retries):
|
|
with remote.ssh_control_master() as ssh:
|
|
try:
|
|
res = ssh.run(
|
|
["true"],
|
|
RunOpts(timeout=opts.timeout, needs_user_terminal=True),
|
|
)
|
|
return CheckResult(True)
|
|
except ClanCmdTimeoutError:
|
|
pass
|
|
except ClanCmdError as e:
|
|
if "Host key verification failed." in e.cmd.stderr:
|
|
raise ClanError(res.stderr.strip()) from e
|
|
else:
|
|
time.sleep(opts.timeout)
|
|
|
|
return CheckResult(False, f"failed after {opts.retries} attempts")
|
|
|
|
|
|
@API.register
|
|
def check_machine_ssh_reachable(
|
|
remote: Remote, opts: ConnectionOptions | None = None
|
|
) -> CheckResult:
|
|
"""
|
|
Checks if a remote machine is reachable via SSH by attempting to open a TCP connection
|
|
to the specified address and port.
|
|
Args:
|
|
remote (Remote): The remote host to check for SSH reachability.
|
|
opts (ConnectionOptions, optional): Connection options such as timeout and number of retries.
|
|
If not provided, default values are used.
|
|
Returns:
|
|
CheckResult: An object indicating whether the SSH port is reachable (`ok=True`) or not (`ok=False`),
|
|
and a reason if the check failed.
|
|
Usage:
|
|
result = check_machine_ssh_reachable(remote)
|
|
if result.ok:
|
|
print("SSH port is reachable")
|
|
print(f"SSH port is not reachable: {result.reason}")
|
|
"""
|
|
if opts is None:
|
|
opts = ConnectionOptions()
|
|
|
|
cmdlog.debug(
|
|
f"Checking SSH reachability for {remote.target} on port {remote.port or 22}",
|
|
)
|
|
|
|
address_family = socket.AF_INET6 if ":" in remote.address else socket.AF_INET
|
|
for _ in range(opts.retries):
|
|
with socket.socket(address_family, socket.SOCK_STREAM) as sock:
|
|
sock.settimeout(opts.timeout)
|
|
try:
|
|
sock.connect((remote.address, remote.port or 22))
|
|
return CheckResult(True)
|
|
except (TimeoutError, OSError):
|
|
pass
|
|
else:
|
|
time.sleep(opts.timeout)
|
|
|
|
return CheckResult(False, f"failed after {opts.retries} attempts")
|