Merge pull request 'clan-cli: Rework 'clan ssh' command, improve Tor support.' (#2604) from Qubasa/clan-core:Qubasa-main into main
This commit is contained in:
@@ -30,7 +30,7 @@ from .flash import cli as flash_cli
|
||||
from .hyperlink import help_hyperlink
|
||||
from .machines import cli as machines
|
||||
from .profiler import profile
|
||||
from .ssh import cli as ssh_cli
|
||||
from .ssh import deploy_info as ssh_cli
|
||||
from .vars import cli as vars_cli
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -432,7 +432,7 @@ def main() -> None:
|
||||
if debug:
|
||||
log.exception("Exited with error")
|
||||
else:
|
||||
log.error("%s", e) # noqa: TRY400
|
||||
log.error("%s", e)
|
||||
sys.exit(1)
|
||||
except KeyboardInterrupt as ex:
|
||||
log.warning("Interrupted by user", exc_info=ex)
|
||||
|
||||
@@ -189,3 +189,13 @@ class ClanCmdError(ClanError):
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"ClanCmdError({self.cmd})"
|
||||
|
||||
|
||||
class TorSocksError(ClanError):
|
||||
def __init__(self, msg: str) -> None:
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class TorConnectionError(ClanError):
|
||||
def __init__(self, msg: str) -> None:
|
||||
super().__init__(msg)
|
||||
|
||||
@@ -25,14 +25,14 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
|
||||
secret_name = secret_fact["name"]
|
||||
if not secret_facts_store.exists(service, secret_name):
|
||||
machine.info(
|
||||
f"Secret fact '{secret_fact}' for service '{service}' in machine {machine.name} is missing."
|
||||
f"Secret fact '{secret_fact}' for service '{service}' is missing."
|
||||
)
|
||||
missing_secret_facts.append((service, secret_name))
|
||||
|
||||
for public_fact in machine.facts_data[service]["public"]:
|
||||
if not public_facts_store.exists(service, public_fact):
|
||||
machine.info(
|
||||
f"Public fact '{public_fact}' for service '{service}' in machine {machine.name} is missing."
|
||||
f"Public fact '{public_fact}' for service '{service}' is missing."
|
||||
)
|
||||
missing_public_facts.append((service, public_fact))
|
||||
|
||||
|
||||
@@ -2,7 +2,6 @@ import argparse
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
from pathlib import Path
|
||||
@@ -32,9 +31,9 @@ def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
|
||||
Read multi-line input from stdin.
|
||||
"""
|
||||
print(prompt, flush=True)
|
||||
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True, check=False)
|
||||
proc = run(["cat"], RunOpts(check=False))
|
||||
log.info("Input received. Processing...")
|
||||
return proc.stdout
|
||||
return proc.stdout.rstrip(os.linesep).rstrip()
|
||||
|
||||
|
||||
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import argparse
|
||||
import importlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
@@ -21,7 +20,7 @@ from clan_cli.facts.generate import generate_facts
|
||||
from clan_cli.machines.hardware import HardwareConfig
|
||||
from clan_cli.machines.machines import Machine
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.ssh.cli import is_ipv6, is_reachable, qrcode_scan
|
||||
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
|
||||
from clan_cli.vars.generate import generate_vars
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -36,7 +35,7 @@ class InstallOptions:
|
||||
kexec: str | None = None
|
||||
debug: bool = False
|
||||
no_reboot: bool = False
|
||||
json_ssh_deploy: dict[str, str] | None = None
|
||||
deploy_info: DeployInfo | None = None
|
||||
build_on_remote: bool = False
|
||||
nix_options: list[str] = field(default_factory=list)
|
||||
update_hardware_config: HardwareConfig = HardwareConfig.NONE
|
||||
@@ -45,6 +44,12 @@ class InstallOptions:
|
||||
|
||||
@API.register
|
||||
def install_machine(opts: InstallOptions) -> None:
|
||||
# TODO: Fixme, replace opts.machine and opts.flake with machine object
|
||||
# remove opts.deploy_info, opts.target_host and populate machine object
|
||||
if opts.deploy_info:
|
||||
msg = "Deploy info has not been fully implemented yet"
|
||||
raise NotImplementedError(msg)
|
||||
|
||||
machine = Machine(opts.machine, flake=opts.flake)
|
||||
machine.override_target_host = opts.target_host
|
||||
|
||||
@@ -129,21 +134,15 @@ def install_command(args: argparse.Namespace) -> None:
|
||||
if args.flake is None:
|
||||
msg = "Could not find clan flake toplevel directory"
|
||||
raise ClanError(msg)
|
||||
json_ssh_deploy = None
|
||||
if args.json:
|
||||
json_file = Path(args.json)
|
||||
if json_file.is_file():
|
||||
json_ssh_deploy = json.loads(json_file.read_text())
|
||||
else:
|
||||
json_ssh_deploy = json.loads(args.json)
|
||||
elif args.png:
|
||||
json_ssh_deploy = json.loads(qrcode_scan(args.png))
|
||||
deploy_info: DeployInfo | None = ssh_command_parse(args)
|
||||
|
||||
if json_ssh_deploy:
|
||||
target_host = (
|
||||
f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}"
|
||||
)
|
||||
password = json_ssh_deploy["pass"]
|
||||
if deploy_info:
|
||||
host = find_reachable_host(deploy_info)
|
||||
if host is None:
|
||||
msg = f"Couldn't reach any host address: {deploy_info.addrs}"
|
||||
raise ClanError(msg)
|
||||
target_host = host.target
|
||||
password = deploy_info.pwd
|
||||
elif args.target_host:
|
||||
target_host = args.target_host
|
||||
password = None
|
||||
@@ -174,7 +173,7 @@ def install_command(args: argparse.Namespace) -> None:
|
||||
kexec=args.kexec,
|
||||
debug=args.debug,
|
||||
no_reboot=args.no_reboot,
|
||||
json_ssh_deploy=json_ssh_deploy,
|
||||
deploy_info=deploy_info,
|
||||
nix_options=args.option,
|
||||
build_on_remote=args.build_on_remote,
|
||||
update_hardware_config=HardwareConfig(args.update_hardware_config),
|
||||
@@ -186,22 +185,6 @@ def install_command(args: argparse.Namespace) -> None:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str:
|
||||
host = None
|
||||
for addr in deploy_json["addrs"]:
|
||||
if is_reachable(addr):
|
||||
host = f"[{addr}]" if is_ipv6(addr) else addr
|
||||
break
|
||||
if not host:
|
||||
msg = f"""
|
||||
Could not reach any of the host addresses provided in the json string.
|
||||
Please doublecheck if they are reachable from your machine.
|
||||
Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']}
|
||||
"""
|
||||
raise ClanError(msg)
|
||||
return host
|
||||
|
||||
|
||||
def register_install_parser(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument(
|
||||
"--kexec",
|
||||
|
||||
@@ -97,7 +97,11 @@ def nix_metadata(flake_url: str | Path) -> dict[str, Any]:
|
||||
def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
|
||||
# we cannot use nix-shell inside the nix sandbox
|
||||
# in our tests we just make sure we have all the packages
|
||||
if os.environ.get("IN_NIX_SANDBOX") or os.environ.get("CLAN_NO_DYNAMIC_DEPS"):
|
||||
if (
|
||||
os.environ.get("IN_NIX_SANDBOX")
|
||||
or os.environ.get("CLAN_NO_DYNAMIC_DEPS")
|
||||
or len(packages) == 0
|
||||
):
|
||||
return cmd
|
||||
return [
|
||||
*nix_command(["shell", "--inputs-from", f"{nixpkgs_flake()!s}"]),
|
||||
|
||||
@@ -77,7 +77,7 @@ class KeyType(enum.Enum):
|
||||
except FileNotFoundError:
|
||||
return
|
||||
except Exception as ex:
|
||||
log.warning(f"Could not read age keys from {key_path}: {ex}")
|
||||
log.warning(f"Could not read age keys from {key_path}", exc_info=ex)
|
||||
|
||||
# Sops will try every location, see age/keysource.go
|
||||
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
|
||||
@@ -246,14 +246,10 @@ def sops_run(
|
||||
|
||||
def get_public_age_key(privkey: str) -> str:
|
||||
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
|
||||
try:
|
||||
res = subprocess.run(
|
||||
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
|
||||
)
|
||||
except subprocess.CalledProcessError as e:
|
||||
msg = "Failed to get public key for age private key. Is the key malformed?"
|
||||
raise ClanError(msg) from e
|
||||
return res.stdout.strip()
|
||||
|
||||
error_msg = "Failed to get public key for age private key. Is the key malformed?"
|
||||
res = run(cmd, RunOpts(input=privkey.encode(), error_msg=error_msg))
|
||||
return res.stdout.rstrip(os.linesep).rstrip()
|
||||
|
||||
|
||||
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:
|
||||
|
||||
@@ -1,137 +0,0 @@
|
||||
import argparse
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def ssh(
|
||||
host: str,
|
||||
user: str = "root",
|
||||
password: str | None = None,
|
||||
ssh_args: list[str] | None = None,
|
||||
torify: bool = False,
|
||||
) -> None:
|
||||
if ssh_args is None:
|
||||
ssh_args = []
|
||||
packages = ["nixpkgs#openssh"]
|
||||
if torify:
|
||||
packages.append("nixpkgs#tor")
|
||||
|
||||
password_args = []
|
||||
if password:
|
||||
packages.append("nixpkgs#sshpass")
|
||||
password_args = [
|
||||
"sshpass",
|
||||
"-p",
|
||||
password,
|
||||
]
|
||||
_ssh_args = [
|
||||
*ssh_args,
|
||||
"ssh",
|
||||
"-o",
|
||||
"UserKnownHostsFile=/dev/null",
|
||||
"-o",
|
||||
"StrictHostKeyChecking=no",
|
||||
f"{user}@{host}",
|
||||
]
|
||||
|
||||
if password:
|
||||
_ssh_args.append("-o")
|
||||
_ssh_args.append("IdentitiesOnly=yes")
|
||||
|
||||
cmd_args = [*password_args, *_ssh_args]
|
||||
if torify:
|
||||
cmd_args.insert(0, "torify")
|
||||
|
||||
cmd = nix_shell(packages, cmd_args)
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
|
||||
def qrcode_scan(picture_file: str) -> str:
|
||||
return (
|
||||
subprocess.run(
|
||||
nix_shell(
|
||||
["nixpkgs#zbar"],
|
||||
[
|
||||
"zbarimg",
|
||||
"--quiet",
|
||||
"--raw",
|
||||
picture_file,
|
||||
],
|
||||
),
|
||||
stdout=subprocess.PIPE,
|
||||
check=True,
|
||||
)
|
||||
.stdout.decode()
|
||||
.strip()
|
||||
)
|
||||
|
||||
|
||||
def is_reachable(host: str) -> bool:
|
||||
sock = socket.socket(
|
||||
socket.AF_INET6 if ":" in host else socket.AF_INET, socket.SOCK_STREAM
|
||||
)
|
||||
sock.settimeout(2)
|
||||
try:
|
||||
sock.connect((host, 22))
|
||||
sock.close()
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
|
||||
for address in ssh_data["addrs"]:
|
||||
log.debug(f"Trying to reach host on: {address}")
|
||||
if is_reachable(address):
|
||||
ssh(host=address, password=ssh_data["pass"])
|
||||
exit(0)
|
||||
else:
|
||||
log.debug(f"Could not reach host on {address}")
|
||||
log.debug(f'Trying to reach host via torify on {ssh_data["tor"]}')
|
||||
ssh(host=ssh_data["tor"], password=ssh_data["pass"], torify=True)
|
||||
|
||||
|
||||
def is_ipv6(ip: str) -> bool:
|
||||
try:
|
||||
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def main(args: argparse.Namespace) -> None:
|
||||
if args.json:
|
||||
json_file = Path(args.json)
|
||||
if json_file.is_file():
|
||||
ssh_data = json.loads(json_file.read_text())
|
||||
else:
|
||||
ssh_data = json.loads(args.json)
|
||||
connect_ssh_from_json(ssh_data)
|
||||
elif args.png:
|
||||
ssh_data = json.loads(qrcode_scan(args.png))
|
||||
connect_ssh_from_json(ssh_data)
|
||||
|
||||
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument(
|
||||
"-j",
|
||||
"--json",
|
||||
help="specify the json file for ssh data (generated by starting the clan installer)",
|
||||
)
|
||||
group.add_argument(
|
||||
"-P",
|
||||
"--png",
|
||||
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
||||
)
|
||||
# TODO pass all args we don't parse into ssh_args, currently it fails if arg starts with -
|
||||
parser.add_argument("ssh_args", nargs="*", default=[])
|
||||
parser.set_defaults(func=main)
|
||||
127
pkgs/clan-cli/clan_cli/ssh/deploy_info.py
Normal file
127
pkgs/clan-cli/clan_cli/ssh/deploy_info.py
Normal file
@@ -0,0 +1,127 @@
|
||||
import argparse
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
import shlex
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.errors import ClanError, TorConnectionError, TorSocksError
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.ssh.host import Host, is_ssh_reachable
|
||||
from clan_cli.ssh.tor import TorTarget, ssh_tor_reachable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DeployInfo:
|
||||
addrs: list[str]
|
||||
tor: str | None = None
|
||||
pwd: str | None = None
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: dict[str, Any]) -> "DeployInfo":
|
||||
return DeployInfo(tor=data["tor"], pwd=data["pass"], addrs=data["addrs"])
|
||||
|
||||
|
||||
def is_ipv6(ip: str) -> bool:
|
||||
try:
|
||||
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def find_reachable_host(deploy_info: DeployInfo) -> Host | None:
|
||||
host = None
|
||||
for addr in deploy_info.addrs:
|
||||
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
|
||||
host = Host(host=host_addr)
|
||||
if is_ssh_reachable(host):
|
||||
break
|
||||
return host
|
||||
|
||||
|
||||
def qrcode_scan(picture_file: Path) -> str:
|
||||
cmd = nix_shell(
|
||||
["nixpkgs#zbar"],
|
||||
[
|
||||
"zbarimg",
|
||||
"--quiet",
|
||||
"--raw",
|
||||
str(picture_file),
|
||||
],
|
||||
)
|
||||
res = run(cmd)
|
||||
return res.stdout.strip()
|
||||
|
||||
|
||||
def parse_qr_code(picture_file: Path) -> DeployInfo:
|
||||
data = qrcode_scan(picture_file)
|
||||
return DeployInfo.from_json(json.loads(data))
|
||||
|
||||
|
||||
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
|
||||
if host := find_reachable_host(deploy_info):
|
||||
host.connect_ssh_shell(password=deploy_info.pwd)
|
||||
else:
|
||||
log.info("Could not reach host via clearnet 'addrs'")
|
||||
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
|
||||
if not deploy_info.tor:
|
||||
msg = "No tor address provided, please provide a tor address."
|
||||
raise ClanError(msg)
|
||||
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
|
||||
host = Host(host=deploy_info.tor)
|
||||
host.connect_ssh_shell(password=deploy_info.pwd, tor_socks=True)
|
||||
else:
|
||||
msg = "Could not reach host via tor either."
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
|
||||
if args.json:
|
||||
json_file = Path(args.json)
|
||||
if json_file.is_file():
|
||||
data = json.loads(json_file.read_text())
|
||||
return DeployInfo.from_json(data)
|
||||
data = json.loads(args.json)
|
||||
return DeployInfo.from_json(data)
|
||||
if args.png:
|
||||
return parse_qr_code(Path(args.png))
|
||||
return None
|
||||
|
||||
|
||||
def ssh_command(args: argparse.Namespace) -> None:
|
||||
deploy_info = ssh_command_parse(args)
|
||||
if not deploy_info:
|
||||
msg = "No --json or --png data provided"
|
||||
raise ClanError(msg)
|
||||
try:
|
||||
ssh_shell_from_deploy(deploy_info)
|
||||
except TorSocksError as ex:
|
||||
log.error(ex)
|
||||
tor_cmd = nix_shell(["nixpkgs#tor"], ["tor"])
|
||||
log.error("Is Tor running? If not, you can start it by running:")
|
||||
log.error(f"{' '.join(shlex.quote(arg) for arg in tor_cmd)}")
|
||||
except TorConnectionError:
|
||||
log.error("The onion address is not reachable via Tor.")
|
||||
|
||||
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument(
|
||||
"-j",
|
||||
"--json",
|
||||
help="specify the json file for ssh data (generated by starting the clan installer)",
|
||||
)
|
||||
group.add_argument(
|
||||
"-P",
|
||||
"--png",
|
||||
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
|
||||
)
|
||||
parser.set_defaults(func=ssh_command)
|
||||
@@ -3,6 +3,8 @@
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import socket
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from shlex import quote
|
||||
from typing import Any
|
||||
@@ -10,6 +12,7 @@ from typing import Any
|
||||
from clan_cli.cmd import CmdOut, RunOpts, run
|
||||
from clan_cli.colors import AnsiColor
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_shell
|
||||
from clan_cli.ssh.host_key import HostKeyCheck
|
||||
|
||||
cmdlog = logging.getLogger(__name__)
|
||||
@@ -40,13 +43,6 @@ class Host:
|
||||
def target(self) -> str:
|
||||
return f"{self.user or 'root'}@{self.host}"
|
||||
|
||||
@property
|
||||
def target_for_rsync(self) -> str:
|
||||
host = self.host
|
||||
if ":" in host:
|
||||
host = f"[{host}]"
|
||||
return f"{self.user or 'root'}@{host}"
|
||||
|
||||
def run_local(
|
||||
self,
|
||||
cmd: list[str],
|
||||
@@ -163,8 +159,20 @@ class Host:
|
||||
def ssh_cmd(
|
||||
self,
|
||||
verbose_ssh: bool = False,
|
||||
tor_socks: bool = False,
|
||||
tty: bool = False,
|
||||
password: str | None = None,
|
||||
) -> list[str]:
|
||||
packages = []
|
||||
password_args = []
|
||||
if password:
|
||||
packages.append("nixpkgs#sshpass")
|
||||
password_args = [
|
||||
"sshpass",
|
||||
"-p",
|
||||
password,
|
||||
]
|
||||
|
||||
ssh_opts = self.ssh_cmd_opts
|
||||
if verbose_ssh or self.verbose_ssh:
|
||||
ssh_opts.extend(["-v"])
|
||||
@@ -176,8 +184,36 @@ class Host:
|
||||
if self.key:
|
||||
ssh_opts.extend(["-i", self.key])
|
||||
|
||||
return [
|
||||
if tor_socks:
|
||||
ssh_opts.append("-o")
|
||||
ssh_opts.append("ProxyCommand=nc -x 127.0.0.1:9050 -X 5 %h %p")
|
||||
|
||||
cmd = [
|
||||
*password_args,
|
||||
"ssh",
|
||||
self.target,
|
||||
*ssh_opts,
|
||||
]
|
||||
|
||||
return nix_shell(packages, cmd)
|
||||
|
||||
def connect_ssh_shell(
|
||||
self, *, password: str | None = None, tor_socks: bool = False
|
||||
) -> None:
|
||||
cmd = self.ssh_cmd(tor_socks=tor_socks, password=password)
|
||||
|
||||
subprocess.run(cmd)
|
||||
|
||||
|
||||
def is_ssh_reachable(host: Host) -> bool:
|
||||
sock = socket.socket(
|
||||
socket.AF_INET6 if ":" in host.host else socket.AF_INET, socket.SOCK_STREAM
|
||||
)
|
||||
sock.settimeout(2)
|
||||
try:
|
||||
sock.connect((host.host, host.port or 22))
|
||||
sock.close()
|
||||
except OSError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
203
pkgs/clan-cli/clan_cli/ssh/tor.py
Executable file
203
pkgs/clan-cli/clan_cli/ssh/tor.py
Executable file
@@ -0,0 +1,203 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import socket
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.errors import TorConnectionError, TorSocksError
|
||||
from clan_cli.nix import nix_shell
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TorTarget:
|
||||
onion: str
|
||||
port: int
|
||||
proxy_host: str = "127.0.0.1"
|
||||
proxy_port: int = 9050
|
||||
|
||||
|
||||
def connect_to_tor_socks(sock: socket.socket, target: TorTarget) -> socket.socket:
|
||||
"""
|
||||
Connects to a .onion host through Tor's SOCKS5 proxy using the standard library.
|
||||
|
||||
Args:
|
||||
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
|
||||
|
||||
Returns:
|
||||
socket.socket: A socket connected to the .onion address via Tor.
|
||||
"""
|
||||
try:
|
||||
# 1. Create a socket to the Tor SOCKS proxy
|
||||
sock.connect((target.proxy_host, target.proxy_port))
|
||||
except ConnectionRefusedError as ex:
|
||||
msg = f"Failed to connect to Tor SOCKS proxy at {target.proxy_host}:{target.proxy_port}: {ex}"
|
||||
raise TorSocksError(msg) from ex
|
||||
|
||||
# 2. SOCKS5 handshake
|
||||
sock.sendall(
|
||||
b"\x05\x01\x00"
|
||||
) # SOCKS version (0x05), number of authentication methods (0x01), no-authentication (0x00)
|
||||
response = sock.recv(2)
|
||||
|
||||
# Validate the SOCKS5 handshake response
|
||||
if response != b"\x05\x00": # SOCKS version = 0x05, no-authentication = 0x00
|
||||
msg = f"SOCKS5 handshake failed, unexpected response: {response.hex()}"
|
||||
raise TorSocksError(msg)
|
||||
|
||||
# 3. Connection request
|
||||
request = (
|
||||
b"\x05\x01\x00\x03" # SOCKS version, connect command, reserved, address type = domainname
|
||||
+ bytes([len(target.onion)])
|
||||
+ target.onion.encode("utf-8") # Add domain name length and domain name
|
||||
+ struct.pack(">H", target.port) # Add destination port in network byte order
|
||||
)
|
||||
sock.sendall(request)
|
||||
|
||||
# Read the connection request response
|
||||
response = sock.recv(10)
|
||||
if response[1] != 0x00: # 0x00 indicates success
|
||||
msg = f".onion address not reachable: {response[1]}"
|
||||
raise TorConnectionError(msg)
|
||||
|
||||
return sock
|
||||
|
||||
|
||||
def fetch_onion_content(target: TorTarget) -> str:
|
||||
"""
|
||||
Fetches the HTTP response from a .onion service through a Tor SOCKS5 proxy.
|
||||
|
||||
Args:
|
||||
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
|
||||
|
||||
Returns:
|
||||
str: The HTTP response text, or an error message if something goes wrong.
|
||||
"""
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
# Connect to the .onion service via the SOCKS proxy
|
||||
sock = connect_to_tor_socks(sock, target)
|
||||
|
||||
# 1. Send an HTTP GET request
|
||||
request = f"GET / HTTP/1.1\r\nHost: {target.onion}\r\nConnection: close\r\n\r\n"
|
||||
sock.sendall(request.encode("utf-8"))
|
||||
|
||||
# 2. Read the HTTP response
|
||||
response = b""
|
||||
while True:
|
||||
chunk = sock.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
response += chunk
|
||||
|
||||
return response.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def spawn_tor() -> None:
|
||||
"""
|
||||
Spawns a Tor process using `nix-shell`.
|
||||
"""
|
||||
cmd_args = ["tor"]
|
||||
packages = ["nixpkgs#tor"]
|
||||
cmd = nix_shell(packages, cmd_args)
|
||||
run(cmd)
|
||||
|
||||
|
||||
def tor_online_test() -> bool:
|
||||
"""
|
||||
Tests if Tor is online by attempting to fetch content from a known .onion service.
|
||||
Returns True if successful, False otherwise.
|
||||
"""
|
||||
target = TorTarget(
|
||||
onion="duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion", port=80
|
||||
)
|
||||
try:
|
||||
response = fetch_onion_content(target)
|
||||
except TorConnectionError:
|
||||
return False
|
||||
else:
|
||||
return "duckduckgo" in response
|
||||
|
||||
|
||||
def ssh_tor_reachable(target: TorTarget) -> bool:
|
||||
"""
|
||||
Tests if SSH is reachable via Tor by attempting to connect to a known .onion service.
|
||||
Returns True if successful, False otherwise.
|
||||
"""
|
||||
try:
|
||||
response = fetch_onion_content(target)
|
||||
except TorConnectionError:
|
||||
return False
|
||||
else:
|
||||
return "SSH-" in response
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Main function to handle command-line arguments and execute the script.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Interact with a .onion service through Tor SOCKS5 proxy."
|
||||
)
|
||||
parser.add_argument(
|
||||
"onion_url", type=str, help=".onion URL to connect to (e.g., 'example.onion')"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port", type=int, help="Port to connect to on the .onion URL (default: 80)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-host",
|
||||
type=str,
|
||||
default="127.0.0.1",
|
||||
help="Address of the Tor SOCKS5 proxy (default: 127.0.0.1)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--proxy-port",
|
||||
type=int,
|
||||
default=9050,
|
||||
help="Port of the Tor SOCKS5 proxy (default: 9050)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ssh-tor-reachable",
|
||||
action="store_true",
|
||||
help="Test if SSH is reachable via Tor",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
default_port = 22 if args.ssh_tor_reachable else 80
|
||||
|
||||
# Create a TorTarget instance
|
||||
target = TorTarget(
|
||||
onion=args.onion_url,
|
||||
port=args.port or default_port,
|
||||
proxy_host=args.proxy_host,
|
||||
proxy_port=args.proxy_port,
|
||||
)
|
||||
|
||||
if args.ssh_tor_reachable:
|
||||
print(f"Testing if SSH is reachable via Tor for {target.onion}...")
|
||||
reachable = ssh_tor_reachable(target)
|
||||
print(f"SSH is {'reachable' if reachable else 'not reachable'} via Tor.")
|
||||
return
|
||||
|
||||
print(
|
||||
f"Connecting to {target.onion} on port {target.port} via proxy {target.proxy_host}:{target.proxy_port}..."
|
||||
)
|
||||
try:
|
||||
response = fetch_onion_content(target)
|
||||
print("Response:")
|
||||
print(response)
|
||||
except TorSocksError:
|
||||
log.error("Failed to connect to the Tor SOCKS proxy.")
|
||||
log.error(
|
||||
"Is Tor running? If not, you can start it by running 'tor' in a nix-shell."
|
||||
)
|
||||
except TorConnectionError:
|
||||
log.error("The onion address is not reachable via Tor.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,141 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
|
||||
import clan_cli
|
||||
import pytest
|
||||
import pytest_subprocess.fake_process
|
||||
from clan_cli.ssh import cli
|
||||
from pytest_subprocess import utils
|
||||
from stdout import CaptureOutput
|
||||
|
||||
|
||||
def test_no_args(
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
capture_output: CaptureOutput,
|
||||
) -> None:
|
||||
monkeypatch.setattr(sys, "argv", ["", "ssh"])
|
||||
with capture_output as output, pytest.raises(SystemExit):
|
||||
clan_cli.main()
|
||||
assert output.err.startswith("usage:")
|
||||
|
||||
|
||||
# using fp fixture from pytest-subprocess
|
||||
def test_ssh_no_pass(
|
||||
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
host = "somehost"
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
fp.any(),
|
||||
"-c",
|
||||
"ssh",
|
||||
"-o",
|
||||
"UserKnownHostsFile=/dev/null",
|
||||
"-o",
|
||||
"StrictHostKeyChecking=no",
|
||||
f"{user}@{host}",
|
||||
fp.any(),
|
||||
]
|
||||
fp.register(cmd)
|
||||
cli.ssh(
|
||||
host=host,
|
||||
user=user,
|
||||
)
|
||||
assert fp.call_count(cmd) == 1
|
||||
|
||||
|
||||
def test_ssh_with_pass(
|
||||
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
host = "somehost"
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
fp.any(),
|
||||
"-c",
|
||||
"sshpass",
|
||||
"-p",
|
||||
fp.any(),
|
||||
]
|
||||
fp.register(cmd)
|
||||
cli.ssh(
|
||||
host=host,
|
||||
user=user,
|
||||
password="XXX",
|
||||
)
|
||||
assert fp.call_count(cmd) == 1
|
||||
|
||||
|
||||
def test_ssh_no_pass_with_torify(
|
||||
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
host = "somehost"
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
fp.any(),
|
||||
"-c",
|
||||
"torify",
|
||||
"ssh",
|
||||
"-o",
|
||||
"UserKnownHostsFile=/dev/null",
|
||||
"-o",
|
||||
"StrictHostKeyChecking=no",
|
||||
f"{user}@{host}",
|
||||
fp.any(),
|
||||
]
|
||||
fp.register(cmd)
|
||||
cli.ssh(
|
||||
host=host,
|
||||
user=user,
|
||||
torify=True,
|
||||
)
|
||||
assert fp.call_count(cmd) == 1
|
||||
|
||||
|
||||
def test_ssh_with_pass_with_torify(
|
||||
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
|
||||
) -> None:
|
||||
host = "somehost"
|
||||
user = "user"
|
||||
if os.environ.get("IN_NIX_SANDBOX"):
|
||||
monkeypatch.delenv("IN_NIX_SANDBOX")
|
||||
cmd: list[str | utils.Any] = [
|
||||
"nix",
|
||||
fp.any(),
|
||||
"shell",
|
||||
fp.any(),
|
||||
"-c",
|
||||
"torify",
|
||||
"sshpass",
|
||||
"-p",
|
||||
fp.any(),
|
||||
]
|
||||
fp.register(cmd)
|
||||
cli.ssh(
|
||||
host=host,
|
||||
user=user,
|
||||
password="XXX",
|
||||
torify=True,
|
||||
)
|
||||
assert fp.call_count(cmd) == 1
|
||||
|
||||
|
||||
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
|
||||
cmd: list[str | utils.Any] = [fp.any()]
|
||||
fp.register(cmd, stdout="https://test.test")
|
||||
result = cli.qrcode_scan("test.png")
|
||||
assert result == "https://test.test"
|
||||
@@ -48,6 +48,7 @@ lint.ignore = [
|
||||
"A003",
|
||||
"ANN101",
|
||||
"ANN401",
|
||||
"TRY400",
|
||||
"E402",
|
||||
"E501",
|
||||
"E731",
|
||||
|
||||
Reference in New Issue
Block a user