Revert "clan-cli: Use Remote class in DeployInfo, add tests for qrcode parser and json parser"

This reverts commit 2ca3b5d698.
This commit is contained in:
lassulus
2025-06-09 11:28:15 +02:00
parent 2a576a604b
commit 2460ac970f
12 changed files with 151 additions and 301 deletions

View File

@@ -156,7 +156,7 @@ def install_machine(opts: InstallOptions) -> None:
def install_command(args: argparse.Namespace) -> None:
HostKeyCheck.from_str(args.host_key_check)
host_key_check = HostKeyCheck.from_str(args.host_key_check)
try:
# Only if the caller did not specify a target_host via args.target_host
# Find a suitable target_host that is reachable
@@ -165,17 +165,17 @@ def install_command(args: argparse.Namespace) -> None:
use_tor = False
if deploy_info and not args.target_host:
host = find_reachable_host(deploy_info)
host = find_reachable_host(deploy_info, host_key_check)
if host is None:
use_tor = True
target_host = deploy_info.tor.target
target_host = f"root@{deploy_info.tor}"
else:
target_host = host.target
if args.password:
password = args.password
elif deploy_info and deploy_info.addrs[0].password:
password = deploy_info.addrs[0].password
elif deploy_info and deploy_info.pwd:
password = deploy_info.pwd
else:
password = None

View File

