clan-cli: Use Remote class in DeployInfo, add tests for qrcode parser and json parser
This commit is contained in:
@@ -1,24 +1,19 @@
|
||||
import argparse
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_lib.async_run import AsyncRuntime
|
||||
from clan_lib.cmd import run
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.machines.machines import Machine
|
||||
from clan_lib.nix import nix_shell
|
||||
from clan_lib.ssh.parse import parse_deployment_address
|
||||
from clan_lib.ssh.remote import Remote, is_ssh_reachable
|
||||
from clan_lib.ssh.remote import HostKeyCheck, Remote
|
||||
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
)
|
||||
from clan_cli.ssh.host_key import HostKeyCheck
|
||||
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -26,113 +21,148 @@ log = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class DeployInfo:
|
||||
addrs: list[str]
|
||||
tor: str | None = None
|
||||
pwd: str | None = None
|
||||
addrs: list[Remote]
|
||||
|
||||
@property
|
||||
def tor(self) -> Remote:
|
||||
"""Return a list of Remote objects that are configured for Tor."""
|
||||
addrs = [addr for addr in self.addrs if addr.tor_socks]
|
||||
|
||||
if not addrs:
|
||||
msg = "No tor address provided, please provide a tor address."
|
||||
raise ClanError(msg)
|
||||
|
||||
if len(addrs) > 1:
|
||||
msg = "Multiple tor addresses provided, expected only one."
|
||||
raise ClanError(msg)
|
||||
return addrs[0]
|
||||
|
||||
@staticmethod
|
||||
def from_hostname(hostname: str, args: argparse.Namespace) -> "DeployInfo":
|
||||
m = Machine(hostname, flake=args.flake)
|
||||
return DeployInfo(addrs=[m.target_host_address])
|
||||
def from_hostnames(
|
||||
hostname: list[str], host_key_check: HostKeyCheck
|
||||
) -> "DeployInfo":
|
||||
remotes = []
|
||||
for host in hostname:
|
||||
if not host:
|
||||
msg = "Hostname cannot be empty."
|
||||
raise ClanError(msg)
|
||||
remote = Remote.from_deployment_address(
|
||||
machine_name="clan-installer",
|
||||
address=host,
|
||||
host_key_check=host_key_check,
|
||||
)
|
||||
remotes.append(remote)
|
||||
return DeployInfo(addrs=remotes)
|
||||
|
||||
@staticmethod
|
||||
def from_json(data: dict[str, Any]) -> "DeployInfo":
|
||||
return DeployInfo(
|
||||
tor=data.get("tor"), pwd=data.get("pass"), addrs=data.get("addrs", [])
|
||||
def from_json(data: dict[str, Any], host_key_check: HostKeyCheck) -> "DeployInfo":
|
||||
addrs = []
|
||||
password = data.get("pass")
|
||||
|
||||
for addr in data.get("addrs", []):
|
||||
if isinstance(addr, str):
|
||||
remote = Remote.from_deployment_address(
|
||||
machine_name="clan-installer",
|
||||
address=addr,
|
||||
host_key_check=host_key_check,
|
||||
password=password,
|
||||
)
|
||||
addrs.append(remote)
|
||||
else:
|
||||
msg = f"Invalid address format: {addr}"
|
||||
raise ClanError(msg)
|
||||
if tor_addr := data.get("tor"):
|
||||
remote = Remote.from_deployment_address(
|
||||
machine_name="clan-installer",
|
||||
address=tor_addr,
|
||||
host_key_check=host_key_check,
|
||||
password=password,
|
||||
tor_socks=True,
|
||||
)
|
||||
addrs.append(remote)
|
||||
|
||||
return DeployInfo(addrs=addrs)
|
||||
|
||||
@staticmethod
|
||||
def from_qr_code(picture_file: Path, host_key_check: HostKeyCheck) -> "DeployInfo":
|
||||
cmd = nix_shell(
|
||||
["zbar"],
|
||||
[
|
||||
"zbarimg",
|
||||
"--quiet",
|
||||
"--raw",
|
||||
str(picture_file),
|
||||
],
|
||||
)
|
||||
res = run(cmd)
|
||||
data = res.stdout.strip()
|
||||
return DeployInfo.from_json(json.loads(data), host_key_check=host_key_check)
|
||||
|
||||
|
||||
def is_ipv6(ip: str) -> bool:
|
||||
try:
|
||||
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def find_reachable_host(
|
||||
deploy_info: DeployInfo, host_key_check: HostKeyCheck
|
||||
) -> Remote | None:
|
||||
def find_reachable_host(deploy_info: DeployInfo) -> Remote | None:
|
||||
host = None
|
||||
for addr in deploy_info.addrs:
|
||||
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
|
||||
host_ = parse_deployment_address(
|
||||
machine_name="uknown", address=host_addr, host_key_check=host_key_check
|
||||
)
|
||||
if is_ssh_reachable(host_):
|
||||
host = host_
|
||||
if addr.is_ssh_reachable():
|
||||
host = addr
|
||||
break
|
||||
|
||||
return host
|
||||
|
||||
|
||||
def qrcode_scan(picture_file: Path) -> str:
|
||||
cmd = nix_shell(
|
||||
["zbar"],
|
||||
[
|
||||
"zbarimg",
|
||||
"--quiet",
|
||||
"--raw",
|
||||
str(picture_file),
|
||||
],
|
||||
)
|
||||
res = run(cmd)
|
||||
return res.stdout.strip()
|
||||
|
||||
|
||||
def parse_qr_code(picture_file: Path) -> DeployInfo:
|
||||
data = qrcode_scan(picture_file)
|
||||
return DeployInfo.from_json(json.loads(data))
|
||||
|
||||
|
||||
def ssh_shell_from_deploy(
|
||||
deploy_info: DeployInfo, runtime: AsyncRuntime, host_key_check: HostKeyCheck
|
||||
) -> None:
|
||||
if host := find_reachable_host(deploy_info, host_key_check):
|
||||
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
|
||||
if host := find_reachable_host(deploy_info):
|
||||
host.interactive_ssh()
|
||||
else:
|
||||
log.info("Could not reach host via clearnet 'addrs'")
|
||||
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
|
||||
spawn_tor(runtime)
|
||||
if not deploy_info.tor:
|
||||
msg = "No tor address provided, please provide a tor address."
|
||||
raise ClanError(msg)
|
||||
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
|
||||
host = Remote(
|
||||
address=deploy_info.tor,
|
||||
user="root",
|
||||
password=deploy_info.pwd,
|
||||
tor_socks=True,
|
||||
command_prefix="tor",
|
||||
)
|
||||
else:
|
||||
msg = "Could not reach host via tor either."
|
||||
raise ClanError(msg)
|
||||
return
|
||||
|
||||
log.info("Could not reach host via clearnet 'addrs'")
|
||||
log.info(f"Trying to reach host via tor '{deploy_info}'")
|
||||
|
||||
tor_addrs = [addr for addr in deploy_info.addrs if addr.tor_socks]
|
||||
if not tor_addrs:
|
||||
msg = "No tor address provided, please provide a tor address."
|
||||
raise ClanError(msg)
|
||||
|
||||
with spawn_tor():
|
||||
for tor_addr in tor_addrs:
|
||||
log.info(f"Trying to reach host via tor address: {tor_addr}")
|
||||
if ssh_tor_reachable(
|
||||
TorTarget(
|
||||
onion=tor_addr.address, port=tor_addr.port if tor_addr.port else 22
|
||||
)
|
||||
):
|
||||
log.info(
|
||||
"Host reachable via tor address, starting interactive ssh session."
|
||||
)
|
||||
tor_addr.interactive_ssh()
|
||||
return
|
||||
|
||||
log.error("Could not reach host via tor address.")
|
||||
|
||||
|
||||
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
|
||||
host_key_check = HostKeyCheck.from_str(args.host_key_check)
|
||||
if args.json:
|
||||
json_file = Path(args.json)
|
||||
if json_file.is_file():
|
||||
data = json.loads(json_file.read_text())
|
||||
return DeployInfo.from_json(data)
|
||||
return DeployInfo.from_json(data, host_key_check)
|
||||
data = json.loads(args.json)
|
||||
return DeployInfo.from_json(data)
|
||||
return DeployInfo.from_json(data, host_key_check)
|
||||
if args.png:
|
||||
return parse_qr_code(Path(args.png))
|
||||
return DeployInfo.from_qr_code(Path(args.png), host_key_check)
|
||||
|
||||
if hasattr(args, "machines"):
|
||||
return DeployInfo.from_hostname(args.machines[0], args)
|
||||
return DeployInfo.from_hostnames(args.machines, host_key_check)
|
||||
return None
|
||||
|
||||
|
||||
def ssh_command(args: argparse.Namespace) -> None:
|
||||
host_key_check = HostKeyCheck.from_str(args.host_key_check)
|
||||
deploy_info = ssh_command_parse(args)
|
||||
if not deploy_info:
|
||||
msg = "No MACHINE, --json or --png data provided"
|
||||
raise ClanError(msg)
|
||||
|
||||
with AsyncRuntime() as runtime:
|
||||
ssh_shell_from_deploy(deploy_info, runtime, host_key_check)
|
||||
ssh_shell_from_deploy(deploy_info)
|
||||
|
||||
|
||||
def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
@@ -157,13 +187,10 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"--png",
|
||||
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
choices=["strict", "ask", "tofu", "none"],
|
||||
default="ask",
|
||||
default="tofu",
|
||||
help="Host key (.ssh/known_hosts) check mode.",
|
||||
)
|
||||
parser.set_defaults(func=ssh_command)
|
||||
|
||||
70
pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py
Normal file
70
pkgs/clan-cli/clan_cli/ssh/test_deploy_info.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from clan_lib.ssh.remote import HostKeyCheck, Remote
|
||||
|
||||
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host
|
||||
|
||||
|
||||
def test_qrcode_scan(test_root: Path) -> None:
|
||||
# Create a dummy QR code image file
|
||||
picture_file = test_root / "data" / "clan_installer_qrcode.png"
|
||||
|
||||
# Call the qrcode_scan function
|
||||
deploy_info = DeployInfo.from_qr_code(picture_file, HostKeyCheck.NONE)
|
||||
|
||||
host = deploy_info.addrs[0]
|
||||
assert host.address == "192.168.122.86"
|
||||
assert host.user == "root"
|
||||
assert host.password == "scabbed-defender-headlock"
|
||||
|
||||
tor_host = deploy_info.addrs[1]
|
||||
assert (
|
||||
tor_host.address
|
||||
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||
)
|
||||
assert tor_host.tor_socks is True
|
||||
assert tor_host.password == "scabbed-defender-headlock"
|
||||
assert tor_host.user == "root"
|
||||
assert (
|
||||
tor_host.address
|
||||
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||
)
|
||||
|
||||
|
||||
def test_from_json() -> None:
|
||||
data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}'
|
||||
deploy_info = DeployInfo.from_json(json.loads(data), HostKeyCheck.NONE)
|
||||
|
||||
host = deploy_info.addrs[0]
|
||||
assert host.password == "scabbed-defender-headlock"
|
||||
assert host.address == "192.168.122.86"
|
||||
|
||||
tor_host = deploy_info.addrs[1]
|
||||
assert (
|
||||
tor_host.address
|
||||
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||
)
|
||||
assert tor_host.tor_socks is True
|
||||
assert tor_host.password == "scabbed-defender-headlock"
|
||||
assert tor_host.user == "root"
|
||||
assert (
|
||||
tor_host.address
|
||||
== "qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.with_core
|
||||
def test_find_reachable_host(hosts: list[Remote]) -> None:
|
||||
host = hosts[0]
|
||||
deploy_info = DeployInfo.from_hostnames(
|
||||
["172.19.1.2", host.ssh_url()], HostKeyCheck.NONE
|
||||
)
|
||||
|
||||
assert deploy_info.addrs[0].address == "172.19.1.2"
|
||||
|
||||
remote = find_reachable_host(deploy_info=deploy_info)
|
||||
|
||||
assert remote is not None
|
||||
assert remote.ssh_url() == host.ssh_url()
|
||||
@@ -5,10 +5,11 @@ import logging
|
||||
import socket
|
||||
import struct
|
||||
import time
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from subprocess import Popen
|
||||
|
||||
from clan_lib.async_run import AsyncRuntime
|
||||
from clan_lib.cmd import Log, RunOpts, run
|
||||
from clan_lib.errors import TorConnectionError, TorSocksError
|
||||
from clan_lib.nix import nix_shell
|
||||
|
||||
@@ -108,32 +109,31 @@ def is_tor_running() -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def spawn_tor(runtime: AsyncRuntime) -> None:
|
||||
@contextmanager
|
||||
def spawn_tor() -> Iterator[None]:
|
||||
"""
|
||||
Spawns a Tor process using `nix-shell` if Tor is not already running.
|
||||
"""
|
||||
|
||||
def start_tor() -> None:
|
||||
"""Starts Tor process using nix-shell."""
|
||||
cmd_args = ["tor", "--HardwareAccel", "1"]
|
||||
packages = ["tor"]
|
||||
cmd = nix_shell(packages, cmd_args)
|
||||
runtime.async_run(None, run, cmd, RunOpts(log=Log.BOTH))
|
||||
log.debug("Attempting to start Tor")
|
||||
|
||||
# Check if Tor is already running
|
||||
if is_tor_running():
|
||||
log.info("Tor is running")
|
||||
return
|
||||
|
||||
# Attempt to start Tor
|
||||
start_tor()
|
||||
|
||||
# Continuously check if Tor has started
|
||||
while not is_tor_running():
|
||||
log.debug("Waiting for Tor to start...")
|
||||
time.sleep(0.2)
|
||||
log.info("Tor is now running")
|
||||
cmd_args = ["tor", "--HardwareAccel", "1"]
|
||||
packages = ["tor"]
|
||||
cmd = nix_shell(packages, cmd_args)
|
||||
process = Popen(cmd)
|
||||
try:
|
||||
while not is_tor_running():
|
||||
log.debug("Waiting for Tor to start...")
|
||||
time.sleep(0.2)
|
||||
log.info("Tor is now running")
|
||||
yield
|
||||
finally:
|
||||
log.info("Terminating Tor process...")
|
||||
process.terminate()
|
||||
process.wait()
|
||||
log.info("Tor process terminated")
|
||||
|
||||
|
||||
def tor_online_test() -> bool:
|
||||
|
||||
Reference in New Issue
Block a user