refactor: generalize Tor support to SOCKS5 proxy in network module

- Replace Tor-specific implementation with generic SOCKS5 proxy support
- Change `tor_socks` boolean to `socks_port` and `socks_wrapper` parameters
- Move Tor functionality to clan_lib.network.tor submodule
- Add connection context managers to NetworkTechnologyBase
- Improve network abstraction with proper remote() and connection() methods
- Update all callers to use new SOCKS5 proxy interface
- Fix network ping command to properly handle connection contexts

This allows for more flexible proxy configurations beyond just Tor,
while maintaining backward compatibility for Tor usage.
This commit is contained in:
lassulus
2025-07-24 22:26:36 +02:00
parent 9e85c64139
commit 1a5b77d47a
17 changed files with 504 additions and 465 deletions

View File

@@ -35,7 +35,7 @@ def install_command(args: argparse.Namespace) -> None:
use_tor = False
if deploy_info:
host = find_reachable_host(deploy_info)
if host is None or host.tor_socks:
if host is None or host.socks_port:
use_tor = True
target_host_str = deploy_info.tor.target
else:
@@ -74,7 +74,9 @@ def install_command(args: argparse.Namespace) -> None:
target_host = target_host.override(password=password)
if use_tor:
target_host = target_host.override(tor_socks=True)
target_host = target_host.override(
socks_port=9050, socks_wrapper=["torify"]
)
return run_machine_install(
InstallOptions(

View File

@@ -27,28 +27,27 @@ def ping_command(args: argparse.Namespace) -> None:
networks_to_check = sorted(networks.items(), key=lambda x: -x[1].priority)
found = False
results = []
for net_name, network in networks_to_check:
if machine in network.peers:
found = True
# Check if network technology is running
if not network.is_running():
results.append(f"{machine} ({net_name}): network not running")
continue
# Check if peer is online
ping = network.ping(machine)
results.append(f"{machine} ({net_name}): {ping}")
with network.module.connection(network) as network:
log.info(f"Pinging '{machine}' in network '{net_name}' ...")
res = ""
# Check if peer is online
ping = network.ping(machine)
if ping is None:
res = "not reachable"
log.info(f"{machine} ({net_name}): {res}")
else:
res = f"reachable, ping: {ping:.2f} ms"
log.info(f"{machine} ({net_name}): {res}")
break
if not found:
msg = f"Machine '{machine}' not found in any network"
raise ClanError(msg)
# Print all results
for result in results:
print(result)
def register_ping_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(

View File

@@ -1,4 +1,5 @@
import argparse
import contextlib
import json
import logging
import textwrap
@@ -9,6 +10,7 @@ from typing import Any, get_args
from clan_lib.cmd import run
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_lib.network.tor.lib import spawn_tor
from clan_lib.nix import nix_shell
from clan_lib.ssh.remote import HostKeyCheck, Remote
@@ -16,7 +18,6 @@ from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
)
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
log = logging.getLogger(__name__)
@@ -27,15 +28,15 @@ class DeployInfo:
@property
def tor(self) -> Remote:
"""Return a list of Remote objects that are configured for Tor."""
addrs = [addr for addr in self.addrs if addr.tor_socks]
"""Return a list of Remote objects that are configured for SOCKS5 proxy."""
addrs = [addr for addr in self.addrs if addr.socks_port]
if not addrs:
msg = "No tor address provided, please provide a tor address."
msg = "No socks5 proxy address provided, please provide a socks5 proxy address."
raise ClanError(msg)
if len(addrs) > 1:
msg = "Multiple tor addresses provided, expected only one."
msg = "Multiple socks5 proxy addresses provided, expected only one."
raise ClanError(msg)
return addrs[0]
@@ -76,7 +77,12 @@ class DeployInfo:
remote = Remote.from_ssh_uri(
machine_name="clan-installer",
address=tor_addr,
).override(host_key_check=host_key_check, tor_socks=True, password=password)
).override(
host_key_check=host_key_check,
socks_port=9050,
socks_wrapper=["torify"],
password=password,
)
addrs.append(remote)
return DeployInfo(addrs=addrs)
@@ -103,7 +109,8 @@ def find_reachable_host(deploy_info: DeployInfo) -> Remote | None:
return deploy_info.addrs[0]
for addr in deploy_info.addrs:
if addr.check_machine_ssh_reachable():
with contextlib.suppress(ClanError):
addr.check_machine_ssh_reachable()
return addr
return None
@@ -129,7 +136,7 @@ def ssh_shell_from_deploy(
log.info("Could not reach host via clearnet 'addrs'")
log.info(f"Trying to reach host via tor '{deploy_info}'")
tor_addrs = [addr for addr in deploy_info.addrs if addr.tor_socks]
tor_addrs = [addr for addr in deploy_info.addrs if addr.socks_port]
if not tor_addrs:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
@@ -137,11 +144,10 @@ def ssh_shell_from_deploy(
with spawn_tor():
for tor_addr in tor_addrs:
log.info(f"Trying to reach host via tor address: {tor_addr}")
if ssh_tor_reachable(
TorTarget(
onion=tor_addr.address, port=tor_addr.port if tor_addr.port else 22
)
):
with contextlib.suppress(ClanError):
tor_addr.check_machine_ssh_reachable()
log.info(
"Host reachable via tor address, starting interactive ssh session."
)

View File

@@ -37,7 +37,7 @@ def test_qrcode_scan(temp_dir: Path) -> None:
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
assert tor_host.tor_socks is True
assert tor_host.socks_port == 9050
assert tor_host.password == "scabbed-defender-headlock"
assert tor_host.user == "root"
assert (
@@ -59,7 +59,7 @@ def test_from_json() -> None:
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
assert tor_host.tor_socks is True
assert tor_host.socks_port == 9050
assert tor_host.password == "scabbed-defender-headlock"
assert tor_host.user == "root"
assert (

View File

@@ -1,233 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import socket
import struct
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from subprocess import Popen
from clan_lib.errors import TorConnectionError, TorSocksError
from clan_lib.nix import nix_shell
log = logging.getLogger(__name__)
@dataclass
class TorTarget:
onion: str
port: int
proxy_host: str = "127.0.0.1"
proxy_port: int = 9050
def connect_to_tor_socks(sock: socket.socket, target: TorTarget) -> socket.socket:
"""
Connects to a .onion host through Tor's SOCKS5 proxy using the standard library.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
socket.socket: A socket connected to the .onion address via Tor.
"""
try:
# 1. Create a socket to the Tor SOCKS proxy
sock.connect((target.proxy_host, target.proxy_port))
except ConnectionRefusedError as ex:
msg = f"Failed to connect to Tor SOCKS proxy at {target.proxy_host}:{target.proxy_port}: {ex}"
raise TorSocksError(msg) from ex
# 2. SOCKS5 handshake
sock.sendall(
b"\x05\x01\x00"
) # SOCKS version (0x05), number of authentication methods (0x01), no-authentication (0x00)
response = sock.recv(2)
# Validate the SOCKS5 handshake response
if response != b"\x05\x00": # SOCKS version = 0x05, no-authentication = 0x00
msg = f"SOCKS5 handshake failed, unexpected response: {response.hex()}"
raise TorSocksError(msg)
# 3. Connection request
request = (
b"\x05\x01\x00\x03" # SOCKS version, connect command, reserved, address type = domainname
+ bytes([len(target.onion)])
+ target.onion.encode("utf-8") # Add domain name length and domain name
+ struct.pack(">H", target.port) # Add destination port in network byte order
)
sock.sendall(request)
# Read the connection request response
response = sock.recv(10)
if response[1] != 0x00: # 0x00 indicates success
msg = f".onion address not reachable: {response[1]}"
raise TorConnectionError(msg)
return sock
def fetch_onion_content(target: TorTarget) -> str:
"""
Fetches the HTTP response from a .onion service through a Tor SOCKS5 proxy.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
str: The HTTP response text, or an error message if something goes wrong.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Connect to the .onion service via the SOCKS proxy
sock = connect_to_tor_socks(sock, target)
# 1. Send an HTTP GET request
request = f"GET / HTTP/1.1\r\nHost: {target.onion}\r\nConnection: close\r\n\r\n"
sock.sendall(request.encode("utf-8"))
# 2. Read the HTTP response
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
return response.decode("utf-8", errors="replace")
def is_tor_running() -> bool:
"""Checks if Tor is online."""
try:
tor_online_test()
except TorSocksError:
return False
else:
return True
@contextmanager
def spawn_tor() -> Iterator[None]:
"""
Spawns a Tor process using `nix-shell` if Tor is not already running.
"""
# Check if Tor is already running
if is_tor_running():
log.info("Tor is running")
return
cmd_args = ["tor", "--HardwareAccel", "1"]
packages = ["tor"]
cmd = nix_shell(packages, cmd_args)
process = Popen(cmd)
try:
while not is_tor_running():
log.debug("Waiting for Tor to start...")
time.sleep(0.2)
log.info("Tor is now running")
yield
finally:
log.info("Terminating Tor process...")
process.terminate()
process.wait()
log.info("Tor process terminated")
def tor_online_test() -> bool:
"""
Tests if Tor is online by attempting to fetch content from a known .onion service.
Returns True if successful, False otherwise.
"""
target = TorTarget(
onion="duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion", port=80
)
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "duckduckgo" in response
def ssh_tor_reachable(target: TorTarget) -> bool:
"""
Tests if SSH is reachable via Tor by attempting to connect to a known .onion service.
Returns True if successful, False otherwise.
"""
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "SSH-" in response
def main() -> None:
"""
Main function to handle command-line arguments and execute the script.
"""
parser = argparse.ArgumentParser(
description="Interact with a .onion service through Tor SOCKS5 proxy."
)
parser.add_argument(
"onion_url", type=str, help=".onion URL to connect to (e.g., 'example.onion')"
)
parser.add_argument(
"--port", type=int, help="Port to connect to on the .onion URL (default: 80)"
)
parser.add_argument(
"--proxy-host",
type=str,
default="127.0.0.1",
help="Address of the Tor SOCKS5 proxy (default: 127.0.0.1)",
)
parser.add_argument(
"--proxy-port",
type=int,
default=9050,
help="Port of the Tor SOCKS5 proxy (default: 9050)",
)
parser.add_argument(
"--ssh-tor-reachable",
action="store_true",
help="Test if SSH is reachable via Tor",
)
args = parser.parse_args()
default_port = 22 if args.ssh_tor_reachable else 80
# Create a TorTarget instance
target = TorTarget(
onion=args.onion_url,
port=args.port or default_port,
proxy_host=args.proxy_host,
proxy_port=args.proxy_port,
)
if args.ssh_tor_reachable:
print(f"Testing if SSH is reachable via Tor for {target.onion}...")
reachable = ssh_tor_reachable(target)
print(f"SSH is {'reachable' if reachable else 'not reachable'} via Tor.")
return
print(
f"Connecting to {target.onion} on port {target.port} via proxy {target.proxy_host}:{target.proxy_port}..."
)
try:
response = fetch_onion_content(target)
print("Response:")
print(response)
except TorSocksError:
log.error("Failed to connect to the Tor SOCKS proxy.")
log.error(
"Is Tor running? If not, you can start it by running 'tor' in a nix-shell."
)
except TorConnectionError:
log.error("The onion address is not reachable via Tor.")
if __name__ == "__main__":
main()