@@ -121,7 +121,7 @@ def deploy_machine(machine: Machine) -> None:
upload_secrets(machine, sudo_host)
upload_secret_vars(machine, sudo_host)
path = upload_sources(machine, sudo_host)
path = upload_sources(machine, host)
nix_options = [
"--show-trace",

View File

@@ -1,19 +1,24 @@
import argparse
import ipaddress
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from clan_lib.async_run import AsyncRuntime
from clan_lib.cmd import run
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_shell
from clan_lib.ssh.remote import HostKeyCheck, Remote
from clan_lib.ssh.parse import parse_deployment_address
from clan_lib.ssh.remote import Remote, is_ssh_reachable
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
)
from clan_cli.ssh.host_key import HostKeyCheck
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
log = logging.getLogger(__name__)
@@ -21,148 +26,113 @@ log = logging.getLogger(__name__)
@dataclass
class DeployInfo:
addrs: list[Remote]
@property
def tor(self) -> Remote:
"""Return a list of Remote objects that are configured for Tor."""
addrs = [addr for addr in self.addrs if addr.tor_socks]
if not addrs:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
if len(addrs) > 1:
msg = "Multiple tor addresses provided, expected only one."
raise ClanError(msg)
return addrs[0]
addrs: list[str]
tor: str | None = None
pwd: str | None = None
@staticmethod
def from_hostnames(
hostname: list[str], host_key_check: HostKeyCheck
) -> "DeployInfo":
remotes = []
for host in hostname:
if not host:
msg = "Hostname cannot be empty."
raise ClanError(msg)
remote = Remote.from_deployment_address(
machine_name="clan-installer",
address=host,
host_key_check=host_key_check,
)
remotes.append(remote)
return DeployInfo(addrs=remotes)
def from_hostname(hostname: str, args: argparse.Namespace) -> "DeployInfo":
m = Machine(hostname, flake=args.flake)
return DeployInfo(addrs=[m.target_host_address])
@staticmethod
def from_json(data: dict[str, Any], host_key_check: HostKeyCheck) -> "DeployInfo":
addrs = []
password = data.get("pass")
for addr in data.get("addrs", []):
if isinstance(addr, str):
remote = Remote.from_deployment_address(
machine_name="clan-installer",
address=addr,
host_key_check=host_key_check,
password=password,
)
addrs.append(remote)
else:
msg = f"Invalid address format: {addr}"
raise ClanError(msg)
if tor_addr := data.get("tor"):
remote = Remote.from_deployment_address(
machine_name="clan-installer",
address=tor_addr,
host_key_check=host_key_check,
password=password,
tor_socks=True,
)
addrs.append(remote)
return DeployInfo(addrs=addrs)
@staticmethod
def from_qr_code(picture_file: Path, host_key_check: HostKeyCheck) -> "DeployInfo":
cmd = nix_shell(
["zbar"],
[
"zbarimg",
"--quiet",
"--raw",
str(picture_file),
],
def from_json(data: dict[str, Any]) -> "DeployInfo":
return DeployInfo(
tor=data.get("tor"), pwd=data.get("pass"), addrs=data.get("addrs", [])
)
res = run(cmd)
data = res.stdout.strip()
return DeployInfo.from_json(json.loads(data), host_key_check=host_key_check)
def find_reachable_host(deploy_info: DeployInfo) -> Remote | None:
def is_ipv6(ip: str) -> bool:
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
except ValueError:
return False
def find_reachable_host(
deploy_info: DeployInfo, host_key_check: HostKeyCheck
) -> Remote | None:
host = None
for addr in deploy_info.addrs:
if addr.is_ssh_reachable():
host = addr
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
host_ = parse_deployment_address(
machine_name="uknown", address=host_addr, host_key_check=host_key_check
)
if is_ssh_reachable(host_):
host = host_
break
return host
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
if host := find_reachable_host(deploy_info):
def qrcode_scan(picture_file: Path) -> str:
cmd = nix_shell(
["zbar"],
[
"zbarimg",
"--quiet",
"--raw",
str(picture_file),
],
)
res = run(cmd)
return res.stdout.strip()
def parse_qr_code(picture_file: Path) -> DeployInfo:
data = qrcode_scan(picture_file)
return DeployInfo.from_json(json.loads(data))
def ssh_shell_from_deploy(
deploy_info: DeployInfo, runtime: AsyncRuntime, host_key_check: HostKeyCheck
) -> None:
if host := find_reachable_host(deploy_info, host_key_check):
host.interactive_ssh()
return
log.info("Could not reach host via clearnet 'addrs'")
log.info(f"Trying to reach host via tor '{deploy_info}'")
tor_addrs = [addr for addr in deploy_info.addrs if addr.tor_socks]
if not tor_addrs:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
with spawn_tor():
for tor_addr in tor_addrs:
log.info(f"Trying to reach host via tor address: {tor_addr}")
if ssh_tor_reachable(
TorTarget(
onion=tor_addr.address, port=tor_addr.port if tor_addr.port else 22
)
):
log.info(
"Host reachable via tor address, starting interactive ssh session."
)
tor_addr.interactive_ssh()
return
log.error("Could not reach host via tor address.")
else:
log.info("Could not reach host via clearnet 'addrs'")
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
spawn_tor(runtime)
if not deploy_info.tor:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
host = Remote(
address=deploy_info.tor,
user="root",
password=deploy_info.pwd,
tor_socks=True,
command_prefix="tor",
)
else:
msg = "Could not reach host via tor either."
raise ClanError(msg)
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
host_key_check = HostKeyCheck.from_str(args.host_key_check)
if args.json:
json_file = Path(args.json)
if json_file.is_file():
data = json.loads(json_file.read_text())
return DeployInfo.from_json(data, host_key_check)
return DeployInfo.from_json(data)
data = json.loads(args.json)
return DeployInfo.from_json(data, host_key_check)
return DeployInfo.from_json(data)
if args.png:
return DeployInfo.from_qr_code(Path(args.png), host_key_check)
return parse_qr_code(Path(args.png))
if hasattr(args, "machines"):
return DeployInfo.from_hostnames(args.machines, host_key_check)
return DeployInfo.from_hostname(args.machines[0], args)
return None
def ssh_command(args: argparse.Namespace) -> None:
host_key_check = HostKeyCheck.from_str(args.host_key_check)
deploy_info = ssh_command_parse(args)
if not deploy_info:
msg = "No MACHINE, --json or --png data provided"
raise ClanError(msg)
ssh_shell_from_deploy(deploy_info)
with AsyncRuntime() as runtime:
ssh_shell_from_deploy(deploy_info, runtime, host_key_check)
def register_parser(parser: argparse.ArgumentParser) -> None:
@@ -187,10 +157,13 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
"--png",
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
)
parser.add_argument(
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
)
parser.add_argument(
"--host-key-check",
choices=["strict", "ask", "tofu", "none"],
default="tofu",
default="ask",
help="Host key (.ssh/known_hosts) check mode.",
)
parser.set_defaults(func=ssh_command)

