From ee802ad723a6dde20977bc21cdf2151c7f8cf795 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Mon, 9 Jun 2025 19:27:01 +0200 Subject: [PATCH] clan-cli: Use Remote class in DeployInfo, add tests for qrcode parser and json parser --- pkgs/clan-cli/clan_cli/machines/install.py | 10 +- pkgs/clan-cli/clan_cli/machines/update.py | 2 +- pkgs/clan-cli/clan_cli/ssh/deploy_info.py | 193 ++++++++++-------- .../clan-cli/clan_cli/ssh/test_deploy_info.py | 81 ++++++++ pkgs/clan-cli/clan_cli/ssh/tor.py | 40 ++-- .../clan_lib/nix/allowed-packages.json | 1 + pkgs/clan-cli/clan_lib/ssh/parse.py | 8 + pkgs/clan-cli/clan_lib/ssh/remote.py | 80 +++++++- 8 files changed, 294 insertions(+), 121 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py diff --git a/pkgs/clan-cli/clan_cli/machines/install.py b/pkgs/clan-cli/clan_cli/machines/install.py index 508edda01..1355aae4c 100644 --- a/pkgs/clan-cli/clan_cli/machines/install.py +++ b/pkgs/clan-cli/clan_cli/machines/install.py @@ -21,7 +21,6 @@ from clan_cli.completions import ( from clan_cli.facts.generate import generate_facts from clan_cli.machines.hardware import HardwareConfig from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse -from clan_cli.ssh.host_key import HostKeyCheck from clan_cli.vars.generate import generate_vars log = logging.getLogger(__name__) @@ -156,7 +155,6 @@ def install_machine(opts: InstallOptions) -> None: def install_command(args: argparse.Namespace) -> None: - host_key_check = HostKeyCheck.from_str(args.host_key_check) try: # Only if the caller did not specify a target_host via args.target_host # Find a suitable target_host that is reachable @@ -165,17 +163,17 @@ def install_command(args: argparse.Namespace) -> None: use_tor = False if deploy_info and not args.target_host: - host = find_reachable_host(deploy_info, host_key_check) + host = find_reachable_host(deploy_info) if host is None: use_tor = True - target_host = f"root@{deploy_info.tor}" + target_host = deploy_info.tor.target else: target_host = host.target if args.password: password = args.password - elif deploy_info and deploy_info.pwd: - password = deploy_info.pwd + elif deploy_info and deploy_info.addrs[0].password: + password = deploy_info.addrs[0].password else: password = None diff --git a/pkgs/clan-cli/clan_cli/machines/update.py b/pkgs/clan-cli/clan_cli/machines/update.py index baa0a34e4..4e1dd8912 100644 --- a/pkgs/clan-cli/clan_cli/machines/update.py +++ b/pkgs/clan-cli/clan_cli/machines/update.py @@ -121,7 +121,7 @@ def deploy_machine(machine: Machine) -> None: upload_secrets(machine, sudo_host) upload_secret_vars(machine, sudo_host) - path = upload_sources(machine, host) + path = upload_sources(machine, sudo_host) nix_options = [ "--show-trace", diff --git a/pkgs/clan-cli/clan_cli/ssh/deploy_info.py b/pkgs/clan-cli/clan_cli/ssh/deploy_info.py index 0a99988d0..fdb94945f 100644 --- a/pkgs/clan-cli/clan_cli/ssh/deploy_info.py +++ b/pkgs/clan-cli/clan_cli/ssh/deploy_info.py @@ -1,24 +1,19 @@ import argparse -import ipaddress import json import logging from dataclasses import dataclass from pathlib import Path from typing import Any -from clan_lib.async_run import AsyncRuntime from clan_lib.cmd import run from clan_lib.errors import ClanError -from clan_lib.machines.machines import Machine from clan_lib.nix import nix_shell -from clan_lib.ssh.parse import parse_deployment_address -from clan_lib.ssh.remote import Remote, is_ssh_reachable +from clan_lib.ssh.remote import HostKeyCheck, Remote from clan_cli.completions import ( add_dynamic_completer, complete_machines, ) -from clan_cli.ssh.host_key import HostKeyCheck from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable log = logging.getLogger(__name__) @@ -26,113 +21,148 @@ log = logging.getLogger(__name__) @dataclass class DeployInfo: - addrs: list[str] - tor: str | None = None - pwd: str | None = None + addrs: list[Remote] + + @property + def tor(self) -> Remote: + """Return a list of Remote objects that are configured for Tor.""" + addrs = [addr for addr in self.addrs if addr.tor_socks] + + if not addrs: + msg = "No tor address provided, please provide a tor address." + raise ClanError(msg) + + if len(addrs) > 1: + msg = "Multiple tor addresses provided, expected only one." + raise ClanError(msg) + return addrs[0] @staticmethod - def from_hostname(hostname: str, args: argparse.Namespace) -> "DeployInfo": - m = Machine(hostname, flake=args.flake) - return DeployInfo(addrs=[m.target_host_address]) + def from_hostnames( + hostname: list[str], host_key_check: HostKeyCheck + ) -> "DeployInfo": + remotes = [] + for host in hostname: + if not host: + msg = "Hostname cannot be empty." + raise ClanError(msg) + remote = Remote.from_deployment_address( + machine_name="clan-installer", + address=host, + host_key_check=host_key_check, + ) + remotes.append(remote) + return DeployInfo(addrs=remotes) @staticmethod - def from_json(data: dict[str, Any]) -> "DeployInfo": - return DeployInfo( - tor=data.get("tor"), pwd=data.get("pass"), addrs=data.get("addrs", []) + def from_json(data: dict[str, Any], host_key_check: HostKeyCheck) -> "DeployInfo": + addrs = [] + password = data.get("pass") + + for addr in data.get("addrs", []): + if isinstance(addr, str): + remote = Remote.from_deployment_address( + machine_name="clan-installer", + address=addr, + host_key_check=host_key_check, + password=password, + ) + addrs.append(remote) + else: + msg = f"Invalid address format: {addr}" + raise ClanError(msg) + if tor_addr := data.get("tor"): + remote = Remote.from_deployment_address( + machine_name="clan-installer", + address=tor_addr, + host_key_check=host_key_check, + password=password, + tor_socks=True, + ) + addrs.append(remote) + + return DeployInfo(addrs=addrs) + + @staticmethod + def from_qr_code(picture_file: Path, host_key_check: HostKeyCheck) -> "DeployInfo": + cmd = nix_shell( + ["zbar"], + [ + "zbarimg", + "--quiet", + "--raw", + str(picture_file), + ], ) + res = run(cmd) + data = res.stdout.strip() + return DeployInfo.from_json(json.loads(data), host_key_check=host_key_check) -def is_ipv6(ip: str) -> bool: - try: - return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address) - except ValueError: - return False - - -def find_reachable_host( - deploy_info: DeployInfo, host_key_check: HostKeyCheck -) -> Remote | None: +def find_reachable_host(deploy_info: DeployInfo) -> Remote | None: host = None for addr in deploy_info.addrs: - host_addr = f"[{addr}]" if is_ipv6(addr) else addr - host_ = parse_deployment_address( - machine_name="uknown", address=host_addr, host_key_check=host_key_check - ) - if is_ssh_reachable(host_): - host = host_ + if addr.is_ssh_reachable(): + host = addr break return host -def qrcode_scan(picture_file: Path) -> str: - cmd = nix_shell( - ["zbar"], - [ - "zbarimg", - "--quiet", - "--raw", - str(picture_file), - ], - ) - res = run(cmd) - return res.stdout.strip() - - -def parse_qr_code(picture_file: Path) -> DeployInfo: - data = qrcode_scan(picture_file) - return DeployInfo.from_json(json.loads(data)) - - -def ssh_shell_from_deploy( - deploy_info: DeployInfo, runtime: AsyncRuntime, host_key_check: HostKeyCheck -) -> None: - if host := find_reachable_host(deploy_info, host_key_check): +def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None: + if host := find_reachable_host(deploy_info): host.interactive_ssh() - else: - log.info("Could not reach host via clearnet 'addrs'") - log.info(f"Trying to reach host via tor '{deploy_info.tor}'") - spawn_tor(runtime) - if not deploy_info.tor: - msg = "No tor address provided, please provide a tor address." - raise ClanError(msg) - if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)): - host = Remote( - address=deploy_info.tor, - user="root", - password=deploy_info.pwd, - tor_socks=True, - command_prefix="tor", - ) - else: - msg = "Could not reach host via tor either." - raise ClanError(msg) + return + + log.info("Could not reach host via clearnet 'addrs'") + log.info(f"Trying to reach host via tor '{deploy_info}'") + + tor_addrs = [addr for addr in deploy_info.addrs if addr.tor_socks] + if not tor_addrs: + msg = "No tor address provided, please provide a tor address." + raise ClanError(msg) + + with spawn_tor(): + for tor_addr in tor_addrs: + log.info(f"Trying to reach host via tor address: {tor_addr}") + if ssh_tor_reachable( + TorTarget( + onion=tor_addr.address, port=tor_addr.port if tor_addr.port else 22 + ) + ): + log.info( + "Host reachable via tor address, starting interactive ssh session." + ) + tor_addr.interactive_ssh() + return + + log.error("Could not reach host via tor address.") def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None: + host_key_check = HostKeyCheck.from_str(args.host_key_check) if args.json: json_file = Path(args.json) if json_file.is_file(): data = json.loads(json_file.read_text()) - return DeployInfo.from_json(data) + return DeployInfo.from_json(data, host_key_check) data = json.loads(args.json) - return DeployInfo.from_json(data) + return DeployInfo.from_json(data, host_key_check) if args.png: - return parse_qr_code(Path(args.png)) + return DeployInfo.from_qr_code(Path(args.png), host_key_check) + if hasattr(args, "machines"): - return DeployInfo.from_hostname(args.machines[0], args) + return DeployInfo.from_hostnames(args.machines, host_key_check) return None def ssh_command(args: argparse.Namespace) -> None: - host_key_check = HostKeyCheck.from_str(args.host_key_check) deploy_info = ssh_command_parse(args) if not deploy_info: msg = "No MACHINE, --json or --png data provided" raise ClanError(msg) - with AsyncRuntime() as runtime: - ssh_shell_from_deploy(deploy_info, runtime, host_key_check) + ssh_shell_from_deploy(deploy_info) def register_parser(parser: argparse.ArgumentParser) -> None: @@ -157,13 +187,10 @@ def register_parser(parser: argparse.ArgumentParser) -> None: "--png", help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)", ) - parser.add_argument( - "--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments" - ) parser.add_argument( "--host-key-check", choices=["strict", "ask", "tofu", "none"], - default="ask", + default="tofu", help="Host key (.ssh/known_hosts) check mode.", ) parser.set_defaults(func=ssh_command) diff --git a/pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py b/pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py new file mode 100644 index 000000000..859d580a3 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py @@ -0,0 +1,81 @@ +import json +from pathlib import Path + +import pytest +from clan_lib.cmd import RunOpts, run +from clan_lib.nix import nix_shell +from clan_lib.ssh.remote import HostKeyCheck, Remote + +from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host + + +def test_qrcode_scan(temp_dir: Path) -> None: + data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}' + img_path = temp_dir / "qrcode.png" + cmd = nix_shell( + ["qrencode"], + [ + "qrencode", + "-o", + str(img_path), + ], + ) + run(cmd, RunOpts(input=data.encode())) + + # Call the qrcode_scan function + deploy_info = DeployInfo.from_qr_code(img_path, HostKeyCheck.NONE) + + host = deploy_info.addrs[0] + assert host.address == "192.168.122.86" + assert host.user == "root" + assert host.password == "scabbed-defender-headlock" + + tor_host = deploy_info.addrs[1] + assert ( + tor_host.address + == "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion" + ) + assert tor_host.tor_socks is True + assert tor_host.password == "scabbed-defender-headlock" + assert tor_host.user == "root" + assert ( + tor_host.address + == "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion" + ) + + +def test_from_json() -> None: + data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}' + deploy_info = DeployInfo.from_json(json.loads(data), HostKeyCheck.NONE) + + host = deploy_info.addrs[0] + assert host.password == "scabbed-defender-headlock" + assert host.address == "192.168.122.86" + + tor_host = deploy_info.addrs[1] + assert ( + tor_host.address + == "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion" + ) + assert tor_host.tor_socks is True + assert tor_host.password == "scabbed-defender-headlock" + assert tor_host.user == "root" + assert ( + tor_host.address + == "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion" + ) + + +@pytest.mark.with_core +def test_find_reachable_host(hosts: list[Remote]) -> None: + host = hosts[0] + deploy_info = DeployInfo.from_hostnames( + ["172.19.1.2", host.ssh_url()], HostKeyCheck.NONE + ) + + assert deploy_info.addrs[0].address == "172.19.1.2" + + remote = find_reachable_host(deploy_info=deploy_info) + + assert remote is not None + assert remote.ssh_url() == host.ssh_url() diff --git a/pkgs/clan-cli/clan_cli/ssh/tor.py b/pkgs/clan-cli/clan_cli/ssh/tor.py index 3a87f1d11..992e731f5 100755 --- a/pkgs/clan-cli/clan_cli/ssh/tor.py +++ b/pkgs/clan-cli/clan_cli/ssh/tor.py @@ -5,10 +5,11 @@ import logging import socket import struct import time +from collections.abc import Iterator +from contextlib import contextmanager from dataclasses import dataclass +from subprocess import Popen -from clan_lib.async_run import AsyncRuntime -from clan_lib.cmd import Log, RunOpts, run from clan_lib.errors import TorConnectionError, TorSocksError from clan_lib.nix import nix_shell @@ -108,32 +109,31 @@ def is_tor_running() -> bool: return True -def spawn_tor(runtime: AsyncRuntime) -> None: +@contextmanager +def spawn_tor() -> Iterator[None]: """ Spawns a Tor process using `nix-shell` if Tor is not already running. """ - def start_tor() -> None: - """Starts Tor process using nix-shell.""" - cmd_args = ["tor", "--HardwareAccel", "1"] - packages = ["tor"] - cmd = nix_shell(packages, cmd_args) - runtime.async_run(None, run, cmd, RunOpts(log=Log.BOTH)) - log.debug("Attempting to start Tor") - # Check if Tor is already running if is_tor_running(): log.info("Tor is running") return - - # Attempt to start Tor - start_tor() - - # Continuously check if Tor has started - while not is_tor_running(): - log.debug("Waiting for Tor to start...") - time.sleep(0.2) - log.info("Tor is now running") + cmd_args = ["tor", "--HardwareAccel", "1"] + packages = ["tor"] + cmd = nix_shell(packages, cmd_args) + process = Popen(cmd) + try: + while not is_tor_running(): + log.debug("Waiting for Tor to start...") + time.sleep(0.2) + log.info("Tor is now running") + yield + finally: + log.info("Terminating Tor process...") + process.terminate() + process.wait() + log.info("Tor process terminated") def tor_online_test() -> bool: diff --git a/pkgs/clan-cli/clan_lib/nix/allowed-packages.json b/pkgs/clan-cli/clan_lib/nix/allowed-packages.json index 73ed0f459..d723bc8e1 100644 --- a/pkgs/clan-cli/clan_lib/nix/allowed-packages.json +++ b/pkgs/clan-cli/clan_lib/nix/allowed-packages.json @@ -19,6 +19,7 @@ "netcat", "nix", "nixos-anywhere", + "qrencode", "openssh", "pass", "qemu", diff --git a/pkgs/clan-cli/clan_lib/ssh/parse.py b/pkgs/clan-cli/clan_lib/ssh/parse.py index ccac18499..928763117 100644 --- a/pkgs/clan-cli/clan_lib/ssh/parse.py +++ b/pkgs/clan-cli/clan_lib/ssh/parse.py @@ -19,7 +19,13 @@ def parse_deployment_address( forward_agent: bool = True, meta: dict[str, Any] | None = None, private_key: Path | None = None, + password: str | None = None, + tor_socks: bool = False, ) -> "Remote": + if address.startswith("ssh://"): + # Strip the `ssh://` prefix if it exists + address = address[len("ssh://") :] + parts = address.split("?", maxsplit=1) endpoint, maybe_options = parts if len(parts) == 2 else (parts[0], "") @@ -66,8 +72,10 @@ def parse_deployment_address( user=user, port=port, private_key=private_key, + password=password, host_key_check=host_key_check, command_prefix=machine_name, forward_agent=forward_agent, ssh_options=options, + tor_socks=tor_socks, ) diff --git a/pkgs/clan-cli/clan_lib/ssh/remote.py b/pkgs/clan-cli/clan_lib/ssh/remote.py index 6f9cc677e..c75b037d2 100644 --- a/pkgs/clan-cli/clan_lib/ssh/remote.py +++ b/pkgs/clan-cli/clan_lib/ssh/remote.py @@ -1,4 +1,5 @@ # ruff: noqa: SLF001 +import ipaddress import logging import os import shlex @@ -47,6 +48,12 @@ class Remote: def __str__(self) -> str: return self.target + def is_ipv6(self) -> bool: + try: + return isinstance(ipaddress.ip_address(self.address), ipaddress.IPv6Address) + except ValueError: + return False + @property def target(self) -> str: return f"{self.user}@{self.address}" @@ -60,6 +67,8 @@ class Remote: host_key_check: HostKeyCheck, forward_agent: bool = True, private_key: Path | None = None, + password: str | None = None, + tor_socks: bool = False, ) -> "Remote": """ Parse a deployment address and return a Host object. @@ -71,6 +80,8 @@ class Remote: host_key_check=host_key_check, forward_agent=forward_agent, private_key=private_key, + password=password, + tor_socks=tor_socks, ) def run_local( @@ -297,6 +308,18 @@ class Remote: ) return ssh_opts + def ssh_url(self) -> str: + """ + Generates a standard SSH URL (ssh://[user@]host[:port]). + """ + url = "ssh://" + if self.user: + url += f"{self.user}@" + url += self.address + if self.port: + url += f":{self.port}" + return url + def ssh_cmd( self, verbose_ssh: bool = False, tty: bool = False, control_master: bool = True ) -> list[str]: @@ -326,18 +349,53 @@ class Remote: ] return nix_shell(packages, cmd) + def check_sshpass_errorcode(self, res: subprocess.CompletedProcess) -> None: + """ + Check the return code of the sshpass command and raise an error if it indicates a failure. + Error codes are based on man sshpass(1) and may vary by version. + """ + if res.returncode == 0: + return + + match res.returncode: + case 1: + msg = "Invalid command line argument" + raise ClanError(msg) + case 2: + msg = "Conflicting arguments given" + raise ClanError(msg) + case 3: + msg = "General runtime error" + raise ClanError(msg) + case 4: + msg = "Unrecognized response from ssh (parse error)" + raise ClanError(msg) + case 5: + msg = "Invalid/incorrect password" + raise ClanError(msg) + case 6: + msg = "Host public key is unknown. sshpass exits without confirming the new key. Try using --host-key-heck none" + raise ClanError(msg) + case 7: + msg = "IP public key changed. sshpass exits without confirming the new key." + raise ClanError(msg) + case _: + msg = f"SSH command failed with return code {res.returncode}" + raise ClanError(msg) + def interactive_ssh(self) -> None: cmd_list = self.ssh_cmd(tty=True, control_master=False) - subprocess.run(cmd_list) + res = subprocess.run(cmd_list, check=False) + self.check_sshpass_errorcode(res) -def is_ssh_reachable(host: Remote) -> bool: - address_family = socket.AF_INET6 if ":" in host.address else socket.AF_INET - with socket.socket(address_family, socket.SOCK_STREAM) as sock: - sock.settimeout(2) - try: - sock.connect((host.address, host.port or 22)) - except OSError: - return False - else: - return True + def is_ssh_reachable(self) -> bool: + address_family = socket.AF_INET6 if ":" in self.address else socket.AF_INET + with socket.socket(address_family, socket.SOCK_STREAM) as sock: + sock.settimeout(2) + try: + sock.connect((self.address, self.port or 22)) + except OSError: + return False + + return True