clan-cli: Rework 'clan ssh' command, improve Tor support.

This commit is contained in:
Qubasa
2024-12-13 00:11:18 +01:00
parent 442147c4bd
commit 6b784f8623
13 changed files with 418 additions and 337 deletions

View File

@@ -30,7 +30,7 @@ from .flash import cli as flash_cli
from .hyperlink import help_hyperlink
from .machines import cli as machines
from .profiler import profile
from .ssh import cli as ssh_cli
from .ssh import deploy_info as ssh_cli
from .vars import cli as vars_cli
log = logging.getLogger(__name__)
@@ -432,7 +432,7 @@ def main() -> None:
if debug:
log.exception("Exited with error")
else:
log.error("%s", e) # noqa: TRY400
log.error("%s", e)
sys.exit(1)
except KeyboardInterrupt as ex:
log.warning("Interrupted by user", exc_info=ex)

View File

@@ -189,3 +189,13 @@ class ClanCmdError(ClanError):
def __repr__(self) -> str:
return f"ClanCmdError({self.cmd})"
class TorSocksError(ClanError):
def __init__(self, msg: str) -> None:
super().__init__(msg)
class TorConnectionError(ClanError):
def __init__(self, msg: str) -> None:
super().__init__(msg)

View File

@@ -25,14 +25,14 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
secret_name = secret_fact["name"]
if not secret_facts_store.exists(service, secret_name):
machine.info(
f"Secret fact '{secret_fact}' for service '{service}' in machine {machine.name} is missing."
f"Secret fact '{secret_fact}' for service '{service}' is missing."
)
missing_secret_facts.append((service, secret_name))
for public_fact in machine.facts_data[service]["public"]:
if not public_facts_store.exists(service, public_fact):
machine.info(
f"Public fact '{public_fact}' for service '{service}' in machine {machine.name} is missing."
f"Public fact '{public_fact}' for service '{service}' is missing."
)
missing_public_facts.append((service, public_fact))

View File

@@ -2,7 +2,6 @@ import argparse
import importlib
import logging
import os
import subprocess
import sys
from collections.abc import Callable
from pathlib import Path
@@ -32,9 +31,9 @@ def read_multiline_input(prompt: str = "Finish with Ctrl-D") -> str:
Read multi-line input from stdin.
"""
print(prompt, flush=True)
proc = subprocess.run(["cat"], stdout=subprocess.PIPE, text=True, check=False)
proc = run(["cat"], RunOpts(check=False))
log.info("Input received. Processing...")
return proc.stdout
return proc.stdout.rstrip(os.linesep).rstrip()
def bubblewrap_cmd(generator: str, facts_dir: Path, secrets_dir: Path) -> list[str]:

View File

@@ -1,6 +1,5 @@
import argparse
import importlib
import json
import logging
import os
import sys
@@ -21,7 +20,7 @@ from clan_cli.facts.generate import generate_facts
from clan_cli.machines.hardware import HardwareConfig
from clan_cli.machines.machines import Machine
from clan_cli.nix import nix_shell
from clan_cli.ssh.cli import is_ipv6, is_reachable, qrcode_scan
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
from clan_cli.vars.generate import generate_vars
log = logging.getLogger(__name__)
@@ -36,7 +35,7 @@ class InstallOptions:
kexec: str | None = None
debug: bool = False
no_reboot: bool = False
json_ssh_deploy: dict[str, str] | None = None
deploy_info: DeployInfo | None = None
build_on_remote: bool = False
nix_options: list[str] = field(default_factory=list)
update_hardware_config: HardwareConfig = HardwareConfig.NONE
@@ -45,6 +44,12 @@ class InstallOptions:
@API.register
def install_machine(opts: InstallOptions) -> None:
# TODO: Fixme, replace opts.machine and opts.flake with machine object
# remove opts.deploy_info, opts.target_host and populate machine object
if opts.deploy_info:
msg = "Deploy info has not been fully implemented yet"
raise NotImplementedError(msg)
machine = Machine(opts.machine, flake=opts.flake)
machine.override_target_host = opts.target_host
@@ -129,21 +134,15 @@ def install_command(args: argparse.Namespace) -> None:
if args.flake is None:
msg = "Could not find clan flake toplevel directory"
raise ClanError(msg)
json_ssh_deploy = None
if args.json:
json_file = Path(args.json)
if json_file.is_file():
json_ssh_deploy = json.loads(json_file.read_text())
else:
json_ssh_deploy = json.loads(args.json)
elif args.png:
json_ssh_deploy = json.loads(qrcode_scan(args.png))
deploy_info: DeployInfo | None = ssh_command_parse(args)
if json_ssh_deploy:
target_host = (
f"root@{find_reachable_host_from_deploy_json(json_ssh_deploy)}"
)
password = json_ssh_deploy["pass"]
if deploy_info:
host = find_reachable_host(deploy_info)
if host is None:
msg = f"Couldn't reach any host address: {deploy_info.addrs}"
raise ClanError(msg)
target_host = host.target
password = deploy_info.pwd
elif args.target_host:
target_host = args.target_host
password = None
@@ -174,7 +173,7 @@ def install_command(args: argparse.Namespace) -> None:
kexec=args.kexec,
debug=args.debug,
no_reboot=args.no_reboot,
json_ssh_deploy=json_ssh_deploy,
deploy_info=deploy_info,
nix_options=args.option,
build_on_remote=args.build_on_remote,
update_hardware_config=HardwareConfig(args.update_hardware_config),
@@ -186,22 +185,6 @@ def install_command(args: argparse.Namespace) -> None:
sys.exit(1)
def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str:
host = None
for addr in deploy_json["addrs"]:
if is_reachable(addr):
host = f"[{addr}]" if is_ipv6(addr) else addr
break
if not host:
msg = f"""
Could not reach any of the host addresses provided in the json string.
Please doublecheck if they are reachable from your machine.
Try `ping [ADDR]` with one of the addresses: {deploy_json['addrs']}
"""
raise ClanError(msg)
return host
def register_install_parser(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"--kexec",

View File

@@ -97,7 +97,11 @@ def nix_metadata(flake_url: str | Path) -> dict[str, Any]:
def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
# we cannot use nix-shell inside the nix sandbox
# in our tests we just make sure we have all the packages
if os.environ.get("IN_NIX_SANDBOX") or os.environ.get("CLAN_NO_DYNAMIC_DEPS"):
if (
os.environ.get("IN_NIX_SANDBOX")
or os.environ.get("CLAN_NO_DYNAMIC_DEPS")
or len(packages) == 0
):
return cmd
return [
*nix_command(["shell", "--inputs-from", f"{nixpkgs_flake()!s}"]),

View File

@@ -77,7 +77,7 @@ class KeyType(enum.Enum):
except FileNotFoundError:
return
except Exception as ex:
log.warning(f"Could not read age keys from {key_path}: {ex}")
log.warning(f"Could not read age keys from {key_path}", exc_info=ex)
# Sops will try every location, see age/keysource.go
if key_path := os.environ.get("SOPS_AGE_KEY_FILE"):
@@ -246,14 +246,10 @@ def sops_run(
def get_public_age_key(privkey: str) -> str:
cmd = nix_shell(["nixpkgs#age"], ["age-keygen", "-y"])
try:
res = subprocess.run(
cmd, input=privkey, stdout=subprocess.PIPE, text=True, check=True
)
except subprocess.CalledProcessError as e:
msg = "Failed to get public key for age private key. Is the key malformed?"
raise ClanError(msg) from e
return res.stdout.strip()
error_msg = "Failed to get public key for age private key. Is the key malformed?"
res = run(cmd, RunOpts(input=privkey.encode(), error_msg=error_msg))
return res.stdout.rstrip(os.linesep).rstrip()
def generate_private_key(out_file: Path | None = None) -> tuple[str, str]:

View File

@@ -1,137 +0,0 @@
import argparse
import ipaddress
import json
import logging
import socket
import subprocess
from pathlib import Path
from clan_cli.nix import nix_shell
log = logging.getLogger(__name__)
def ssh(
host: str,
user: str = "root",
password: str | None = None,
ssh_args: list[str] | None = None,
torify: bool = False,
) -> None:
if ssh_args is None:
ssh_args = []
packages = ["nixpkgs#openssh"]
if torify:
packages.append("nixpkgs#tor")
password_args = []
if password:
packages.append("nixpkgs#sshpass")
password_args = [
"sshpass",
"-p",
password,
]
_ssh_args = [
*ssh_args,
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
]
if password:
_ssh_args.append("-o")
_ssh_args.append("IdentitiesOnly=yes")
cmd_args = [*password_args, *_ssh_args]
if torify:
cmd_args.insert(0, "torify")
cmd = nix_shell(packages, cmd_args)
subprocess.run(cmd, check=True)
def qrcode_scan(picture_file: str) -> str:
return (
subprocess.run(
nix_shell(
["nixpkgs#zbar"],
[
"zbarimg",
"--quiet",
"--raw",
picture_file,
],
),
stdout=subprocess.PIPE,
check=True,
)
.stdout.decode()
.strip()
)
def is_reachable(host: str) -> bool:
sock = socket.socket(
socket.AF_INET6 if ":" in host else socket.AF_INET, socket.SOCK_STREAM
)
sock.settimeout(2)
try:
sock.connect((host, 22))
sock.close()
except OSError:
return False
else:
return True
def connect_ssh_from_json(ssh_data: dict[str, str]) -> None:
for address in ssh_data["addrs"]:
log.debug(f"Trying to reach host on: {address}")
if is_reachable(address):
ssh(host=address, password=ssh_data["pass"])
exit(0)
else:
log.debug(f"Could not reach host on {address}")
log.debug(f'Trying to reach host via torify on {ssh_data["tor"]}')
ssh(host=ssh_data["tor"], password=ssh_data["pass"], torify=True)
def is_ipv6(ip: str) -> bool:
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
except ValueError:
return False
def main(args: argparse.Namespace) -> None:
if args.json:
json_file = Path(args.json)
if json_file.is_file():
ssh_data = json.loads(json_file.read_text())
else:
ssh_data = json.loads(args.json)
connect_ssh_from_json(ssh_data)
elif args.png:
ssh_data = json.loads(qrcode_scan(args.png))
connect_ssh_from_json(ssh_data)
def register_parser(parser: argparse.ArgumentParser) -> None:
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-j",
"--json",
help="specify the json file for ssh data (generated by starting the clan installer)",
)
group.add_argument(
"-P",
"--png",
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
)
# TODO pass all args we don't parse into ssh_args, currently it fails if arg starts with -
parser.add_argument("ssh_args", nargs="*", default=[])
parser.set_defaults(func=main)

View File

@@ -0,0 +1,127 @@
import argparse
import ipaddress
import json
import logging
import shlex
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from clan_cli.cmd import run
from clan_cli.errors import ClanError, TorConnectionError, TorSocksError
from clan_cli.nix import nix_shell
from clan_cli.ssh.host import Host, is_ssh_reachable
from clan_cli.ssh.tor import TorTarget, ssh_tor_reachable
log = logging.getLogger(__name__)
@dataclass
class DeployInfo:
addrs: list[str]
tor: str | None = None
pwd: str | None = None
@staticmethod
def from_json(data: dict[str, Any]) -> "DeployInfo":
return DeployInfo(tor=data["tor"], pwd=data["pass"], addrs=data["addrs"])
def is_ipv6(ip: str) -> bool:
try:
return isinstance(ipaddress.ip_address(ip), ipaddress.IPv6Address)
except ValueError:
return False
def find_reachable_host(deploy_info: DeployInfo) -> Host | None:
host = None
for addr in deploy_info.addrs:
host_addr = f"[{addr}]" if is_ipv6(addr) else addr
host = Host(host=host_addr)
if is_ssh_reachable(host):
break
return host
def qrcode_scan(picture_file: Path) -> str:
cmd = nix_shell(
["nixpkgs#zbar"],
[
"zbarimg",
"--quiet",
"--raw",
str(picture_file),
],
)
res = run(cmd)
return res.stdout.strip()
def parse_qr_code(picture_file: Path) -> DeployInfo:
data = qrcode_scan(picture_file)
return DeployInfo.from_json(json.loads(data))
def ssh_shell_from_deploy(deploy_info: DeployInfo) -> None:
if host := find_reachable_host(deploy_info):
host.connect_ssh_shell(password=deploy_info.pwd)
else:
log.info("Could not reach host via clearnet 'addrs'")
log.info(f"Trying to reach host via tor '{deploy_info.tor}'")
if not deploy_info.tor:
msg = "No tor address provided, please provide a tor address."
raise ClanError(msg)
if ssh_tor_reachable(TorTarget(onion=deploy_info.tor, port=22)):
host = Host(host=deploy_info.tor)
host.connect_ssh_shell(password=deploy_info.pwd, tor_socks=True)
else:
msg = "Could not reach host via tor either."
raise ClanError(msg)
def ssh_command_parse(args: argparse.Namespace) -> DeployInfo | None:
if args.json:
json_file = Path(args.json)
if json_file.is_file():
data = json.loads(json_file.read_text())
return DeployInfo.from_json(data)
data = json.loads(args.json)
return DeployInfo.from_json(data)
if args.png:
return parse_qr_code(Path(args.png))
return None
def ssh_command(args: argparse.Namespace) -> None:
deploy_info = ssh_command_parse(args)
if not deploy_info:
msg = "No --json or --png data provided"
raise ClanError(msg)
try:
ssh_shell_from_deploy(deploy_info)
except TorSocksError as ex:
log.error(ex)
tor_cmd = nix_shell(["nixpkgs#tor"], ["tor"])
log.error("Is Tor running? If not, you can start it by running:")
log.error(f"{' '.join(shlex.quote(arg) for arg in tor_cmd)}")
except TorConnectionError:
log.error("The onion address is not reachable via Tor.")
def register_parser(parser: argparse.ArgumentParser) -> None:
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-j",
"--json",
help="specify the json file for ssh data (generated by starting the clan installer)",
)
group.add_argument(
"-P",
"--png",
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
)
parser.add_argument(
"--ssh_args", nargs=argparse.REMAINDER, help="additional ssh arguments"
)
parser.set_defaults(func=ssh_command)

View File

@@ -3,6 +3,8 @@
import logging
import os
import shlex
import socket
import subprocess
from dataclasses import dataclass, field
from shlex import quote
from typing import Any
@@ -10,6 +12,7 @@ from typing import Any
from clan_cli.cmd import CmdOut, RunOpts, run
from clan_cli.colors import AnsiColor
from clan_cli.errors import ClanError
from clan_cli.nix import nix_shell
from clan_cli.ssh.host_key import HostKeyCheck
cmdlog = logging.getLogger(__name__)
@@ -40,13 +43,6 @@ class Host:
def target(self) -> str:
return f"{self.user or 'root'}@{self.host}"
@property
def target_for_rsync(self) -> str:
host = self.host
if ":" in host:
host = f"[{host}]"
return f"{self.user or 'root'}@{host}"
def run_local(
self,
cmd: list[str],
@@ -163,8 +159,20 @@ class Host:
def ssh_cmd(
self,
verbose_ssh: bool = False,
tor_socks: bool = False,
tty: bool = False,
password: str | None = None,
) -> list[str]:
packages = []
password_args = []
if password:
packages.append("nixpkgs#sshpass")
password_args = [
"sshpass",
"-p",
password,
]
ssh_opts = self.ssh_cmd_opts
if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"])
@@ -176,8 +184,36 @@ class Host:
if self.key:
ssh_opts.extend(["-i", self.key])
return [
if tor_socks:
ssh_opts.append("-o")
ssh_opts.append("ProxyCommand=nc -x 127.0.0.1:9050 -X 5 %h %p")
cmd = [
*password_args,
"ssh",
self.target,
*ssh_opts,
]
return nix_shell(packages, cmd)
def connect_ssh_shell(
self, *, password: str | None = None, tor_socks: bool = False
) -> None:
cmd = self.ssh_cmd(tor_socks=tor_socks, password=password)
subprocess.run(cmd)
def is_ssh_reachable(host: Host) -> bool:
sock = socket.socket(
socket.AF_INET6 if ":" in host.host else socket.AF_INET, socket.SOCK_STREAM
)
sock.settimeout(2)
try:
sock.connect((host.host, host.port or 22))
sock.close()
except OSError:
return False
else:
return True

203
pkgs/clan-cli/clan_cli/ssh/tor.py Executable file
View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
import argparse
import logging
import socket
import struct
from dataclasses import dataclass
from clan_cli.cmd import run
from clan_cli.errors import TorConnectionError, TorSocksError
from clan_cli.nix import nix_shell
log = logging.getLogger(__name__)
@dataclass
class TorTarget:
onion: str
port: int
proxy_host: str = "127.0.0.1"
proxy_port: int = 9050
def connect_to_tor_socks(sock: socket.socket, target: TorTarget) -> socket.socket:
"""
Connects to a .onion host through Tor's SOCKS5 proxy using the standard library.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
socket.socket: A socket connected to the .onion address via Tor.
"""
try:
# 1. Create a socket to the Tor SOCKS proxy
sock.connect((target.proxy_host, target.proxy_port))
except ConnectionRefusedError as ex:
msg = f"Failed to connect to Tor SOCKS proxy at {target.proxy_host}:{target.proxy_port}: {ex}"
raise TorSocksError(msg) from ex
# 2. SOCKS5 handshake
sock.sendall(
b"\x05\x01\x00"
) # SOCKS version (0x05), number of authentication methods (0x01), no-authentication (0x00)
response = sock.recv(2)
# Validate the SOCKS5 handshake response
if response != b"\x05\x00": # SOCKS version = 0x05, no-authentication = 0x00
msg = f"SOCKS5 handshake failed, unexpected response: {response.hex()}"
raise TorSocksError(msg)
# 3. Connection request
request = (
b"\x05\x01\x00\x03" # SOCKS version, connect command, reserved, address type = domainname
+ bytes([len(target.onion)])
+ target.onion.encode("utf-8") # Add domain name length and domain name
+ struct.pack(">H", target.port) # Add destination port in network byte order
)
sock.sendall(request)
# Read the connection request response
response = sock.recv(10)
if response[1] != 0x00: # 0x00 indicates success
msg = f".onion address not reachable: {response[1]}"
raise TorConnectionError(msg)
return sock
def fetch_onion_content(target: TorTarget) -> str:
"""
Fetches the HTTP response from a .onion service through a Tor SOCKS5 proxy.
Args:
target (TorTarget): An object containing the .onion address, port, proxy host, and proxy port.
Returns:
str: The HTTP response text, or an error message if something goes wrong.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Connect to the .onion service via the SOCKS proxy
sock = connect_to_tor_socks(sock, target)
# 1. Send an HTTP GET request
request = f"GET / HTTP/1.1\r\nHost: {target.onion}\r\nConnection: close\r\n\r\n"
sock.sendall(request.encode("utf-8"))
# 2. Read the HTTP response
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
return response.decode("utf-8", errors="replace")
def spawn_tor() -> None:
"""
Spawns a Tor process using `nix-shell`.
"""
cmd_args = ["tor"]
packages = ["nixpkgs#tor"]
cmd = nix_shell(packages, cmd_args)
run(cmd)
def tor_online_test() -> bool:
"""
Tests if Tor is online by attempting to fetch content from a known .onion service.
Returns True if successful, False otherwise.
"""
target = TorTarget(
onion="duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion", port=80
)
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "duckduckgo" in response
def ssh_tor_reachable(target: TorTarget) -> bool:
"""
Tests if SSH is reachable via Tor by attempting to connect to a known .onion service.
Returns True if successful, False otherwise.
"""
try:
response = fetch_onion_content(target)
except TorConnectionError:
return False
else:
return "SSH-" in response
def main() -> None:
"""
Main function to handle command-line arguments and execute the script.
"""
parser = argparse.ArgumentParser(
description="Interact with a .onion service through Tor SOCKS5 proxy."
)
parser.add_argument(
"onion_url", type=str, help=".onion URL to connect to (e.g., 'example.onion')"
)
parser.add_argument(
"--port", type=int, help="Port to connect to on the .onion URL (default: 80)"
)
parser.add_argument(
"--proxy-host",
type=str,
default="127.0.0.1",
help="Address of the Tor SOCKS5 proxy (default: 127.0.0.1)",
)
parser.add_argument(
"--proxy-port",
type=int,
default=9050,
help="Port of the Tor SOCKS5 proxy (default: 9050)",
)
parser.add_argument(
"--ssh-tor-reachable",
action="store_true",
help="Test if SSH is reachable via Tor",
)
args = parser.parse_args()
default_port = 22 if args.ssh_tor_reachable else 80
# Create a TorTarget instance
target = TorTarget(
onion=args.onion_url,
port=args.port or default_port,
proxy_host=args.proxy_host,
proxy_port=args.proxy_port,
)
if args.ssh_tor_reachable:
print(f"Testing if SSH is reachable via Tor for {target.onion}...")
reachable = ssh_tor_reachable(target)
print(f"SSH is {'reachable' if reachable else 'not reachable'} via Tor.")
return
print(
f"Connecting to {target.onion} on port {target.port} via proxy {target.proxy_host}:{target.proxy_port}..."
)
try:
response = fetch_onion_content(target)
print("Response:")
print(response)
except TorSocksError:
log.error("Failed to connect to the Tor SOCKS proxy.")
log.error(
"Is Tor running? If not, you can start it by running 'tor' in a nix-shell."
)
except TorConnectionError:
log.error("The onion address is not reachable via Tor.")
if __name__ == "__main__":
main()