View File

@@ -1,70 +0,0 @@
import json
from pathlib import Path
import pytest
from clan_lib.ssh.remote import HostKeyCheck, Remote
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host
def test_qrcode_scan(test_root: Path) -> None:
# Create a dummy QR code image file
picture_file = test_root / "data" / "clan_installer_qrcode.png"
# Call the qrcode_scan function
deploy_info = DeployInfo.from_qr_code(picture_file, HostKeyCheck.NONE)
host = deploy_info.addrs[0]
assert host.address == "192.168.122.86"
assert host.user == "root"
assert host.password == "scabbed-defender-headlock"
tor_host = deploy_info.addrs[1]
assert (
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
assert tor_host.tor_socks is True
assert tor_host.password == "scabbed-defender-headlock"
assert tor_host.user == "root"
assert (
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
def test_from_json() -> None:
data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}'
deploy_info = DeployInfo.from_json(json.loads(data), HostKeyCheck.NONE)
host = deploy_info.addrs[0]
assert host.password == "scabbed-defender-headlock"
assert host.address == "192.168.122.86"
tor_host = deploy_info.addrs[1]
assert (
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
assert tor_host.tor_socks is True
assert tor_host.password == "scabbed-defender-headlock"
assert tor_host.user == "root"
assert (
tor_host.address
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
)
@pytest.mark.with_core
def test_find_reachable_host(hosts: list[Remote]) -> None:
host = hosts[0]
deploy_info = DeployInfo.from_hostnames(
["172.19.1.2", host.ssh_url()], HostKeyCheck.NONE
)
assert deploy_info.addrs[0].address == "172.19.1.2"
remote = find_reachable_host(deploy_info=deploy_info)
assert remote is not None
assert remote.ssh_url() == host.ssh_url()

View File

@@ -5,11 +5,10 @@ import logging
import socket
import struct
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from subprocess import Popen
from clan_lib.async_run import AsyncRuntime
from clan_lib.cmd import Log, RunOpts, run
from clan_lib.errors import TorConnectionError, TorSocksError
from clan_lib.nix import nix_shell
@@ -109,31 +108,32 @@ def is_tor_running() -> bool:
return True
@contextmanager
def spawn_tor() -> Iterator[None]:
def spawn_tor(runtime: AsyncRuntime) -> None:
"""
Spawns a Tor process using `nix-shell` if Tor is not already running.
"""
def start_tor() -> None:
"""Starts Tor process using nix-shell."""
cmd_args = ["tor", "--HardwareAccel", "1"]
packages = ["tor"]
cmd = nix_shell(packages, cmd_args)
runtime.async_run(None, run, cmd, RunOpts(log=Log.BOTH))
log.debug("Attempting to start Tor")
# Check if Tor is already running
if is_tor_running():
log.info("Tor is running")
return
cmd_args = ["tor", "--HardwareAccel", "1"]
packages = ["tor"]
cmd = nix_shell(packages, cmd_args)
process = Popen(cmd)
try:
while not is_tor_running():
log.debug("Waiting for Tor to start...")
time.sleep(0.2)
log.info("Tor is now running")
yield
finally:
log.info("Terminating Tor process...")
process.terminate()
process.wait()
log.info("Tor process terminated")
# Attempt to start Tor
start_tor()
# Continuously check if Tor has started
while not is_tor_running():
log.debug("Waiting for Tor to start...")
time.sleep(0.2)
log.info("Tor is now running")
def tor_online_test() -> bool:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB