clan-cli: Rework 'clan ssh' command, improve Tor support.

This commit is contained in:
Qubasa
2024-12-13 00:11:18 +01:00
parent 74ef015002
commit e490207d12
13 changed files with 418 additions and 337 deletions

View File

@@ -30,7 +30,7 @@ from .flash import cli as flash_cli
from .hyperlink import help_hyperlink from .hyperlink import help_hyperlink
from .machines import cli as machines from .machines import cli as machines
from .profiler import profile from .profiler import profile
from .ssh import cli as ssh_cli from .ssh import deploy_info as ssh_cli
from .vars import cli as vars_cli from .vars import cli as vars_cli
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -432,7 +432,7 @@ def main() -> None:
if debug: if debug:
log.exception("Exited with error") log.exception("Exited with error")
else: else:
log.error("%s", e) # noqa: TRY400 log.error("%s", e)
sys.exit(1) sys.exit(1)
except KeyboardInterrupt as ex: except KeyboardInterrupt as ex:
log.warning("Interrupted by user", exc_info=ex) log.warning("Interrupted by user", exc_info=ex)

View File

@@ -189,3 +189,13 @@ class ClanCmdError(ClanError):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"ClanCmdError({self.cmd})" return f"ClanCmdError({self.cmd})"
class TorSocksError(ClanError):
def __init__(self, msg: str) -> None:
super().__init__(msg)
class TorConnectionError(ClanError):
def __init__(self, msg: str) -> None:
super().__init__(msg)

View File

@@ -25,14 +25,14 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
secret_name = secret_fact["name"] secret_name = secret_fact["name"]
if not secret_facts_store.exists(service, secret_name): if not secret_facts_store.exists(service, secret_name):
machine.info( machine.info(
f"Secret fact '{secret_fact}' for service '{service}' in machine {machine.name} is missing." f"Secret fact '{secret_fact}' for service '{service}' is missing."
) )
missing_secret_facts.append((service, secret_name)) missing_secret_facts.append((service, secret_name))
for public_fact in machine.facts_data[service]["public"]: for public_fact in machine.facts_data[service]["public"]:
if not public_facts_store.exists(service, public_fact): if not public_facts_store.exists(service, public_fact):
machine.info( machine.info(
f"Public fact '{public_fact}' for service '{service}' in machine {machine.name} is missing." f"Public fact '{public_fact}' for service '{service}' is missing."
) )
missing_public_facts.append((service, public_fact)) missing_public_facts.append((service, public_fact))

View File

@@ -2,7 +2,6 @@ import argparse
import importlib import importlib
import logging import logging
import os import os
import subprocess
import sys import sys
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
@@ -32,9 +31,9 @@ def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
Read multi-line input from stdin. Read multi-line input from stdin.
""" """
print(prompt, flush=True) print(prompt, flush=True)
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True, check=False) proc = run(["cat"], RunOpts(check=False))
log.info("Input received. Processing...") log.info("Input received. Processing...")
return proc.stdout return proc.stdout.rstrip(os.linesep).rstrip()
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]: def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:

View File