View File

@@ -1,141 +0,0 @@
import os
import sys
import clan_cli
import pytest
import pytest_subprocess.fake_process
from clan_cli.ssh import cli
from pytest_subprocess import utils
from stdout import CaptureOutput
def test_no_args(
monkeypatch: pytest.MonkeyPatch,
capture_output: CaptureOutput,
) -> None:
monkeypatch.setattr(sys, "argv", ["", "ssh"])
with capture_output as output, pytest.raises(SystemExit):
clan_cli.main()
assert output.err.startswith("usage:")
# using fp fixture from pytest-subprocess
def test_ssh_no_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
)
assert fp.call_count(cmd) == 1
def test_ssh_with_pass(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"sshpass",
"-p",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
password="XXX",
)
assert fp.call_count(cmd) == 1
def test_ssh_no_pass_with_torify(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"ssh",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"StrictHostKeyChecking=no",
f"{user}@{host}",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
torify=True,
)
assert fp.call_count(cmd) == 1
def test_ssh_with_pass_with_torify(
fp: pytest_subprocess.fake_process.FakeProcess, monkeypatch: pytest.MonkeyPatch
) -> None:
host = "somehost"
user = "user"
if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[str | utils.Any] = [
"nix",
fp.any(),
"shell",
fp.any(),
"-c",
"torify",
"sshpass",
"-p",
fp.any(),
]
fp.register(cmd)
cli.ssh(
host=host,
user=user,
password="XXX",
torify=True,
)
assert fp.call_count(cmd) == 1
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[str | utils.Any] = [fp.any()]
fp.register(cmd, stdout="https://test.test")
result = cli.qrcode_scan("test.png")
assert result == "https://test.test"

View File

@@ -48,6 +48,7 @@ lint.ignore = [
"A003",
"ANN101",
"ANN401",
"TRY400",
"E402",
"E501",
"E731",