From 6b784f8623854f5315822f190d5adedb36ef4a81 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Fri, 13 Dec 2024 00:11:18 +0100 Subject: [PATCH] clan-cli: Rework 'clan ssh' command, improve Tor support. --- pkgs/clan-cli/clan_cli/__init__.py | 4 +- pkgs/clan-cli/clan_cli/errors.py | 10 + pkgs/clan-cli/clan_cli/facts/check.py | 4 +- pkgs/clan-cli/clan_cli/facts/generate.py | 5 +- pkgs/clan-cli/clan_cli/machines/install.py | 51 ++---- pkgs/clan-cli/clan_cli/nix/__init__.py | 6 +- pkgs/clan-cli/clan_cli/secrets/sops.py | 14 +- pkgs/clan-cli/clan_cli/ssh/cli.py | 137 -------------- pkgs/clan-cli/clan_cli/ssh/deploy_info.py | 127 +++++++++++++ pkgs/clan-cli/clan_cli/ssh/host.py | 52 +++++- pkgs/clan-cli/clan_cli/ssh/tor.py | 203 +++++++++++++++++++++ pkgs/clan-cli/tests/test_ssh_cli.py | 141 -------------- pyproject.toml | 1 + 13 files changed, 418 insertions(+), 337 deletions(-) delete mode 100644 pkgs/clan-cli/clan_cli/ssh/cli.py create mode 100644 pkgs/clan-cli/clan_cli/ssh/deploy_info.py create mode 100755 pkgs/clan-cli/clan_cli/ssh/tor.py delete mode 100644 pkgs/clan-cli/tests/test_ssh_cli.py diff --git a/pkgs/clan-cli/clan_cli/__init__.py b/pkgs/clan-cli/clan_cli/__init__.py index 85b6ac69c..279451212 100644 --- a/pkgs/clan-cli/clan_cli/__init__.py +++ b/pkgs/clan-cli/clan_cli/__init__.py @@ -30,7 +30,7 @@ from .flash import cli as flash_cli from .hyperlink import help_hyperlink from .machines import cli as machines from .profiler import profile -from .ssh import cli as ssh_cli +from .ssh import deploy_info as ssh_cli from .vars import cli as vars_cli log = logging.getLogger(__name__) @@ -432,7 +432,7 @@ def main() -> None: if debug: log.exception("Exited with error") else: - log.error("%s", e) # noqa: TRY400 + log.error("%s", e) sys.exit(1) except KeyboardInterrupt as ex: log.warning("Interrupted by user", exc_info=ex) diff --git a/pkgs/clan-cli/clan_cli/errors.py b/pkgs/clan-cli/clan_cli/errors.py index 7b1308079..9b48a6569 100644 --- a/pkgs/clan-cli/clan_cli/errors.py +++ b/pkgs/clan-cli/clan_cli/errors.py @@ -189,3 +189,13 @@ class ClanCmdError(ClanError): def __repr__(self) -> str: return f"ClanCmdError({self.cmd})" + + +class TorSocksError(ClanError): + def __init__(self, msg: str) -> None: + super().__init__(msg) + + +class TorConnectionError(ClanError): + def __init__(self, msg: str) -> None: + super().__init__(msg) diff --git a/pkgs/clan-cli/clan_cli/facts/check.py b/pkgs/clan-cli/clan_cli/facts/check.py index 8a37091c0..e769a74ea 100644 --- a/pkgs/clan-cli/clan_cli/facts/check.py +++ b/pkgs/clan-cli/clan_cli/facts/check.py @@ -25,14 +25,14 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool: secret_name = secret_fact["name"] if not secret_facts_store.exists(service, secret_name): machine.info( - f"Secret fact '{secret_fact}' for service '{service}' in machine {machine.name} is missing." + f"Secret fact '{secret_fact}' for service '{service}' is missing." ) missing_secret_facts.append((service, secret_name)) for public_fact in machine.facts_data[service]["public"]: if not public_facts_store.exists(service, public_fact): machine.info( - f"Public fact '{public_fact}' for service '{service}' in machine {machine.name} is missing." + f"Public fact '{public_fact}' for service '{service}' is missing." ) missing_public_facts.append((service, public_fact)) diff --git a/pkgs/clan-cli/clan_cli/facts/generate.py b/pkgs/clan-cli/clan_cli/facts/generate.py index ed202aa5a..19bcead3d 100644 --- a/pkgs/clan-cli/clan_cli/facts/generate.py +++ b/pkgs/clan-cli/clan_cli/facts/generate.py @@ -2,7 +2,6 @@ import argparse import importlib import logging import os -import subprocess import sys from collections.abc import Callable from pathlib import Path @@ -32,9 +31,9 @@ def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str: Read multi-line input from stdin. """ print(prompt, flush=True) - proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True, check=False) + proc = run(["cat"], RunOpts(check=False)) log.info("Input received. Processing...") - return proc.stdout + return proc.stdout.rstrip(os.linesep).rstrip() def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]: diff --git a/pkgs/clan-cli/clan_cli/machines/install.py b/pkgs/clan-cli/clan_cli/machines/install.py index 3a6049911..1f5e42ab5 100644 --- a/pkgs/clan-cli/clan_cli/machines/install.py +++ b/pkgs/clan-cli/clan_cli/machines/install.py @@ -1,6 +1,5 @@ import argparse import importlib -import json import logging import os import sys @@ -21,7 +20,7 @@ from clan_cli.facts.generate import generate_facts from clan_cli.machines.hardware import HardwareConfig from clan_cli.machines.machines import Machine from clan_cli.nix import nix_shell -from clan_cli.ssh.cli import is_ipv6, is_reachable, qrcode_scan +from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse from clan_cli.vars.generate import generate_vars log = logging.getLogger(__name__) @@ -36,7 +35,7 @@ class InstallOptions: kexec: str | None = None debug: bool = False no_reboot: bool = False - json_ssh_deploy: dict[str, str] | None = None + deploy_info: DeployInfo | None = None build_on_remote: bool = False nix_options: list[str] = field(default_factory=list) update_hardware_config: HardwareConfig = HardwareConfig.NONE @@ -45,6 +44,12 @@ class InstallOptions: @API.register def install_machine(opts: InstallOptions) -> None: + # TODO: Fixme, replace opts.machine and opts.flake with machine object + # remove opts.deploy_info, opts.target_host and populate machine object + if opts.deploy_info: + msg = "Deploy info has not been fully implemented yet" + raise NotImplementedError(msg) + machine = Machine(opts.machine, flake=opts.flake) machine.override_target_host = opts.target_host @@ -129,21 +134,15 @@ def install_command(args: argparse.Namespace) -> None: if args.flake is None: msg = "Could not find clan flake toplevel directory" raise ClanError(msg) - json_ssh_deploy = None - if args.json: - json_file = Path(args.json) - if json_file.is_file(): - json_ssh_deploy = json.loads(json_file.read_text()) - else: - json_ssh_deploy = json.loads(args.json) - elif args.png: - json_ssh_deploy = json.loads(qrcode_scan(args.png)) + deploy_info: DeployInfo | None = ssh_command_parse(args) - if json_ssh_deploy: - target_host = ( - f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}" - ) - password = json_ssh_deploy["pass"] + if deploy_info: + host = find_reachable_host(deploy_info) + if host is None: + msg = f"Couldn't reach any host address: {deploy_info.addrs}" + raise ClanError(msg) + target_host = host.target + password = deploy_info.pwd elif args.target_host: target_host = args.target_host password = None @@ -174,7 +173,7 @@ def install_command(args: argparse.Namespace) -> None: kexec=args.kexec, debug=args.debug, no_reboot=args.no_reboot, - json_ssh_deploy=json_ssh_deploy, + deploy_info=deploy_info, nix_options=args.option, build_on_remote=args.build_on_remote, update_hardware_config=HardwareConfig(args.update_hardware_config), @@ -186,22 +185,6 @@ def install_command(args: argparse.Namespace) -> None: sys.exit(1) -def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str: - host = None - for addr in deploy_json["addrs"]: - if is_reachable(addr): - host = f"[{addr}]" if is_ipv6(addr) else addr - break - if not host: - msg = f""" - Could not reach any of the host addresses provided in the json string. - Please doublecheck if they are reachable from your machine. - Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']} - """ - raise ClanError(msg) - return host - - def register_install_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--kexec", diff --git a/pkgs/clan-cli/clan_cli/nix/__init__.py b/pkgs/clan-cli/clan_cli/nix/__init__.py index 0afe3109d..e075e417b 100644 --- a/pkgs/clan-cli/clan_cli/nix/__init__.py +++ b/pkgs/clan-cli/clan_cli/nix/__init__.py @@ -97,7 +97,11 @@ def nix_metadata(flake_url: str | Path) -> dict[str, Any]: def nix_shell(packages: list[str], cmd: list[str]) -> list[str]: # we cannot use nix-shell inside the nix sandbox # in our tests we just make sure we have all the packages - if os.environ.get("IN_NIX_SANDBOX") or os.environ.get("CLAN_NO_DYNAMIC_DEPS"): + if ( + os.environ.get("IN_NIX_SANDBOX") + or os.environ.get("CLAN_NO_DYNAMIC_DEPS") + or len(packages) == 0 + ): return cmd return [ *nix_command(["shell", "--inputs-from", f"{nixpkgs_flake()!s}"]), diff --git a/pkgs/clan-cli/clan_cli/secrets/sops.py b/pkgs/clan-cli/clan_cli/secrets/sops.py index d0d5f5a1d..605c3861e 100644 --- a/pkgs/clan-cli/clan_cli/secrets/sops.py +++ b/pkgs/clan-cli/clan_cli/secrets/sops.py @@ -77,7 +77,7 @@ class KeyType(enum.Enum): except FileNotFoundError: return except Exception as ex: - log.warning(f"Could not read age keys from {key_path}: {ex}") + log.warning(f"Could not read age keys from {key_path}", exc_info=ex) # Sops will try every location, see age/keysource.go if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): @@ -246,14 +246,10 @@ def sops_run( def get_public_age_key(privkey: str) -> str: cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) - try: - res = subprocess.run( - cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True - ) - except subprocess.CalledProcessError as e: - msg = "Failed to get public key for age private key. Is the key malformed?" - raise ClanError(msg) from e - return res.stdout.strip() + + error_msg = "Failed to get public key for age private key. Is the key malformed?" + res = run(cmd, RunOpts(input=privkey.encode(), error_msg=error_msg)) + return res.stdout.rstrip(os.linesep).rstrip() def generate_private_key(out_file: Path | None = None) -> tuple[str, str]: diff --git a/pkgs/clan-cli/clan_cli/ssh/cli.py b/pkgs/clan-cli/clan_cli/ssh/cli.py deleted file mode 100644 index b01b4e253..000000000 --- a/pkgs/clan-cli/clan_cli/ssh/cli.py +++ /dev/null @@ -1,137 +0,0 @@ -import argparse -import ipaddress -import json -import logging -import socket -import subprocess -from pathlib import Path - -from clan_cli.nix import nix_shell - -log = logging.getLogger(__name__) - - -def ssh( - host: str, - user: str = "root", - password: str | None = None, - ssh_args: list[str] | None = None, - torify: bool = False, -) -> None: - if ssh_args is None: - ssh_args = [] - packages = ["nixpkgs#openssh"] - if torify: - packages.append("nixpkgs#tor") - - password_args = [] - if password: - packages.append("nixpkgs#sshpass") - password_args = [ - "sshpass", - "-p", - password, - ] - _ssh_args = [ - *ssh_args, - "ssh", - "-o", - "UserKnownHostsFile=/dev/null", - "-o", - "StrictHostKeyChecking=no", - f"{user}@{host}", - ] - - if password: - _ssh_args.append("-o") - _ssh_args.append("IdentitiesOnly=yes") - - cmd_args = [*password_args, *_ssh_args] - if torify: - cmd_args.insert(0, "torify") - - cmd = nix_shell(packages, cmd_args) - subprocess.run(cmd, check=True) - - -def qrcode_scan(picture_file: str) -> str: - return ( - subprocess.run( - nix_shell( - ["nixpkgs#zbar"], - [ - "zbarimg", - "--quiet", - "--raw", - picture_file, - ], - ), - stdout=subprocess.PIPE, - check=True, - ) - .stdout.decode() - .strip() - ) - - -def is_reachable(host: str) -> bool: - sock = socket.socket( - socket.AF_INET6 if ":" in host else socket.AF_INET, socket.SOCK_STREAM - ) - sock.settimeout(2) - try: - sock.connect((host, 22)) - sock.close() - except OSError: - return False - else: - return True - - -def connect_ssh_from_json(ssh_data: dict[str, str]) -> None: - for address in ssh_data["addrs"]: - log.debug(f"Trying to reach host on: {address}") - if is_reachable(address): - ssh(host=address, password=ssh_data["pass"]) - exit(0) - else: - log.debug(f"Could not reach host on {address}") - log.debug(f'Trying to reach host via torify on {ssh_data["tor"]}') - ssh(host=ssh_data["tor"], password=ssh_data["pass"], torify=True) - - -def is_ipv6(ip: str) -> bool: - try: - return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address) - except ValueError: - return False - - -def main(args: argparse.Namespace) -> None: - if args.json: - json_file = Path(args.json) - if json_file.is_file(): - ssh_data = json.loads(json_file.read_text()) - else: - ssh_data = json.loads(args.json) - connect_ssh_from_json(ssh_data) - elif args.png: - ssh_data = json.loads(qrcode_scan(args.png)) - connect_ssh_from_json(ssh_data) - - -def register_parser(parser: argparse.ArgumentParser) -> None: - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "-j", - "--json", - help="specify the json file for ssh data (generated by starting the clan installer)", - ) - group.add_argument( - "-P", - "--png", - help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)", - ) - # TODO pass all args we don't parse into ssh_args, currently it fails if arg starts with - - parser.add_argument("ssh_args", nargs="*", default=[]) - parser.set_defaults(func=main) diff --git a/pkgs/clan-cli/clan_cli/ssh/deploy_info.py b/pkgs/clan-cli/clan_cli/ssh/deploy_info.py new file mode 100644 index 000000000..17ecd60f2 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/ssh/deploy_info.py @@ -0,0 +1,127 @@ +import argparse +import ipaddress +import json +import logging +import shlex +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from clan_cli.cmd import run +from clan_cli.errors import ClanError, TorConnectionError, TorSocksError +from clan_cli.nix import nix_shell +from clan_cli.ssh.host import Host, is_ssh_reachable +from clan_cli.ssh.tor import TorTarget, ssh_tor_reachable + +log = logging.getLogger(__name__) + + +@dataclass +class DeployInfo: + addrs: list[str] + tor: str | None = None + pwd: str | None = None + + @staticmethod + def from_json(data: dict[str, Any]) -> "DeployInfo": + return DeployInfo(tor=data["tor"], pwd=data["pass"], addrs=data["addrs"]) + + +def is_ipv6(ip: str) -> bool: + try: + return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address) + except ValueError: + return False + + +def find_reachable_host(deploy_info: DeployInfo) -> Host | None: + host = None + for addr in deploy_info.addrs: + host_addr = f"[{addr}]" if is_ipv6(addr) else addr + host = Host(host=host_addr) + if is_ssh_reachable(host): + break + return host + + +def qrcode_scan(picture_file: Path) -> str: + cmd = nix_shell( + ["nixpkgs#zbar"], + [ + "zbarimg", + "--quiet", + "--raw", + str(picture_file), + ], + ) + res = run(cmd) + return res.stdout.strip() + + +def parse_qr_code(picture_file: Path) -> DeployInfo: + data = qrcode_scan(picture_file) + return DeployInfo.from_json(json.loads(data)) + + +def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None: + if host := find_reachable_host(deploy_info): + host.connect_ssh_shell(password=deploy_info.pwd) + else: + log.info("Could not reach host via clearnet 'addrs'") + log.info(f"Trying to reach host via tor '{deploy_info.tor}'") + if not deploy_info.tor: + msg = "No tor address provided, please provide a tor address." + raise ClanError(msg) + if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)): + host = Host(host=deploy_info.tor) + host.connect_ssh_shell(password=deploy_info.pwd, tor_socks=True) + else: + msg = "Could not reach host via tor either." + raise ClanError(msg) + + +def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None: + if args.json: + json_file = Path(args.json) + if json_file.is_file(): + data = json.loads(json_file.read_text()) + return DeployInfo.from_json(data) + data = json.loads(args.json) + return DeployInfo.from_json(data) + if args.png: + return parse_qr_code(Path(args.png)) + return None + + +def ssh_command(args: argparse.Namespace) -> None: + deploy_info = ssh_command_parse(args) + if not deploy_info: + msg = "No --json or --png data provided" + raise ClanError(msg) + try: + ssh_shell_from_deploy(deploy_info) + except TorSocksError as ex: + log.error(ex) + tor_cmd = nix_shell(["nixpkgs#tor"], ["tor"]) + log.error("Is Tor running? If not, you can start it by running:") + log.error(f"{' '.join(shlex.quote(arg) for arg in tor_cmd)}") + except TorConnectionError: + log.error("The onion address is not reachable via Tor.") + + +def register_parser(parser: argparse.ArgumentParser) -> None: + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-j", + "--json", + help="specify the json file for ssh data (generated by starting the clan installer)", + ) + group.add_argument( + "-P", + "--png", + help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)", + ) + parser.add_argument( + "--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments" + ) + parser.set_defaults(func=ssh_command) diff --git a/pkgs/clan-cli/clan_cli/ssh/host.py b/pkgs/clan-cli/clan_cli/ssh/host.py index 635787a93..688fd92f9 100644 --- a/pkgs/clan-cli/clan_cli/ssh/host.py +++ b/pkgs/clan-cli/clan_cli/ssh/host.py @@ -3,6 +3,8 @@ import logging import os import shlex +import socket +import subprocess from dataclasses import dataclass, field from shlex import quote from typing import Any @@ -10,6 +12,7 @@ from typing import Any from clan_cli.cmd import CmdOut, RunOpts, run from clan_cli.colors import AnsiColor from clan_cli.errors import ClanError +from clan_cli.nix import nix_shell from clan_cli.ssh.host_key import HostKeyCheck cmdlog = logging.getLogger(__name__) @@ -40,13 +43,6 @@ class Host: def target(self) -> str: return f"{self.user or 'root'}@{self.host}" - @property - def target_for_rsync(self) -> str: - host = self.host - if ":" in host: - host = f"[{host}]" - return f"{self.user or 'root'}@{host}" - def run_local( self, cmd: list[str], @@ -163,8 +159,20 @@ class Host: def ssh_cmd( self, verbose_ssh: bool = False, + tor_socks: bool = False, tty: bool = False, + password: str | None = None, ) -> list[str]: + packages = [] + password_args = [] + if password: + packages.append("nixpkgs#sshpass") + password_args = [ + "sshpass", + "-p", + password, + ] + ssh_opts = self.ssh_cmd_opts if verbose_ssh or self.verbose_ssh: ssh_opts.extend(["-v"]) @@ -176,8 +184,36 @@ class Host: if self.key: ssh_opts.extend(["-i", self.key]) - return [ + if tor_socks: + ssh_opts.append("-o") + ssh_opts.append("ProxyCommand=nc -x 127.0.0.1:9050 -X 5 %h %p") + + cmd = [ + *password_args, "ssh", self.target, *ssh_opts, ] + + return nix_shell(packages, cmd) + + def connect_ssh_shell( + self, *, password: str | None = None, tor_socks: bool = False + ) -> None: + cmd = self.ssh_cmd(tor_socks=tor_socks, password=password) + + subprocess.run(cmd) + + +def is_ssh_reachable(host: Host) -> bool: + sock = socket.socket( + socket.AF_INET6 if ":" in host.host else socket.AF_INET, socket.SOCK_STREAM + ) + sock.settimeout(2) + try: + sock.connect((host.host, host.port or 22)) + sock.close() + except OSError: + return False + else: + return True diff --git a/pkgs/clan-cli/clan_cli/ssh/tor.py b/pkgs/clan-cli/clan_cli/ssh/tor.py new file mode 100755 index 000000000..f323c81ac --- /dev/null +++ b/pkgs/clan-cli/clan_cli/ssh/tor.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import socket +import struct +from dataclasses import dataclass + +from clan_cli.cmd import run +from clan_cli.errors import TorConnectionError, TorSocksError +from clan_cli.nix import nix_shell + +log = logging.getLogger(__name__) + + +@dataclass +class TorTarget: + onion: str + port: int + proxy_host: str = "127.0.0.1" + proxy_port: int = 9050 + + +def connect_to_tor_socks(sock: socket.socket, target: TorTarget) -> socket.socket: + """ + Connects to a .onion host through Tor's SOCKS5 proxy using the standard library. + + Args: + target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port. + + Returns: + socket.socket: A socket connected to the .onion address via Tor. + """ + try: + # 1. Create a socket to the Tor SOCKS proxy + sock.connect((target.proxy_host, target.proxy_port)) + except ConnectionRefusedError as ex: + msg = f"Failed to connect to Tor SOCKS proxy at {target.proxy_host}:{target.proxy_port}: {ex}" + raise TorSocksError(msg) from ex + + # 2. SOCKS5 handshake + sock.sendall( + b"\x05\x01\x00" + ) # SOCKS version (0x05), number of authentication methods (0x01), no-authentication (0x00) + response = sock.recv(2) + + # Validate the SOCKS5 handshake response + if response != b"\x05\x00": # SOCKS version = 0x05, no-authentication = 0x00 + msg = f"SOCKS5 handshake failed, unexpected response: {response.hex()}" + raise TorSocksError(msg) + + # 3. Connection request + request = ( + b"\x05\x01\x00\x03" # SOCKS version, connect command, reserved, address type = domainname + + bytes([len(target.onion)]) + + target.onion.encode("utf-8") # Add domain name length and domain name + + struct.pack(">H", target.port) # Add destination port in network byte order + ) + sock.sendall(request) + + # Read the connection request response + response = sock.recv(10) + if response[1] != 0x00: # 0x00 indicates success + msg = f".onion address not reachable: {response[1]}" + raise TorConnectionError(msg) + + return sock + + +def fetch_onion_content(target: TorTarget) -> str: + """ + Fetches the HTTP response from a .onion service through a Tor SOCKS5 proxy. + + Args: + target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port. + + Returns: + str: The HTTP response text, or an error message if something goes wrong. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + # Connect to the .onion service via the SOCKS proxy + sock = connect_to_tor_socks(sock, target) + + # 1. Send an HTTP GET request + request = f"GET / HTTP/1.1\r\nHost: {target.onion}\r\nConnection: close\r\n\r\n" + sock.sendall(request.encode("utf-8")) + + # 2. Read the HTTP response + response = b"" + while True: + chunk = sock.recv(4096) + if not chunk: + break + response += chunk + + return response.decode("utf-8", errors="replace") + + +def spawn_tor() -> None: + """ + Spawns a Tor process using `nix-shell`. + """ + cmd_args = ["tor"] + packages = ["nixpkgs#tor"] + cmd = nix_shell(packages, cmd_args) + run(cmd) + + +def tor_online_test() -> bool: + """ + Tests if Tor is online by attempting to fetch content from a known .onion service. + Returns True if successful, False otherwise. + """ + target = TorTarget( + onion="duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion", port=80 + ) + try: + response = fetch_onion_content(target) + except TorConnectionError: + return False + else: + return "duckduckgo" in response + + +def ssh_tor_reachable(target: TorTarget) -> bool: + """ + Tests if SSH is reachable via Tor by attempting to connect to a known .onion service. + Returns True if successful, False otherwise. + """ + try: + response = fetch_onion_content(target) + except TorConnectionError: + return False + else: + return "SSH-" in response + + +def main() -> None: + """ + Main function to handle command-line arguments and execute the script. + """ + parser = argparse.ArgumentParser( + description="Interact with a .onion service through Tor SOCKS5 proxy." + ) + parser.add_argument( + "onion_url", type=str, help=".onion URL to connect to (e.g., 'example.onion')" + ) + parser.add_argument( + "--port", type=int, help="Port to connect to on the .onion URL (default: 80)" + ) + parser.add_argument( + "--proxy-host", + type=str, + default="127.0.0.1", + help="Address of the Tor SOCKS5 proxy (default: 127.0.0.1)", + ) + parser.add_argument( + "--proxy-port", + type=int, + default=9050, + help="Port of the Tor SOCKS5 proxy (default: 9050)", + ) + parser.add_argument( + "--ssh-tor-reachable", + action="store_true", + help="Test if SSH is reachable via Tor", + ) + + args = parser.parse_args() + default_port = 22 if args.ssh_tor_reachable else 80 + + # Create a TorTarget instance + target = TorTarget( + onion=args.onion_url, + port=args.port or default_port, + proxy_host=args.proxy_host, + proxy_port=args.proxy_port, + ) + + if args.ssh_tor_reachable: + print(f"Testing if SSH is reachable via Tor for {target.onion}...") + reachable = ssh_tor_reachable(target) + print(f"SSH is {'reachable' if reachable else 'not reachable'} via Tor.") + return + + print( + f"Connecting to {target.onion} on port {target.port} via proxy {target.proxy_host}:{target.proxy_port}..." + ) + try: + response = fetch_onion_content(target) + print("Response:") + print(response) + except TorSocksError: + log.error("Failed to connect to the Tor SOCKS proxy.") + log.error( + "Is Tor running? If not, you can start it by running 'tor' in a nix-shell." + ) + except TorConnectionError: + log.error("The onion address is not reachable via Tor.") + + +if __name__ == "__main__": + main() diff --git a/pkgs/clan-cli/tests/test_ssh_cli.py b/pkgs/clan-cli/tests/test_ssh_cli.py deleted file mode 100644 index a7be2adc2..000000000 --- a/pkgs/clan-cli/tests/test_ssh_cli.py +++ /dev/null @@ -1,141 +0,0 @@ -import os -import sys - -import clan_cli -import pytest -import pytest_subprocess.fake_process -from clan_cli.ssh import cli -from pytest_subprocess import utils -from stdout import CaptureOutput - - -def test_no_args( - monkeypatch: pytest.MonkeyPatch, - capture_output: CaptureOutput, -) -> None: - monkeypatch.setattr(sys, "argv", ["", "ssh"]) - with capture_output as output, pytest.raises(SystemExit): - clan_cli.main() - assert output.err.startswith("usage:") - - -# using fp fixture from pytest-subprocess -def test_ssh_no_pass( - fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch -) -> None: - host = "somehost" - user = "user" - if os.environ.get("IN_NIX_SANDBOX"): - monkeypatch.delenv("IN_NIX_SANDBOX") - cmd: list[str | utils.Any] = [ - "nix", - fp.any(), - "shell", - fp.any(), - "-c", - "ssh", - "-o", - "UserKnownHostsFile=/dev/null", - "-o", - "StrictHostKeyChecking=no", - f"{user}@{host}", - fp.any(), - ] - fp.register(cmd) - cli.ssh( - host=host, - user=user, - ) - assert fp.call_count(cmd) == 1 - - -def test_ssh_with_pass( - fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch -) -> None: - host = "somehost" - user = "user" - if os.environ.get("IN_NIX_SANDBOX"): - monkeypatch.delenv("IN_NIX_SANDBOX") - cmd: list[str | utils.Any] = [ - "nix", - fp.any(), - "shell", - fp.any(), - "-c", - "sshpass", - "-p", - fp.any(), - ] - fp.register(cmd) - cli.ssh( - host=host, - user=user, - password="XXX", - ) - assert fp.call_count(cmd) == 1 - - -def test_ssh_no_pass_with_torify( - fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch -) -> None: - host = "somehost" - user = "user" - if os.environ.get("IN_NIX_SANDBOX"): - monkeypatch.delenv("IN_NIX_SANDBOX") - cmd: list[str | utils.Any] = [ - "nix", - fp.any(), - "shell", - fp.any(), - "-c", - "torify", - "ssh", - "-o", - "UserKnownHostsFile=/dev/null", - "-o", - "StrictHostKeyChecking=no", - f"{user}@{host}", - fp.any(), - ] - fp.register(cmd) - cli.ssh( - host=host, - user=user, - torify=True, - ) - assert fp.call_count(cmd) == 1 - - -def test_ssh_with_pass_with_torify( - fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch -) -> None: - host = "somehost" - user = "user" - if os.environ.get("IN_NIX_SANDBOX"): - monkeypatch.delenv("IN_NIX_SANDBOX") - cmd: list[str | utils.Any] = [ - "nix", - fp.any(), - "shell", - fp.any(), - "-c", - "torify", - "sshpass", - "-p", - fp.any(), - ] - fp.register(cmd) - cli.ssh( - host=host, - user=user, - password="XXX", - torify=True, - ) - assert fp.call_count(cmd) == 1 - - -def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None: - cmd: list[str | utils.Any] = [fp.any()] - fp.register(cmd, stdout="https://test.test") - result = cli.qrcode_scan("test.png") - assert result == "https://test.test" diff --git a/pyproject.toml b/pyproject.toml index f2a66548c..080d859c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ lint.ignore = [ "A003", "ANN101", "ANN401", + "TRY400", "E402", "E501", "E731",