@@ -1,6 +1,5 @@
import argparse import argparse
import importlib import importlib
import json
import logging import logging
import os import os
import sys import sys
@@ -21,7 +20,7 @@ from clan_cli.facts.generate import generate_facts
from clan_cli.machines.hardware import HardwareConfig from clan_cli.machines.hardware import HardwareConfig
from clan_cli.machines.machines import Machine from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell from clan_cli.nix import nix_shell
from clan_cli.ssh.cli import is_ipv6, is_reachable, qrcode_scan from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
from clan_cli.vars.generate import generate_vars from clan_cli.vars.generate import generate_vars
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -36,7 +35,7 @@ class InstallOptions:
kexec: str | None = None kexec: str | None = None
debug: bool = False debug: bool = False
no_reboot: bool = False no_reboot: bool = False
json_ssh_deploy: dict[str, str] | None = None deploy_info: DeployInfo | None = None
build_on_remote: bool = False build_on_remote: bool = False
nix_options: list[str] = field(default_factory=list) nix_options: list[str] = field(default_factory=list)
update_hardware_config: HardwareConfig = HardwareConfig.NONE update_hardware_config: HardwareConfig = HardwareConfig.NONE
@@ -45,6 +44,12 @@ class InstallOptions:
@API.register @API.register
def install_machine(opts: InstallOptions) -> None: def install_machine(opts: InstallOptions) -> None:
# TODO: Fixme, replace opts.machine and opts.flake with machine object
# remove opts.deploy_info, opts.target_host and populate machine object
if opts.deploy_info:
msg = "Deploy info has not been fully implemented yet"
raise NotImplementedError(msg)
machine = Machine(opts.machine, flake=opts.flake) machine = Machine(opts.machine, flake=opts.flake)
machine.override_target_host = opts.target_host machine.override_target_host = opts.target_host
@@ -129,21 +134,15 @@ def install_command(args: argparse.Namespace) -> None:
if args.flake is None: if args.flake is None:
msg = "Could not find clan flake toplevel directory" msg = "Could not find clan flake toplevel directory"
raise ClanError(msg) raise ClanError(msg)
json_ssh_deploy = None deploy_info: DeployInfo | None = ssh_command_parse(args)
if args.json:
json_file = Path(args.json)
if json_file.is_file():
json_ssh_deploy = json.loads(json_file.read_text())
else:
json_ssh_deploy = json.loads(args.json)
elif args.png:
json_ssh_deploy = json.loads(qrcode_scan(args.png))
if json_ssh_deploy: if deploy_info:
target_host = ( host = find_reachable_host(deploy_info)
f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}" if host is None:
) msg = f"Couldn't reach any host address: {deploy_info.addrs}"
password = json_ssh_deploy["pass"] raise ClanError(msg)
target_host = host.target
password = deploy_info.pwd
elif args.target_host: elif args.target_host:
target_host = args.target_host target_host = args.target_host
password = None password = None
@@ -174,7 +173,7 @@ def install_command(args: argparse.Namespace) -> None:
kexec=args.kexec, kexec=args.kexec,
debug=args.debug, debug=args.debug,
no_reboot=args.no_reboot, no_reboot=args.no_reboot,
json_ssh_deploy=json_ssh_deploy, deploy_info=deploy_info,
nix_options=args.option, nix_options=args.option,
build_on_remote=args.build_on_remote, build_on_remote=args.build_on_remote,
update_hardware_config=HardwareConfig(args.update_hardware_config), update_hardware_config=HardwareConfig(args.update_hardware_config),
@@ -186,22 +185,6 @@ def install_command(args: argparse.Namespace) -> None:
sys.exit(1) sys.exit(1)
def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str:
host = None
for addr in deploy_json["addrs"]:
if is_reachable(addr):
host = f"[{addr}]" if is_ipv6(addr) else addr
break
if not host:
msg = f"""
Could not reach any of the host addresses provided in the json string.
Please doublecheck if they are reachable from your machine.
Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']}
"""
raise ClanError(msg)
return host
def register_install_parser(parser: argparse.ArgumentParser) -> None: def register_install_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument( parser.add_argument(
"--kexec", "--kexec",

View File

@@ -97,7 +97,11 @@ def nix_metadata(flake_url: str | Path) -> dict[str, Any]:
def nix_shell(packages: list[str], cmd: list[str]) -> list[str]: def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
# we cannot use nix-shell inside the nix sandbox # we cannot use nix-shell inside the nix sandbox
# in our tests we just make sure we have all the packages # in our tests we just make sure we have all the packages
if os.environ.get("IN_NIX_SANDBOX") or os.environ.get("CLAN_NO_DYNAMIC_DEPS"): if (
os.environ.get("IN_NIX_SANDBOX")
or os.environ.get("CLAN_NO_DYNAMIC_DEPS")
or len(packages) == 0
):
return cmd return cmd
return [ return [
*nix_command(["shell", "--inputs-from", f"{nixpkgs_flake()!s}"]), *nix_command(["shell", "--inputs-from", f"{nixpkgs_flake()!s}"]),

View File

@@ -77,7 +77,7 @@ class KeyType(enum.Enum):
except FileNotFoundError: except FileNotFoundError:
return return
except Exception as ex: except Exception as ex:
log.warning(f"Could not read age keys from {key_path}: {ex}") log.warning(f"Could not read age keys from {key_path}", exc_info=ex)
# Sops will try every location, see age/keysource.go # Sops will try every location, see age/keysource.go
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"): if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
@@ -246,14 +246,10 @@ def sops_run(
def get_public_age_key(privkey: str) -> str: def get_public_age_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"]) cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try:
res = subprocess.run( error_msg = "Failed to get public key for age private key. Is the key malformed?"
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True res = run(cmd, RunOpts(input=privkey.encode(), error_msg=error_msg))
) return res.stdout.rstrip(os.linesep).rstrip()
except subprocess.CalledProcessError as e:
msg = "Failed to get public key for age private key. Is the key malformed?"
raise ClanError(msg) from e
return res.stdout.strip()
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]: def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:

View File

@@ -1,137 +0,0 @@
import argparse
import ipaddress
import json
import logging
import socket
import subprocess
from pathlib import Path
from clan_cli.nix import nix_shell
log = logging.getLogger(__name__)
def ssh(
host: str,
user: str = "root",
password: str | None = None,
ssh_args: list[str] | None = None,
torify: bool = False,
) -> None:
if ssh_args is None:
ssh_args = []
packages = ["nixpkgs#openssh"]
if torify:
packages.append("nixpkgs#tor")
password_args = []
if password:
packages.append("nixpkgs#sshpass")
password_args = [
"sshpass",
"-p",
password,
]
_ssh_args = [
*ssh_args,
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
]
if password:
_ssh_args.append("-o")
_ssh_args.append("IdentitiesOnly=yes")
cmd_args = [*password_args, *_ssh_args]
if torify:
cmd_args.insert(0, "torify")
cmd = nix_shell(packages, cmd_args)
subprocess.run(cmd, check=True)
def qrcode_scan(picture_file: str) -> str:
return (
subprocess.run(
nix_shell(
["nixpkgs#zbar"],
[
"zbarimg",
"--quiet",
"--raw",
picture_file,
],
),
stdout=subprocess.PIPE,
check=True,
)
.stdout.decode()
.strip()
)
def is_reachable(host: str) -> bool:
sock = socket.socket(
socket.AF_INET6 if ":" in host else socket.AF_INET, socket.SOCK_STREAM
)
sock.settimeout(2)
try:
sock.connect((host, 22))
sock.close()
except OSError:
return False
else:
return True
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
for address in ssh_data["addrs"]:
log.debug(f"Trying to reach host on: {address}")
if is_reachable(address):
ssh(host=address, password=ssh_data["pass"])
exit(0)
else:
log.debug(f"Could not reach host on {address}")
log.debug(f'Trying to reach host via torify on {ssh_data["tor"]}')
ssh(host=ssh_data["tor"], password=ssh_data["pass"], torify=True)
def is_ipv6(ip: str) -> bool:
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
except ValueError:
return False
def main(args: argparse.Namespace) -> None:
if args.json:
json_file = Path(args.json)
if json_file.is_file():
ssh_data = json.loads(json_file.read_text())
else:
ssh_data = json.loads(args.json)
connect_ssh_from_json(ssh_data)
elif args.png:
ssh_data = json.loads(qrcode_scan(args.png))
connect_ssh_from_json(ssh_data)
def register_parser(parser: argparse.ArgumentParser) -> None:
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-j",
"--json",
help="specify the json file for ssh data (generated by starting the clan installer)",
)
group.add_argument(
"-P",
"--png",
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
)
# TODO pass all args we don't parse into ssh_args, currently it fails if arg starts with -
parser.add_argument("ssh_args", nargs="*", default=[])
parser.set_defaults(func=main)

View File

@@ -0,0 +1,127 @@
import argparse
import ipaddress
import json
import logging
import shlex
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from clan_cli.cmd import run
from clan_cli.errors import ClanError, TorConnectionError, TorSocksError
from clan_cli.nix import nix_shell
from clan_cli.ssh.host import Host, is_ssh_reachable
from clan_cli.ssh.tor import TorTarget, ssh_tor_reachable
log = logging.getLogger(__name__)
@dataclass
class DeployInfo:
addrs: list[str]
tor: str | None = None
pwd: str | None = None
@staticmethod
def from_json(data: dict[str, Any]) -> "DeployInfo":
return DeployInfo(tor=data["tor"], pwd=data["pass"], addrs=data["addrs"])
def is_ipv6(ip: str) -> bool:
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
except ValueError:
return False
def find_reachable_host(deploy_info: DeployInfo) -> Host | None:
host = None
for addr in deploy_info.addrs:
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
host = Host(host=host_addr)
if is_ssh_reachable(host):
break
return host
def qrcode_scan(picture_file: Path) -> str:
cmd = nix_shell(
["nixpkgs#zbar"],
[
"zbarimg",
"--quiet",
"--raw",
str(picture_file),
],
)
res = run(cmd)
return res.stdout.strip()
def parse_qr_code(picture_file: Path) -> DeployInfo:
data = qrcode_scan(picture_file)
return DeployInfo.from_json(json.loads(data))
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
if host := find_reachable_host(deploy_info):
host.connect_ssh_shell(password=deploy_info.pwd)
else:
log.info("Could not reach host via clearnet 'addrs'")
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
if not deploy_info.tor:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
host = Host(host=deploy_info.tor)
host.connect_ssh_shell(password=deploy_info.pwd, tor_socks=True)
else:
msg = "Could not reach host via tor either."
raise ClanError(msg)
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
if args.json:
json_file = Path(args.json)
if json_file.is_file():
data = json.loads(json_file.read_text())
return DeployInfo.from_json(data)
data = json.loads(args.json)
return DeployInfo.from_json(data)
if args.png:
return parse_qr_code(Path(args.png))
return None
def ssh_command(args: argparse.Namespace) -> None:
deploy_info = ssh_command_parse(args)
if not deploy_info:
msg = "No --json or --png data provided"
raise ClanError(msg)
try:
ssh_shell_from_deploy(deploy_info)
except TorSocksError as ex:
log.error(ex)
tor_cmd = nix_shell(["nixpkgs#tor"], ["tor"])
log.error("Is Tor running? If not, you can start it by running:")
log.error(f"{' '.join(shlex.quote(arg) for arg in tor_cmd)}")
except TorConnectionError:
log.error("The onion address is not reachable via Tor.")
def register_parser(parser: argparse.ArgumentParser) -> None:
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-j",
"--json",
help="specify the json file for ssh data (generated by starting the clan installer)",
)
group.add_argument(
"-P",
"--png",
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
)
parser.add_argument(
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
)
parser.set_defaults(func=ssh_command)

View File

@@ -3,6 +3,8 @@
import logging import logging
import os import os
import shlex import shlex
import socket
import subprocess
from dataclasses import dataclass, field from dataclasses import dataclass, field
from shlex import quote from shlex import quote
from typing import Any from typing import Any
@@ -10,6 +12,7 @@ from typing import Any
from clan_cli.cmd import CmdOut, RunOpts, run from clan_cli.cmd import CmdOut, RunOpts, run
from clan_cli.colors import AnsiColor from clan_cli.colors import AnsiColor
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
from clan_cli.nix import nix_shell
from clan_cli.ssh.host_key import HostKeyCheck from clan_cli.ssh.host_key import HostKeyCheck
cmdlog = logging.getLogger(__name__) cmdlog = logging.getLogger(__name__)
@@ -40,13 +43,6 @@ class Host:
def target(self) -> str: def target(self) -> str:
return f"{self.user or 'root'}@{self.host}" return f"{self.user or 'root'}@{self.host}"
@property
def target_for_rsync(self) -> str:
host = self.host
if ":" in host:
host = f"[{host}]"
return f"{self.user or 'root'}@{host}"
def run_local( def run_local(
self, self,
cmd: list[str], cmd: list[str],
@@ -163,8 +159,20 @@ class Host:
def ssh_cmd( def ssh_cmd(
self, self,
verbose_ssh: bool = False, verbose_ssh: bool = False,
tor_socks: bool = False,
tty: bool = False, tty: bool = False,
password: str | None = None,
) -> list[str]: ) -> list[str]:
packages = []
password_args = []
if password:
packages.append("nixpkgs#sshpass")
password_args = [
"sshpass",
"-p",
password,
]
ssh_opts = self.ssh_cmd_opts ssh_opts = self.ssh_cmd_opts
if verbose_ssh or self.verbose_ssh: if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"]) ssh_opts.extend(["-v"])
@@ -176,8 +184,36 @@ class Host:
if self.key: if self.key:
ssh_opts.extend(["-i", self.key]) ssh_opts.extend(["-i", self.key])
return [ if tor_socks:
ssh_opts.append("-o")
ssh_opts.append("ProxyCommand=nc -x 127.0.0.1:9050 -X 5 %h %p")
cmd = [
*password_args,
"ssh", "ssh",
self.target, self.target,
*ssh_opts, *ssh_opts,
] ]
return nix_shell(packages, cmd)
def connect_ssh_shell(
self, *, password: str | None = None, tor_socks: bool = False
) -> None:
cmd = self.ssh_cmd(tor_socks=tor_socks, password=password)
subprocess.run(cmd)
def is_ssh_reachable(host: Host) -> bool:
sock = socket.socket(
socket.AF_INET6 if ":" in host.host else socket.AF_INET, socket.SOCK_STREAM
)
sock.settimeout(2)
try:
sock.connect((host.host, host.port or 22))
sock.close()
except OSError:
return False
else:
return True

203
pkgs/clan-cli/clan_cli/ssh/tor.py Executable file
View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
import argparse
import logging
import socket
import struct
from dataclasses import dataclass
from clan_cli.cmd import run
from clan_cli.errors import TorConnectionError, TorSocksError
from clan_cli.nix import nix_shell
log = logging.getLogger(__name__)
@dataclass
class TorTarget:
onion: str
port: int
proxy_host: str = "127.0.0.1"
proxy_port: int = 9050
def connect_to_tor_socks(sock: socket.socket, target: TorTarget) -> socket.socket:
"""
Connects to a .onion host through Tor's SOCKS5 proxy using the standard library.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
socket.socket: A socket connected to the .onion address via Tor.
"""
try:
# 1. Create a socket to the Tor SOCKS proxy
sock.connect((target.proxy_host, target.proxy_port))
except ConnectionRefusedError as ex:
msg = f"Failed to connect to Tor SOCKS proxy at {target.proxy_host}:{target.proxy_port}: {ex}"
raise TorSocksError(msg) from ex
# 2. SOCKS5 handshake
sock.sendall(
b"\x05\x01\x00"
) # SOCKS version (0x05), number of authentication methods (0x01), no-authentication (0x00)
response = sock.recv(2)
# Validate the SOCKS5 handshake response
if response != b"\x05\x00": # SOCKS version = 0x05, no-authentication = 0x00
msg = f"SOCKS5 handshake failed, unexpected response: {response.hex()}"
raise TorSocksError(msg)
# 3. Connection request
request = (
b"\x05\x01\x00\x03" # SOCKS version, connect command, reserved, address type = domainname
+ bytes([len(target.onion)])
+ target.onion.encode("utf-8") # Add domain name length and domain name
+ struct.pack(">H", target.port) # Add destination port in network byte order
)
sock.sendall(request)
# Read the connection request response
response = sock.recv(10)
if response[1] != 0x00: # 0x00 indicates success
msg = f".onion address not reachable: {response[1]}"
raise TorConnectionError(msg)
return sock
def fetch_onion_content(target: TorTarget) -> str:
"""
Fetches the HTTP response from a .onion service through a Tor SOCKS5 proxy.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
str: The HTTP response text, or an error message if something goes wrong.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Connect to the .onion service via the SOCKS proxy
sock = connect_to_tor_socks(sock, target)
# 1. Send an HTTP GET request
request = f"GET / HTTP/1.1\r\nHost: {target.onion}\r\nConnection: close\r\n\r\n"
sock.sendall(request.encode("utf-8"))
# 2. Read the HTTP response
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
return response.decode("utf-8", errors="replace")
def spawn_tor() -> None:
"""
Spawns a Tor process using `nix-shell`.
"""
cmd_args = ["tor"]
packages = ["nixpkgs#tor"]
cmd = nix_shell(packages, cmd_args)
run(cmd)
def tor_online_test() -> bool:
"""
Tests if Tor is online by attempting to fetch content from a known .onion service.
Returns True if successful, False otherwise.
"""
target = TorTarget(
onion="duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion", port=80
)
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "duckduckgo" in response
def ssh_tor_reachable(target: TorTarget) -> bool:
"""
Tests if SSH is reachable via Tor by attempting to connect to a known .onion service.
Returns True if successful, False otherwise.
"""
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "SSH-" in response
def main() -> None:
"""
Main function to handle command-line arguments and execute the script.
"""
parser = argparse.ArgumentParser(
description="Interact with a .onion service through Tor SOCKS5 proxy."
)
parser.add_argument(
"onion_url", type=str, help=".onion URL to connect to (e.g., 'example.onion')"
)
parser.add_argument(
"--port", type=int, help="Port to connect to on the .onion URL (default: 80)"
)
parser.add_argument(
"--proxy-host",
type=str,
default="127.0.0.1",
help="Address of the Tor SOCKS5 proxy (default: 127.0.0.1)",
)
parser.add_argument(
"--proxy-port",
type=int,
default=9050,
help="Port of the Tor SOCKS5 proxy (default: 9050)",
)
parser.add_argument(
"--ssh-tor-reachable",
action="store_true",
help="Test if SSH is reachable via Tor",
)
args = parser.parse_args()
default_port = 22 if args.ssh_tor_reachable else 80
# Create a TorTarget instance
target = TorTarget(
onion=args.onion_url,
port=args.port or default_port,
proxy_host=args.proxy_host,
proxy_port=args.proxy_port,
)
if args.ssh_tor_reachable:
print(f"Testing if SSH is reachable via Tor for {target.onion}...")
reachable = ssh_tor_reachable(target)
print(f"SSH is {'reachable' if reachable else 'not reachable'} via Tor.")
return
print(
f"Connecting to {target.onion} on port {target.port} via proxy {target.proxy_host}:{target.proxy_port}..."
)
try:
response = fetch_onion_content(target)
print("Response:")
print(response)
except TorSocksError:
log.error("Failed to connect to the Tor SOCKS proxy.")
log.error(
"Is Tor running? If not, you can start it by running 'tor' in a nix-shell."
)
except TorConnectionError:
log.error("The onion address is not reachable via Tor.")
if __name__ == "__main__":
main()

View File

@@ -1,141 +0,0 @@
import os
import sys
import clan_cli
import pytest
import pytest_subprocess.fake_process
from clan_cli.ssh import cli
from pytest_subprocess import utils
from stdout import CaptureOutput
def test_no_args(
monkeypatch: pytest.MonkeyPatch,
capture_output: CaptureOutput,
) -> None:
monkeypatch.setattr(sys, "argv", ["", "ssh"])
with capture_output as output, pytest.raises(SystemExit):
clan_cli.main()
assert output.err.startswith("usage:")
# using fp fixture from pytest-subprocess
def test_ssh_no_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
)
assert fp.call_count(cmd) == 1
def test_ssh_with_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"sshpass",
"-p",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
password="XXX",
)
assert fp.call_count(cmd) == 1
def test_ssh_no_pass_with_torify(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
torify=True,
)
assert fp.call_count(cmd) == 1
def test_ssh_with_pass_with_torify(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"sshpass",
"-p",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
password="XXX",
torify=True,
)
assert fp.call_count(cmd) == 1
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[str | utils.Any] = [fp.any()]
fp.register(cmd, stdout="https://test.test")
result = cli.qrcode_scan("test.png")
assert result == "https://test.test"

View File

@@ -48,6 +48,7 @@ lint.ignore = [
"A003", "A003",
"ANN101", "ANN101",
"ANN401", "ANN401",
"TRY400",
"E402", "E402",
"E501", "E501",
"E731", "E731",