clan-cli: Use Remote class in DeployInfo, add tests for qrcode parser and json parser
This commit is contained in:
@@ -21,7 +21,6 @@ from clan_cli.completions import (
|
|||||||
from clan_cli.facts.generate import generate_facts
|
from clan_cli.facts.generate import generate_facts
|
||||||
from clan_cli.machines.hardware import HardwareConfig
|
from clan_cli.machines.hardware import HardwareConfig
|
||||||
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
|
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
|
||||||
from clan_cli.ssh.host_key import HostKeyCheck
|
|
||||||
from clan_cli.vars.generate import generate_vars
|
from clan_cli.vars.generate import generate_vars
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -156,7 +155,6 @@ def install_machine(opts: InstallOptions) -> None:
|
|||||||
|
|
||||||
|
|
||||||
def install_command(args: argparse.Namespace) -> None:
|
def install_command(args: argparse.Namespace) -> None:
|
||||||
host_key_check = HostKeyCheck.from_str(args.host_key_check)
|
|
||||||
try:
|
try:
|
||||||
# Only if the caller did not specify a target_host via args.target_host
|
# Only if the caller did not specify a target_host via args.target_host
|
||||||
# Find a suitable target_host that is reachable
|
# Find a suitable target_host that is reachable
|
||||||
@@ -165,17 +163,17 @@ def install_command(args: argparse.Namespace) -> None:
|
|||||||
|
|
||||||
use_tor = False
|
use_tor = False
|
||||||
if deploy_info and not args.target_host:
|
if deploy_info and not args.target_host:
|
||||||
host = find_reachable_host(deploy_info, host_key_check)
|
host = find_reachable_host(deploy_info)
|
||||||
if host is None:
|
if host is None:
|
||||||
use_tor = True
|
use_tor = True
|
||||||
target_host = f"root@{deploy_info.tor}"
|
target_host = deploy_info.tor.target
|
||||||
else:
|
else:
|
||||||
target_host = host.target
|
target_host = host.target
|
||||||
|
|
||||||
if args.password:
|
if args.password:
|
||||||
password = args.password
|
password = args.password
|
||||||
elif deploy_info and deploy_info.pwd:
|
elif deploy_info and deploy_info.addrs[0].password:
|
||||||
password = deploy_info.pwd
|
password = deploy_info.addrs[0].password
|
||||||
else:
|
else:
|
||||||
password = None
|
password = None
|
||||||
|
|
||||||
|
|||||||
@@ -121,7 +121,7 @@ def deploy_machine(machine: Machine) -> None:
|
|||||||
upload_secrets(machine, sudo_host)
|
upload_secrets(machine, sudo_host)
|
||||||
upload_secret_vars(machine, sudo_host)
|
upload_secret_vars(machine, sudo_host)
|
||||||
|
|
||||||
path = upload_sources(machine, host)
|
path = upload_sources(machine, sudo_host)
|
||||||
|
|
||||||
nix_options = [
|
nix_options = [
|
||||||
"--show-trace",
|
"--show-trace",
|
||||||
|
|||||||
@@ -1,24 +1,19 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import ipaddress
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from clan_lib.async_run import AsyncRuntime
|
|
||||||
from clan_lib.cmd import run
|
from clan_lib.cmd import run
|
||||||
from clan_lib.errors import ClanError
|
from clan_lib.errors import ClanError
|
||||||
from clan_lib.machines.machines import Machine
|
|
||||||
from clan_lib.nix import nix_shell
|
from clan_lib.nix import nix_shell
|
||||||
from clan_lib.ssh.parse import parse_deployment_address
|
from clan_lib.ssh.remote import HostKeyCheck, Remote
|
||||||
from clan_lib.ssh.remote import Remote, is_ssh_reachable
|
|
||||||
|
|
||||||
from clan_cli.completions import (
|
from clan_cli.completions import (
|
||||||
add_dynamic_completer,
|
add_dynamic_completer,
|
||||||
complete_machines,
|
complete_machines,
|
||||||
)
|
)
|
||||||
from clan_cli.ssh.host_key import HostKeyCheck
|
|
||||||
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
|
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -26,46 +21,70 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class DeployInfo:
|
class DeployInfo:
|
||||||
addrs: list[str]
|
addrs: list[Remote]
|
||||||
tor: str | None = None
|
|
||||||
pwd: str | None = None
|
@property
|
||||||
|
def tor(self) -> Remote:
|
||||||
|
"""Return a list of Remote objects that are configured for Tor."""
|
||||||
|
addrs = [addr for addr in self.addrs if addr.tor_socks]
|
||||||
|
|
||||||
|
if not addrs:
|
||||||
|
msg = "No tor address provided, please provide a tor address."
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
|
if len(addrs) > 1:
|
||||||
|
msg = "Multiple tor addresses provided, expected only one."
|
||||||
|
raise ClanError(msg)
|
||||||
|
return addrs[0]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_hostname(hostname: str, args: argparse.Namespace) -> "DeployInfo":
|
def from_hostnames(
|
||||||
m = Machine(hostname, flake=args.flake)
|
hostname: list[str], host_key_check: HostKeyCheck
|
||||||
return DeployInfo(addrs=[m.target_host_address])
|
) -> "DeployInfo":
|
||||||
|
remotes = []
|
||||||
|
for host in hostname:
|
||||||
|
if not host:
|
||||||
|
msg = "Hostname cannot be empty."
|
||||||
|
raise ClanError(msg)
|
||||||
|
remote = Remote.from_deployment_address(
|
||||||
|
machine_name="clan-installer",
|
||||||
|
address=host,
|
||||||
|
host_key_check=host_key_check,
|
||||||
|
)
|
||||||
|
remotes.append(remote)
|
||||||
|
return DeployInfo(addrs=remotes)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_json(data: dict[str, Any]) -> "DeployInfo":
|
def from_json(data: dict[str, Any], host_key_check: HostKeyCheck) -> "DeployInfo":
|
||||||
return DeployInfo(
|
addrs = []
|
||||||
tor=data.get("tor"), pwd=data.get("pass"), addrs=data.get("addrs", [])
|
password = data.get("pass")
|
||||||
|
|
||||||
|
for addr in data.get("addrs", []):
|
||||||
|
if isinstance(addr, str):
|
||||||
|
remote = Remote.from_deployment_address(
|
||||||
|
machine_name="clan-installer",
|
||||||
|
address=addr,
|
||||||
|
host_key_check=host_key_check,
|
||||||
|
password=password,
|
||||||
)
|
)
|
||||||
|
addrs.append(remote)
|
||||||
|
else:
|
||||||
def is_ipv6(ip: str) -> bool:
|
msg = f"Invalid address format: {addr}"
|
||||||
try:
|
raise ClanError(msg)
|
||||||
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
|
if tor_addr := data.get("tor"):
|
||||||
except ValueError:
|
remote = Remote.from_deployment_address(
|
||||||
return False
|
machine_name="clan-installer",
|
||||||
|
address=tor_addr,
|
||||||
|
host_key_check=host_key_check,
|
||||||
def find_reachable_host(
|
password=password,
|
||||||
deploy_info: DeployInfo, host_key_check: HostKeyCheck
|
tor_socks=True,
|
||||||
) -> Remote | None:
|
|
||||||
host = None
|
|
||||||
for addr in deploy_info.addrs:
|
|
||||||
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
|
|
||||||
host_ = parse_deployment_address(
|
|
||||||
machine_name="uknown", address=host_addr, host_key_check=host_key_check
|
|
||||||
)
|
)
|
||||||
if is_ssh_reachable(host_):
|
addrs.append(remote)
|
||||||
host = host_
|
|
||||||
break
|
|
||||||
|
|
||||||
return host
|
return DeployInfo(addrs=addrs)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
def qrcode_scan(picture_file: Path) -> str:
|
def from_qr_code(picture_file: Path, host_key_check: HostKeyCheck) -> "DeployInfo":
|
||||||
cmd = nix_shell(
|
cmd = nix_shell(
|
||||||
["zbar"],
|
["zbar"],
|
||||||
[
|
[
|
||||||
@@ -76,63 +95,74 @@ def qrcode_scan(picture_file: Path) -> str:
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
res = run(cmd)
|
res = run(cmd)
|
||||||
return res.stdout.strip()
|
data = res.stdout.strip()
|
||||||
|
return DeployInfo.from_json(json.loads(data), host_key_check=host_key_check)
|
||||||
|
|
||||||
|
|
||||||
def parse_qr_code(picture_file: Path) -> DeployInfo:
|
def find_reachable_host(deploy_info: DeployInfo) -> Remote | None:
|
||||||
data = qrcode_scan(picture_file)
|
host = None
|
||||||
return DeployInfo.from_json(json.loads(data))
|
for addr in deploy_info.addrs:
|
||||||
|
if addr.is_ssh_reachable():
|
||||||
|
host = addr
|
||||||
|
break
|
||||||
|
|
||||||
|
return host
|
||||||
|
|
||||||
|
|
||||||
def ssh_shell_from_deploy(
|
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
|
||||||
deploy_info: DeployInfo, runtime: AsyncRuntime, host_key_check: HostKeyCheck
|
if host := find_reachable_host(deploy_info):
|
||||||
) -> None:
|
|
||||||
if host := find_reachable_host(deploy_info, host_key_check):
|
|
||||||
host.interactive_ssh()
|
host.interactive_ssh()
|
||||||
else:
|
return
|
||||||
|
|
||||||
log.info("Could not reach host via clearnet 'addrs'")
|
log.info("Could not reach host via clearnet 'addrs'")
|
||||||
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
|
log.info(f"Trying to reach host via tor '{deploy_info}'")
|
||||||
spawn_tor(runtime)
|
|
||||||
if not deploy_info.tor:
|
tor_addrs = [addr for addr in deploy_info.addrs if addr.tor_socks]
|
||||||
|
if not tor_addrs:
|
||||||
msg = "No tor address provided, please provide a tor address."
|
msg = "No tor address provided, please provide a tor address."
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
|
|
||||||
host = Remote(
|
with spawn_tor():
|
||||||
address=deploy_info.tor,
|
for tor_addr in tor_addrs:
|
||||||
user="root",
|
log.info(f"Trying to reach host via tor address: {tor_addr}")
|
||||||
password=deploy_info.pwd,
|
if ssh_tor_reachable(
|
||||||
tor_socks=True,
|
TorTarget(
|
||||||
command_prefix="tor",
|
onion=tor_addr.address, port=tor_addr.port if tor_addr.port else 22
|
||||||
)
|
)
|
||||||
else:
|
):
|
||||||
msg = "Could not reach host via tor either."
|
log.info(
|
||||||
raise ClanError(msg)
|
"Host reachable via tor address, starting interactive ssh session."
|
||||||
|
)
|
||||||
|
tor_addr.interactive_ssh()
|
||||||
|
return
|
||||||
|
|
||||||
|
log.error("Could not reach host via tor address.")
|
||||||
|
|
||||||
|
|
||||||
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
|
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
|
||||||
|
host_key_check = HostKeyCheck.from_str(args.host_key_check)
|
||||||
if args.json:
|
if args.json:
|
||||||
json_file = Path(args.json)
|
json_file = Path(args.json)
|
||||||
if json_file.is_file():
|
if json_file.is_file():
|
||||||
data = json.loads(json_file.read_text())
|
data = json.loads(json_file.read_text())
|
||||||
return DeployInfo.from_json(data)
|
return DeployInfo.from_json(data, host_key_check)
|
||||||
data = json.loads(args.json)
|
data = json.loads(args.json)
|
||||||
return DeployInfo.from_json(data)
|
return DeployInfo.from_json(data, host_key_check)
|
||||||
if args.png:
|
if args.png:
|
||||||
return parse_qr_code(Path(args.png))
|
return DeployInfo.from_qr_code(Path(args.png), host_key_check)
|
||||||
|
|
||||||
if hasattr(args, "machines"):
|
if hasattr(args, "machines"):
|
||||||
return DeployInfo.from_hostname(args.machines[0], args)
|
return DeployInfo.from_hostnames(args.machines, host_key_check)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def ssh_command(args: argparse.Namespace) -> None:
|
def ssh_command(args: argparse.Namespace) -> None:
|
||||||
host_key_check = HostKeyCheck.from_str(args.host_key_check)
|
|
||||||
deploy_info = ssh_command_parse(args)
|
deploy_info = ssh_command_parse(args)
|
||||||
if not deploy_info:
|
if not deploy_info:
|
||||||
msg = "No MACHINE, --json or --png data provided"
|
msg = "No MACHINE, --json or --png data provided"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
with AsyncRuntime() as runtime:
|
ssh_shell_from_deploy(deploy_info)
|
||||||
ssh_shell_from_deploy(deploy_info, runtime, host_key_check)
|
|
||||||
|
|
||||||
|
|
||||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
@@ -157,13 +187,10 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
"--png",
|
"--png",
|
||||||
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--host-key-check",
|
"--host-key-check",
|
||||||
choices=["strict", "ask", "tofu", "none"],
|
choices=["strict", "ask", "tofu", "none"],
|
||||||
default="ask",
|
default="tofu",
|
||||||
help="Host key (.ssh/known_hosts) check mode.",
|
help="Host key (.ssh/known_hosts) check mode.",
|
||||||
)
|
)
|
||||||
parser.set_defaults(func=ssh_command)
|
parser.set_defaults(func=ssh_command)
|
||||||
|
|||||||
81
pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py
Normal file
81
pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from clan_lib.cmd import RunOpts, run
|
||||||
|
from clan_lib.nix import nix_shell
|
||||||
|
from clan_lib.ssh.remote import HostKeyCheck, Remote
|
||||||
|
|
||||||
|
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host
|
||||||
|
|
||||||
|
|
||||||
|
def test_qrcode_scan(temp_dir: Path) -> None:
|
||||||
|
data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}'
|
||||||
|
img_path = temp_dir / "qrcode.png"
|
||||||
|
cmd = nix_shell(
|
||||||
|
["qrencode"],
|
||||||
|
[
|
||||||
|
"qrencode",
|
||||||
|
"-o",
|
||||||
|
str(img_path),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
run(cmd, RunOpts(input=data.encode()))
|
||||||
|
|
||||||
|
# Call the qrcode_scan function
|
||||||
|
deploy_info = DeployInfo.from_qr_code(img_path, HostKeyCheck.NONE)
|
||||||
|
|
||||||
|
host = deploy_info.addrs[0]
|
||||||
|
assert host.address == "192.168.122.86"
|
||||||
|
assert host.user == "root"
|
||||||
|
assert host.password == "scabbed-defender-headlock"
|
||||||
|
|
||||||
|
tor_host = deploy_info.addrs[1]
|
||||||
|
assert (
|
||||||
|
tor_host.address
|
||||||
|
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||||
|
)
|
||||||
|
assert tor_host.tor_socks is True
|
||||||
|
assert tor_host.password == "scabbed-defender-headlock"
|
||||||
|
assert tor_host.user == "root"
|
||||||
|
assert (
|
||||||
|
tor_host.address
|
||||||
|
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_from_json() -> None:
|
||||||
|
data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}'
|
||||||
|
deploy_info = DeployInfo.from_json(json.loads(data), HostKeyCheck.NONE)
|
||||||
|
|
||||||
|
host = deploy_info.addrs[0]
|
||||||
|
assert host.password == "scabbed-defender-headlock"
|
||||||
|
assert host.address == "192.168.122.86"
|
||||||
|
|
||||||
|
tor_host = deploy_info.addrs[1]
|
||||||
|
assert (
|
||||||
|
tor_host.address
|
||||||
|
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||||
|
)
|
||||||
|
assert tor_host.tor_socks is True
|
||||||
|
assert tor_host.password == "scabbed-defender-headlock"
|
||||||
|
assert tor_host.user == "root"
|
||||||
|
assert (
|
||||||
|
tor_host.address
|
||||||
|
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.with_core
|
||||||
|
def test_find_reachable_host(hosts: list[Remote]) -> None:
|
||||||
|
host = hosts[0]
|
||||||
|
deploy_info = DeployInfo.from_hostnames(
|
||||||
|
["172.19.1.2", host.ssh_url()], HostKeyCheck.NONE
|
||||||
|
)
|
||||||
|
|
||||||
|
assert deploy_info.addrs[0].address == "172.19.1.2"
|
||||||
|
|
||||||
|
remote = find_reachable_host(deploy_info=deploy_info)
|
||||||
|
|
||||||
|
assert remote is not None
|
||||||
|
assert remote.ssh_url() == host.ssh_url()
|
||||||
@@ -5,10 +5,11 @@ import logging
|
|||||||
import socket
|
import socket
|
||||||
import struct
|
import struct
|
||||||
import time
|
import time
|
||||||
|
from collections.abc import Iterator
|
||||||
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from subprocess import Popen
|
||||||
|
|
||||||
from clan_lib.async_run import AsyncRuntime
|
|
||||||
from clan_lib.cmd import Log, RunOpts, run
|
|
||||||
from clan_lib.errors import TorConnectionError, TorSocksError
|
from clan_lib.errors import TorConnectionError, TorSocksError
|
||||||
from clan_lib.nix import nix_shell
|
from clan_lib.nix import nix_shell
|
||||||
|
|
||||||
@@ -108,32 +109,31 @@ def is_tor_running() -> bool:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def spawn_tor(runtime: AsyncRuntime) -> None:
|
@contextmanager
|
||||||
|
def spawn_tor() -> Iterator[None]:
|
||||||
"""
|
"""
|
||||||
Spawns a Tor process using `nix-shell` if Tor is not already running.
|
Spawns a Tor process using `nix-shell` if Tor is not already running.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def start_tor() -> None:
|
|
||||||
"""Starts Tor process using nix-shell."""
|
|
||||||
cmd_args = ["tor", "--HardwareAccel", "1"]
|
|
||||||
packages = ["tor"]
|
|
||||||
cmd = nix_shell(packages, cmd_args)
|
|
||||||
runtime.async_run(None, run, cmd, RunOpts(log=Log.BOTH))
|
|
||||||
log.debug("Attempting to start Tor")
|
|
||||||
|
|
||||||
# Check if Tor is already running
|
# Check if Tor is already running
|
||||||
if is_tor_running():
|
if is_tor_running():
|
||||||
log.info("Tor is running")
|
log.info("Tor is running")
|
||||||
return
|
return
|
||||||
|
cmd_args = ["tor", "--HardwareAccel", "1"]
|
||||||
# Attempt to start Tor
|
packages = ["tor"]
|
||||||
start_tor()
|
cmd = nix_shell(packages, cmd_args)
|
||||||
|
process = Popen(cmd)
|
||||||
# Continuously check if Tor has started
|
try:
|
||||||
while not is_tor_running():
|
while not is_tor_running():
|
||||||
log.debug("Waiting for Tor to start...")
|
log.debug("Waiting for Tor to start...")
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
log.info("Tor is now running")
|
log.info("Tor is now running")
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
log.info("Terminating Tor process...")
|
||||||
|
process.terminate()
|
||||||
|
process.wait()
|
||||||
|
log.info("Tor process terminated")
|
||||||
|
|
||||||
|
|
||||||
def tor_online_test() -> bool:
|
def tor_online_test() -> bool:
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
"netcat",
|
"netcat",
|
||||||
"nix",
|
"nix",
|
||||||
"nixos-anywhere",
|
"nixos-anywhere",
|
||||||
|
"qrencode",
|
||||||
"openssh",
|
"openssh",
|
||||||
"pass",
|
"pass",
|
||||||
"qemu",
|
"qemu",
|
||||||
|
|||||||
@@ -19,7 +19,13 @@ def parse_deployment_address(
|
|||||||
forward_agent: bool = True,
|
forward_agent: bool = True,
|
||||||
meta: dict[str, Any] | None = None,
|
meta: dict[str, Any] | None = None,
|
||||||
private_key: Path | None = None,
|
private_key: Path | None = None,
|
||||||
|
password: str | None = None,
|
||||||
|
tor_socks: bool = False,
|
||||||
) -> "Remote":
|
) -> "Remote":
|
||||||
|
if address.startswith("ssh://"):
|
||||||
|
# Strip the `ssh://` prefix if it exists
|
||||||
|
address = address[len("ssh://") :]
|
||||||
|
|
||||||
parts = address.split("?", maxsplit=1)
|
parts = address.split("?", maxsplit=1)
|
||||||
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
|
endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "")
|
||||||
|
|
||||||
@@ -66,8 +72,10 @@ def parse_deployment_address(
|
|||||||
user=user,
|
user=user,
|
||||||
port=port,
|
port=port,
|
||||||
private_key=private_key,
|
private_key=private_key,
|
||||||
|
password=password,
|
||||||
host_key_check=host_key_check,
|
host_key_check=host_key_check,
|
||||||
command_prefix=machine_name,
|
command_prefix=machine_name,
|
||||||
forward_agent=forward_agent,
|
forward_agent=forward_agent,
|
||||||
ssh_options=options,
|
ssh_options=options,
|
||||||
|
tor_socks=tor_socks,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
# ruff: noqa: SLF001
|
# ruff: noqa: SLF001
|
||||||
|
import ipaddress
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
@@ -47,6 +48,12 @@ class Remote:
|
|||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return self.target
|
return self.target
|
||||||
|
|
||||||
|
def is_ipv6(self) -> bool:
|
||||||
|
try:
|
||||||
|
return isinstance(ipaddress.ip_address(self.address), ipaddress.IPv6Address)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def target(self) -> str:
|
def target(self) -> str:
|
||||||
return f"{self.user}@{self.address}"
|
return f"{self.user}@{self.address}"
|
||||||
@@ -60,6 +67,8 @@ class Remote:
|
|||||||
host_key_check: HostKeyCheck,
|
host_key_check: HostKeyCheck,
|
||||||
forward_agent: bool = True,
|
forward_agent: bool = True,
|
||||||
private_key: Path | None = None,
|
private_key: Path | None = None,
|
||||||
|
password: str | None = None,
|
||||||
|
tor_socks: bool = False,
|
||||||
) -> "Remote":
|
) -> "Remote":
|
||||||
"""
|
"""
|
||||||
Parse a deployment address and return a Host object.
|
Parse a deployment address and return a Host object.
|
||||||
@@ -71,6 +80,8 @@ class Remote:
|
|||||||
host_key_check=host_key_check,
|
host_key_check=host_key_check,
|
||||||
forward_agent=forward_agent,
|
forward_agent=forward_agent,
|
||||||
private_key=private_key,
|
private_key=private_key,
|
||||||
|
password=password,
|
||||||
|
tor_socks=tor_socks,
|
||||||
)
|
)
|
||||||
|
|
||||||
def run_local(
|
def run_local(
|
||||||
@@ -297,6 +308,18 @@ class Remote:
|
|||||||
)
|
)
|
||||||
return ssh_opts
|
return ssh_opts
|
||||||
|
|
||||||
|
def ssh_url(self) -> str:
|
||||||
|
"""
|
||||||
|
Generates a standard SSH URL (ssh://[user@]host[:port]).
|
||||||
|
"""
|
||||||
|
url = "ssh://"
|
||||||
|
if self.user:
|
||||||
|
url += f"{self.user}@"
|
||||||
|
url += self.address
|
||||||
|
if self.port:
|
||||||
|
url += f":{self.port}"
|
||||||
|
return url
|
||||||
|
|
||||||
def ssh_cmd(
|
def ssh_cmd(
|
||||||
self, verbose_ssh: bool = False, tty: bool = False, control_master: bool = True
|
self, verbose_ssh: bool = False, tty: bool = False, control_master: bool = True
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
@@ -326,18 +349,53 @@ class Remote:
|
|||||||
]
|
]
|
||||||
return nix_shell(packages, cmd)
|
return nix_shell(packages, cmd)
|
||||||
|
|
||||||
|
def check_sshpass_errorcode(self, res: subprocess.CompletedProcess) -> None:
|
||||||
|
"""
|
||||||
|
Check the return code of the sshpass command and raise an error if it indicates a failure.
|
||||||
|
Error codes are based on man sshpass(1) and may vary by version.
|
||||||
|
"""
|
||||||
|
if res.returncode == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
match res.returncode:
|
||||||
|
case 1:
|
||||||
|
msg = "Invalid command line argument"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 2:
|
||||||
|
msg = "Conflicting arguments given"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 3:
|
||||||
|
msg = "General runtime error"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 4:
|
||||||
|
msg = "Unrecognized response from ssh (parse error)"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 5:
|
||||||
|
msg = "Invalid/incorrect password"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 6:
|
||||||
|
msg = "Host public key is unknown. sshpass exits without confirming the new key. Try using --host-key-heck none"
|
||||||
|
raise ClanError(msg)
|
||||||
|
case 7:
|
||||||
|
msg = "IP public key changed. sshpass exits without confirming the new key."
|
||||||
|
raise ClanError(msg)
|
||||||
|
case _:
|
||||||
|
msg = f"SSH command failed with return code {res.returncode}"
|
||||||
|
raise ClanError(msg)
|
||||||
|
|
||||||
def interactive_ssh(self) -> None:
|
def interactive_ssh(self) -> None:
|
||||||
cmd_list = self.ssh_cmd(tty=True, control_master=False)
|
cmd_list = self.ssh_cmd(tty=True, control_master=False)
|
||||||
subprocess.run(cmd_list)
|
res = subprocess.run(cmd_list, check=False)
|
||||||
|
|
||||||
|
self.check_sshpass_errorcode(res)
|
||||||
|
|
||||||
def is_ssh_reachable(host: Remote) -> bool:
|
def is_ssh_reachable(self) -> bool:
|
||||||
address_family = socket.AF_INET6 if ":" in host.address else socket.AF_INET
|
address_family = socket.AF_INET6 if ":" in self.address else socket.AF_INET
|
||||||
with socket.socket(address_family, socket.SOCK_STREAM) as sock:
|
with socket.socket(address_family, socket.SOCK_STREAM) as sock:
|
||||||
sock.settimeout(2)
|
sock.settimeout(2)
|
||||||
try:
|
try:
|
||||||
sock.connect((host.address, host.port or 22))
|
sock.connect((self.address, self.port or 22))
|
||||||
except OSError:
|
except OSError:
|
||||||
return False
|
return False
|
||||||
else:
|
|
||||||
return True
|
return True
|
||||||
|
|||||||
Reference in New Issue
Block a user