clan-cli: Fix clan install command and multiple other issues

This commit is contained in:
Qubasa
2024-12-18 15:29:08 +01:00
parent 695ec0b71c
commit a452cc1a02
5 changed files with 65 additions and 47 deletions

View File

@@ -11,6 +11,8 @@ from clan_cli.cmd import run
from clan_cli.errors import ClanError
from clan_cli.nix import nix_shell
from clan_cli.ssh.host import Host, is_ssh_reachable
from clan_cli.ssh.host_key import HostKeyCheck
from clan_cli.ssh.parse import parse_deployment_address
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
log = logging.getLogger(__name__)
@@ -24,7 +26,9 @@ class DeployInfo:
@staticmethod
def from_json(data: dict[str, Any]) -> "DeployInfo":
return DeployInfo(tor=data["tor"], pwd=data["pass"], addrs=data["addrs"])
return DeployInfo(
tor=data.get("tor"), pwd=data.get("pass"), addrs=data.get("addrs", [])
)
def is_ipv6(ip: str) -> bool:
@@ -34,11 +38,15 @@ def is_ipv6(ip: str) -> bool:
return False
def find_reachable_host(deploy_info: DeployInfo) -> Host | None:
def find_reachable_host(
deploy_info: DeployInfo, host_key_check: HostKeyCheck
) -> Host | None:
host = None
for addr in deploy_info.addrs:
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
host = Host(host=host_addr)
host = parse_deployment_address(
machine_name="uknown", host=host_addr, host_key_check=host_key_check
)
if is_ssh_reachable(host):
break
return host
@@ -63,8 +71,10 @@ def parse_qr_code(picture_file: Path) -> DeployInfo:
return DeployInfo.from_json(json.loads(data))
def ssh_shell_from_deploy(deploy_info: DeployInfo, runtime: AsyncRuntime) -> None:
if host := find_reachable_host(deploy_info):
def ssh_shell_from_deploy(
deploy_info: DeployInfo, runtime: AsyncRuntime, host_key_check: HostKeyCheck
) -> None:
if host := find_reachable_host(deploy_info, host_key_check):
host.connect_ssh_shell(password=deploy_info.pwd)
else:
log.info("Could not reach host via clearnet 'addrs'")
@@ -95,13 +105,14 @@ def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
def ssh_command(args: argparse.Namespace) -> None:
host_key_check = HostKeyCheck.from_str(args.host_key_check)
deploy_info = ssh_command_parse(args)
if not deploy_info:
msg = "No --json or --png data provided"
raise ClanError(msg)
with AsyncRuntime() as runtime:
ssh_shell_from_deploy(deploy_info, runtime)
ssh_shell_from_deploy(deploy_info, runtime, host_key_check)
def register_parser(parser: argparse.ArgumentParser) -> None:
@@ -119,4 +130,10 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
)
parser.add_argument(
"--host-key-check",
choices=["strict", "ask", "tofu", "none"],
default="ask",
help="Host key (.ssh/known_hosts) check mode.",
)
parser.set_defaults(func=ssh_command)

View File

@@ -207,14 +207,14 @@ class Host:
def is_ssh_reachable(host: Host) -> bool:
sock = socket.socket(
with socket.socket(
socket.AF_INET6 if ":" in host.host else socket.AF_INET, socket.SOCK_STREAM
)
sock.settimeout(2)
try:
sock.connect((host.host, host.port or 22))
sock.close()
except OSError:
return False
else:
return True
) as sock:
sock.settimeout(2)
try:
sock.connect((host.host, host.port or 22))
sock.close()
except OSError:
return False
else:
return True