Merge pull request 'change install test to run clan outside of the VM' (#3906) from vm-test into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/3906
This commit is contained in:
28
pkgs/clan-cli/clan_cli/host_key_check.py
Normal file
28
pkgs/clan-cli/clan_cli/host_key_check.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""Common argument types and utilities for host key checking in clan CLI commands."""
|
||||
|
||||
import argparse
|
||||
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
|
||||
|
||||
def host_key_check_type(value: str) -> HostKeyCheck:
|
||||
"""
|
||||
Argparse type converter for HostKeyCheck enum.
|
||||
"""
|
||||
try:
|
||||
return HostKeyCheck(value)
|
||||
except ValueError:
|
||||
valid_values = [e.value for e in HostKeyCheck]
|
||||
msg = f"Invalid host key check mode: {value}. Valid options: {', '.join(valid_values)}"
|
||||
raise argparse.ArgumentTypeError(msg) from None
|
||||
|
||||
|
||||
def add_host_key_check_arg(
|
||||
parser: argparse.ArgumentParser, default: HostKeyCheck = HostKeyCheck.ASK
|
||||
) -> None:
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
type=host_key_check_type,
|
||||
default=default,
|
||||
help=f"Host key (.ssh/known_hosts) check mode. Options: {', '.join([e.value for e in HostKeyCheck])}",
|
||||
)
|
||||
@@ -1,5 +1,6 @@
|
||||
import argparse
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from clan_lib.machines.hardware import (
|
||||
HardwareConfig,
|
||||
@@ -11,6 +12,7 @@ from clan_lib.machines.suggestions import validate_machine_names
|
||||
from clan_lib.ssh.remote import Remote
|
||||
|
||||
from clan_cli.completions import add_dynamic_completer, complete_machines
|
||||
from clan_cli.host_key_check import add_host_key_check_arg
|
||||
|
||||
from .types import machine_name_type
|
||||
|
||||
@@ -19,7 +21,6 @@ log = logging.getLogger(__name__)
|
||||
|
||||
def update_hardware_config_command(args: argparse.Namespace) -> None:
|
||||
validate_machine_names([args.machine], args.flake)
|
||||
host_key_check = args.host_key_check
|
||||
machine = Machine(flake=args.flake, name=args.machine)
|
||||
opts = HardwareGenerateOptions(
|
||||
machine=machine,
|
||||
@@ -30,9 +31,13 @@ def update_hardware_config_command(args: argparse.Namespace) -> None:
|
||||
if args.target_host:
|
||||
target_host = Remote.from_ssh_uri(
|
||||
machine_name=machine.name, address=args.target_host
|
||||
).override(host_key_check=host_key_check)
|
||||
)
|
||||
else:
|
||||
target_host = machine.target_host().override(host_key_check=host_key_check)
|
||||
target_host = machine.target_host()
|
||||
|
||||
target_host = target_host.override(
|
||||
host_key_check=args.host_key_check, private_key=args.identity_file
|
||||
)
|
||||
|
||||
generate_machine_hardware_info(opts, target_host)
|
||||
|
||||
@@ -51,12 +56,7 @@ def register_update_hardware_config(parser: argparse.ArgumentParser) -> None:
|
||||
nargs="?",
|
||||
help="ssh address to install to in the form of user@host:2222",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
choices=["strict", "ask", "tofu", "none"],
|
||||
default="ask",
|
||||
help="Host key (.ssh/known_hosts) check mode.",
|
||||
)
|
||||
add_host_key_check_arg(parser)
|
||||
parser.add_argument(
|
||||
"--password",
|
||||
help="Pre-provided password the cli will prompt otherwise if needed.",
|
||||
@@ -69,3 +69,9 @@ def register_update_hardware_config(parser: argparse.ArgumentParser) -> None:
|
||||
choices=["nixos-generate-config", "nixos-facter"],
|
||||
default="nixos-facter",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
dest="identity_file",
|
||||
type=Path,
|
||||
help="specify which SSH private key file to use",
|
||||
)
|
||||
|
||||
@@ -13,6 +13,7 @@ from clan_cli.completions import (
|
||||
complete_machines,
|
||||
complete_target_host,
|
||||
)
|
||||
from clan_cli.host_key_check import add_host_key_check_arg
|
||||
from clan_cli.machines.hardware import HardwareConfig
|
||||
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host, ssh_command_parse
|
||||
|
||||
@@ -97,12 +98,7 @@ def register_install_parser(parser: argparse.ArgumentParser) -> None:
|
||||
help="do not reboot after installation (deprecated)",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
choices=["strict", "ask", "tofu", "none"],
|
||||
default="ask",
|
||||
help="Host key (.ssh/known_hosts) check mode.",
|
||||
)
|
||||
add_host_key_check_arg(parser)
|
||||
parser.add_argument(
|
||||
"--build-on",
|
||||
choices=[x.value for x in BuildOn],
|
||||
|
||||
@@ -16,6 +16,7 @@ from clan_cli.completions import (
|
||||
complete_machines,
|
||||
complete_tags,
|
||||
)
|
||||
from clan_cli.host_key_check import add_host_key_check_arg
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -163,12 +164,7 @@ def register_update_parser(parser: argparse.ArgumentParser) -> None:
|
||||
)
|
||||
add_dynamic_completer(tag_parser, complete_tags)
|
||||
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
choices=["strict", "ask", "tofu", "none"],
|
||||
default="ask",
|
||||
help="Host key (.ssh/known_hosts) check mode.",
|
||||
)
|
||||
add_host_key_check_arg(parser)
|
||||
parser.add_argument(
|
||||
"--target-host",
|
||||
type=str,
|
||||
|
||||
@@ -8,12 +8,14 @@ from typing import Any
|
||||
from clan_lib.cmd import run
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.nix import nix_shell
|
||||
from clan_lib.ssh.remote import HostKeyCheck, Remote
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
from clan_lib.ssh.remote import Remote
|
||||
|
||||
from clan_cli.completions import (
|
||||
add_dynamic_completer,
|
||||
complete_machines,
|
||||
)
|
||||
from clan_cli.host_key_check import add_host_key_check_arg
|
||||
from clan_cli.ssh.tor import TorTarget, spawn_tor, ssh_tor_reachable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -181,10 +183,5 @@ def register_parser(parser: argparse.ArgumentParser) -> None:
|
||||
"--png",
|
||||
help="specify the json file for ssh data as the qrcode image (generated by starting the clan installer)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host-key-check",
|
||||
choices=["strict", "ask", "tofu", "none"],
|
||||
default="tofu",
|
||||
help="Host key (.ssh/known_hosts) check mode.",
|
||||
)
|
||||
add_host_key_check_arg(parser, default=HostKeyCheck.TOFU)
|
||||
parser.set_defaults(func=ssh_command)
|
||||
|
||||
@@ -4,6 +4,7 @@ from pathlib import Path
|
||||
import pytest
|
||||
from clan_lib.cmd import RunOpts, run
|
||||
from clan_lib.nix import nix_shell
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
from clan_lib.ssh.remote import Remote
|
||||
|
||||
from clan_cli.ssh.deploy_info import DeployInfo, find_reachable_host
|
||||
@@ -23,7 +24,7 @@ def test_qrcode_scan(temp_dir: Path) -> None:
|
||||
run(cmd, RunOpts(input=data.encode()))
|
||||
|
||||
# Call the qrcode_scan function
|
||||
deploy_info = DeployInfo.from_qr_code(img_path, "none")
|
||||
deploy_info = DeployInfo.from_qr_code(img_path, HostKeyCheck.NONE)
|
||||
|
||||
host = deploy_info.addrs[0]
|
||||
assert host.address == "192.168.122.86"
|
||||
@@ -46,7 +47,7 @@ def test_qrcode_scan(temp_dir: Path) -> None:
|
||||
|
||||
def test_from_json() -> None:
|
||||
data = '{"pass":"scabbed-defender-headlock","tor":"qjeerm4r6t55hcfum4pinnvscn5njlw2g3k7ilqfuu7cdt3ahaxhsbid.onion","addrs":["192.168.122.86"]}'
|
||||
deploy_info = DeployInfo.from_json(json.loads(data), "none")
|
||||
deploy_info = DeployInfo.from_json(json.loads(data), HostKeyCheck.NONE)
|
||||
|
||||
host = deploy_info.addrs[0]
|
||||
assert host.password == "scabbed-defender-headlock"
|
||||
@@ -69,7 +70,9 @@ def test_from_json() -> None:
|
||||
@pytest.mark.with_core
|
||||
def test_find_reachable_host(hosts: list[Remote]) -> None:
|
||||
host = hosts[0]
|
||||
deploy_info = DeployInfo.from_hostnames(["172.19.1.2", host.ssh_url()], "none")
|
||||
deploy_info = DeployInfo.from_hostnames(
|
||||
["172.19.1.2", host.ssh_url()], HostKeyCheck.NONE
|
||||
)
|
||||
|
||||
assert deploy_info.addrs[0].address == "172.19.1.2"
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from clan_cli.tests.sshd import Sshd
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
from clan_lib.ssh.remote import Remote
|
||||
|
||||
|
||||
@@ -16,7 +17,7 @@ def hosts(sshd: Sshd) -> list[Remote]:
|
||||
port=sshd.port,
|
||||
user=login,
|
||||
private_key=Path(sshd.key),
|
||||
host_key_check="none",
|
||||
host_key_check=HostKeyCheck.NONE,
|
||||
command_prefix="local_test",
|
||||
)
|
||||
]
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
# Adapted from https://github.com/numtide/deploykit
|
||||
|
||||
from typing import Literal
|
||||
from enum import Enum
|
||||
|
||||
from clan_lib.errors import ClanError
|
||||
|
||||
HostKeyCheck = Literal[
|
||||
"strict", # Strictly check ssh host keys, prompt for unknown ones
|
||||
"ask", # Ask for confirmation on first use
|
||||
"tofu", # Trust on ssh keys on first use
|
||||
"none", # Do not check ssh host keys
|
||||
]
|
||||
|
||||
class HostKeyCheck(Enum):
|
||||
STRICT = "strict" # Strictly check ssh host keys, prompt for unknown ones
|
||||
ASK = "ask" # Ask for confirmation on first use
|
||||
TOFU = "tofu" # Trust on ssh keys on first use
|
||||
NONE = "none" # Do not check ssh host keys
|
||||
|
||||
|
||||
def hostkey_to_ssh_opts(host_key_check: HostKeyCheck) -> list[str]:
|
||||
@@ -17,13 +17,13 @@ def hostkey_to_ssh_opts(host_key_check: HostKeyCheck) -> list[str]:
|
||||
Convert a HostKeyCheck value to SSH options.
|
||||
"""
|
||||
match host_key_check:
|
||||
case "strict":
|
||||
case HostKeyCheck.STRICT:
|
||||
return ["-o", "StrictHostKeyChecking=yes"]
|
||||
case "ask":
|
||||
case HostKeyCheck.ASK:
|
||||
return []
|
||||
case "tofu":
|
||||
case HostKeyCheck.TOFU:
|
||||
return ["-o", "StrictHostKeyChecking=accept-new"]
|
||||
case "none":
|
||||
case HostKeyCheck.NONE:
|
||||
return [
|
||||
"-o",
|
||||
"StrictHostKeyChecking=no",
|
||||
|
||||
@@ -39,7 +39,7 @@ class Remote:
|
||||
private_key: Path | None = None
|
||||
password: str | None = None
|
||||
forward_agent: bool = True
|
||||
host_key_check: HostKeyCheck = "ask"
|
||||
host_key_check: HostKeyCheck = HostKeyCheck.ASK
|
||||
verbose_ssh: bool = False
|
||||
ssh_options: dict[str, str] = field(default_factory=dict)
|
||||
tor_socks: bool = False
|
||||
|
||||
@@ -8,6 +8,7 @@ import pytest
|
||||
from clan_lib.async_run import AsyncRuntime
|
||||
from clan_lib.cmd import ClanCmdTimeoutError, Log, RunOpts
|
||||
from clan_lib.errors import ClanError, CmdOut
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
from clan_lib.ssh.remote import Remote
|
||||
from clan_lib.ssh.sudo_askpass_proxy import SudoAskpassProxy
|
||||
|
||||
@@ -113,7 +114,7 @@ def test_parse_deployment_address(
|
||||
result = Remote.from_ssh_uri(
|
||||
machine_name=machine_name,
|
||||
address=test_addr,
|
||||
).override(host_key_check="strict")
|
||||
).override(host_key_check=HostKeyCheck.STRICT)
|
||||
|
||||
if expected_exception:
|
||||
return
|
||||
@@ -131,7 +132,7 @@ def test_parse_deployment_address(
|
||||
def test_parse_ssh_options() -> None:
|
||||
addr = "root@example.com:2222?IdentityFile=/path/to/private/key&StrictRemoteKeyChecking=yes"
|
||||
host = Remote.from_ssh_uri(machine_name="foo", address=addr).override(
|
||||
host_key_check="strict"
|
||||
host_key_check=HostKeyCheck.STRICT
|
||||
)
|
||||
assert host.address == "example.com"
|
||||
assert host.port == 2222
|
||||
|
||||
@@ -33,6 +33,7 @@ from clan_lib.nix_models.clan import (
|
||||
)
|
||||
from clan_lib.nix_models.clan import InventoryMachineDeploy as MachineDeploy
|
||||
from clan_lib.persist.util import set_value_by_path
|
||||
from clan_lib.ssh.host_key import HostKeyCheck
|
||||
from clan_lib.ssh.remote import Remote, can_ssh_login
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -188,7 +189,7 @@ def test_clan_create_api(
|
||||
clan_dir_flake.invalidate_cache()
|
||||
|
||||
target_host = machine.target_host().override(
|
||||
private_key=private_key, host_key_check="none"
|
||||
private_key=private_key, host_key_check=HostKeyCheck.NONE
|
||||
)
|
||||
result = can_ssh_login(target_host)
|
||||
assert result == "Online", f"Machine {machine.name} is not online"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
perSystem =
|
||||
{ pkgs, ... }:
|
||||
{ pkgs, lib, ... }:
|
||||
{
|
||||
legacyPackages = {
|
||||
setupNixInNix = ''
|
||||
@@ -21,57 +21,31 @@
|
||||
fi
|
||||
'';
|
||||
|
||||
setupNixInNixPythonPackage = pkgs.python3Packages.buildPythonPackage {
|
||||
pname = "setup-nix-in-nix";
|
||||
# NixOS test library combining port utils and clan VM test utilities
|
||||
nixosTestLib = pkgs.python3Packages.buildPythonPackage {
|
||||
pname = "nixos-test-lib";
|
||||
version = "1.0.0";
|
||||
format = "other";
|
||||
|
||||
dontUnpack = true;
|
||||
|
||||
installPhase = ''
|
||||
mkdir -p $out/${pkgs.python3.sitePackages}
|
||||
cat > $out/${pkgs.python3.sitePackages}/setup_nix_in_nix.py << 'EOF'
|
||||
from os import environ
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
def setup_nix_in_nix():
|
||||
"""Set up a Nix store inside the test environment."""
|
||||
environ['HOME'] = environ['TMPDIR']
|
||||
environ['NIX_STATE_DIR'] = environ['TMPDIR'] + '/nix'
|
||||
environ['NIX_CONF_DIR'] = environ['TMPDIR'] + '/etc'
|
||||
environ['IN_NIX_SANDBOX'] = '1'
|
||||
environ['CLAN_TEST_STORE'] = environ['TMPDIR'] + '/store'
|
||||
environ['LOCK_NIX'] = environ['TMPDIR'] + '/nix_lock'
|
||||
|
||||
Path(environ['CLAN_TEST_STORE'] + '/nix/store').mkdir(parents=True, exist_ok=True)
|
||||
Path(environ['CLAN_TEST_STORE'] + '/nix/var/nix/gcroots').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if 'closureInfo' in environ:
|
||||
# Read store paths from the closure info file
|
||||
with open(environ['closureInfo'] + '/store-paths', 'r') as f:
|
||||
store_paths = f.read().strip().split('\n')
|
||||
|
||||
# Copy store paths using absolute path to cp
|
||||
subprocess.run(
|
||||
['${pkgs.coreutils}/bin/cp', '--recursive', '--target', environ['CLAN_TEST_STORE'] + '/nix/store'] + store_paths,
|
||||
check=True
|
||||
)
|
||||
|
||||
# Load the nix database using absolute path to nix-store
|
||||
with open(environ['closureInfo'] + '/registration', 'r') as f:
|
||||
subprocess.run(
|
||||
['${pkgs.nix}/bin/nix-store', '--load-db', '--store', environ['CLAN_TEST_STORE']],
|
||||
input=f.read(),
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
EOF
|
||||
touch $out/${pkgs.python3.sitePackages}/py.typed
|
||||
format = "pyproject";
|
||||
src = lib.fileset.toSource {
|
||||
root = ./.;
|
||||
fileset = lib.fileset.unions [
|
||||
./pyproject.toml
|
||||
./nixos_test_lib
|
||||
];
|
||||
};
|
||||
nativeBuildInputs = with pkgs.python3Packages; [
|
||||
setuptools
|
||||
wheel
|
||||
];
|
||||
postPatch = ''
|
||||
substituteInPlace nixos_test_lib/nix_setup.py \
|
||||
--replace '@cp@' '${pkgs.coreutils}/bin/cp' \
|
||||
--replace '@nix-store@' '${pkgs.nix}/bin/nix-store' \
|
||||
--replace '@xargs@' '${pkgs.findutils}/bin/xargs'
|
||||
'';
|
||||
|
||||
doCheck = false;
|
||||
};
|
||||
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
3
pkgs/testing/nixos_test_lib/__init__.py
Normal file
3
pkgs/testing/nixos_test_lib/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""NixOS test library for clan VM testing"""
|
||||
|
||||
__version__ = "1.0.0"
|
||||
96
pkgs/testing/nixos_test_lib/nix_setup.py
Normal file
96
pkgs/testing/nixos_test_lib/nix_setup.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""Nix store setup utilities for VM tests"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
# These paths will be substituted during package build
|
||||
CP_BIN = "@cp@"
|
||||
NIX_STORE_BIN = "@nix-store@"
|
||||
XARGS_BIN = "@xargs@"
|
||||
|
||||
|
||||
def setup_nix_in_nix(closure_info: str | None) -> None:
|
||||
"""Set up Nix store inside test environment
|
||||
|
||||
Args:
|
||||
closure_info: Path to closure info directory containing store-paths file,
|
||||
or None if no closure info
|
||||
"""
|
||||
tmpdir = Path(os.environ.get("TMPDIR", "/tmp")) # noqa: S108
|
||||
|
||||
# Remove NIX_REMOTE if present (we don't have any nix daemon running)
|
||||
if "NIX_REMOTE" in os.environ:
|
||||
del os.environ["NIX_REMOTE"]
|
||||
|
||||
# Set NIX_CONFIG globally to disable substituters for speed
|
||||
os.environ["NIX_CONFIG"] = "substituters = \ntrusted-public-keys = "
|
||||
|
||||
# Set up environment variables for test environment
|
||||
os.environ["HOME"] = str(tmpdir)
|
||||
os.environ["NIX_STATE_DIR"] = f"{tmpdir}/nix"
|
||||
os.environ["NIX_CONF_DIR"] = f"{tmpdir}/etc"
|
||||
os.environ["IN_NIX_SANDBOX"] = "1"
|
||||
os.environ["CLAN_TEST_STORE"] = f"{tmpdir}/store"
|
||||
os.environ["LOCK_NIX"] = f"{tmpdir}/nix_lock"
|
||||
|
||||
# Create necessary directories
|
||||
Path(f"{tmpdir}/nix").mkdir(parents=True, exist_ok=True)
|
||||
Path(f"{tmpdir}/etc").mkdir(parents=True, exist_ok=True)
|
||||
Path(f"{tmpdir}/store").mkdir(parents=True, exist_ok=True)
|
||||
Path(f"{tmpdir}/store/nix/store").mkdir(parents=True, exist_ok=True)
|
||||
Path(f"{tmpdir}/store/nix/var/nix/gcroots").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Set up Nix store if closure info is provided
|
||||
if closure_info and Path(closure_info).exists():
|
||||
store_paths_file = Path(closure_info) / "store-paths"
|
||||
if store_paths_file.exists():
|
||||
# Use xargs to handle potentially long lists of store paths
|
||||
# Equivalent to: xargs cp --recursive --target-directory
|
||||
# "$CLAN_TEST_STORE/nix/store" < "$closureInfo/store-paths"
|
||||
with store_paths_file.open() as f:
|
||||
subprocess.run( # noqa: S603
|
||||
[
|
||||
XARGS_BIN,
|
||||
CP_BIN,
|
||||
"--recursive",
|
||||
"--target-directory",
|
||||
f"{tmpdir}/store/nix/store",
|
||||
],
|
||||
stdin=f,
|
||||
check=True,
|
||||
)
|
||||
|
||||
# Load Nix database
|
||||
registration_file = Path(closure_info) / "registration"
|
||||
if registration_file.exists():
|
||||
with registration_file.open() as f:
|
||||
subprocess.run( # noqa: S603
|
||||
[NIX_STORE_BIN, "--load-db", "--store", f"{tmpdir}/store"],
|
||||
input=f.read(),
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
def prepare_test_flake(
|
||||
temp_dir: str, clan_core_for_checks: str, closure_info: str
|
||||
) -> str:
|
||||
"""Set up Nix store and copy test flake to temporary directory
|
||||
|
||||
Args:
|
||||
temp_dir: Temporary directory
|
||||
clan_core_for_checks: Path to clan-core-for-checks
|
||||
closure_info: Path to closure info for Nix store setup
|
||||
|
||||
Returns:
|
||||
Path to the test flake directory
|
||||
"""
|
||||
# Set up Nix store
|
||||
setup_nix_in_nix(closure_info)
|
||||
|
||||
# Copy test flake
|
||||
flake_dir = Path(temp_dir) / "test-flake"
|
||||
subprocess.run(["cp", "-r", clan_core_for_checks, flake_dir], check=True) # noqa: S603, S607
|
||||
subprocess.run(["chmod", "-R", "+w", flake_dir], check=True) # noqa: S603, S607
|
||||
return str(flake_dir)
|
||||
51
pkgs/testing/nixos_test_lib/port.py
Normal file
51
pkgs/testing/nixos_test_lib/port.py
Normal file
@@ -0,0 +1,51 @@
|
||||
"""Port management utilities for NixOS installation tests."""
|
||||
|
||||
import socket
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
|
||||
class PortUtilsError(Exception):
|
||||
"""Port utils related errors."""
|
||||
|
||||
|
||||
def find_free_port() -> int:
|
||||
"""Find a free port on the host."""
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(("", 0))
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
def check_host_port_open(port: int) -> bool:
|
||||
"""Verify port forwarding is working by checking if the host port is listening."""
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.settimeout(1)
|
||||
result = s.connect_ex(("localhost", port))
|
||||
return result == 0
|
||||
except OSError as e:
|
||||
print(f"Port check failed: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def setup_port_forwarding(target: Any, host_port: int) -> None:
|
||||
"""Set up port forwarding and wait for it to be ready."""
|
||||
print(f"Setting up port forwarding from host port {host_port} to guest port 22")
|
||||
target.forward_port(host_port=host_port, guest_port=22)
|
||||
|
||||
# Give the port forwarding time to establish
|
||||
time.sleep(2)
|
||||
|
||||
# Wait up to 30 seconds for the port to become available
|
||||
port_ready = False
|
||||
for i in range(30):
|
||||
if check_host_port_open(host_port):
|
||||
port_ready = True
|
||||
print(f"Host port {host_port} is now listening")
|
||||
break
|
||||
print(f"Waiting for host port {host_port} to be ready... attempt {i + 1}/30")
|
||||
time.sleep(1)
|
||||
|
||||
if not port_ready:
|
||||
msg = f"Host port {host_port} never became available for forwarding"
|
||||
raise PortUtilsError(msg)
|
||||
0
pkgs/testing/nixos_test_lib/py.typed
Normal file
0
pkgs/testing/nixos_test_lib/py.typed
Normal file
40
pkgs/testing/nixos_test_lib/ssh.py
Normal file
40
pkgs/testing/nixos_test_lib/ssh.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""SSH and test setup utilities"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple
|
||||
|
||||
from .port import find_free_port, setup_port_forwarding
|
||||
|
||||
|
||||
class SSHConnection(NamedTuple):
|
||||
host_port: int
|
||||
ssh_key: str
|
||||
|
||||
|
||||
def setup_ssh_connection(
|
||||
target,
|
||||
temp_dir: str,
|
||||
assets_ssh_privkey: str,
|
||||
) -> SSHConnection:
|
||||
"""Set up SSH connection with port forwarding to test VM
|
||||
|
||||
Args:
|
||||
target: Test VM target
|
||||
temp_dir: Temporary directory for SSH key
|
||||
assets_ssh_privkey: Path to SSH private key asset
|
||||
|
||||
Returns:
|
||||
SSHConnection with host_port and ssh_key path
|
||||
"""
|
||||
host_port = find_free_port()
|
||||
target.wait_for_unit("sshd.service")
|
||||
target.wait_for_open_port(22)
|
||||
|
||||
setup_port_forwarding(target, host_port)
|
||||
|
||||
ssh_key = Path(temp_dir) / "id_ed25519"
|
||||
with ssh_key.open("w") as f, Path(assets_ssh_privkey).open() as src:
|
||||
f.write(src.read())
|
||||
ssh_key.chmod(0o600)
|
||||
|
||||
return SSHConnection(host_port, str(ssh_key))
|
||||
45
pkgs/testing/pyproject.toml
Normal file
45
pkgs/testing/pyproject.toml
Normal file
@@ -0,0 +1,45 @@
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "nixos-test-lib"
|
||||
version = "1.0.0"
|
||||
description = "NixOS test utilities for clan VM testing"
|
||||
authors = [
|
||||
{name = "Clan Core Team"}
|
||||
]
|
||||
dependencies = []
|
||||
|
||||
[project.optional-dependencies]
|
||||
dev = [
|
||||
"mypy",
|
||||
"ruff"
|
||||
]
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["."]
|
||||
include = ["nixos_test_lib*"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
"nixos_test_lib" = ["py.typed"]
|
||||
|
||||
[tool.mypy]
|
||||
python_version = "3.12"
|
||||
strict = true
|
||||
warn_return_any = true
|
||||
warn_unused_configs = true
|
||||
|
||||
[tool.ruff]
|
||||
target-version = "py312"
|
||||
line-length = 88
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["ALL"]
|
||||
ignore = [
|
||||
"D", # docstrings
|
||||
"ANN", # type annotations
|
||||
"COM812", # trailing comma
|
||||
"ISC001", # string concatenation
|
||||
"T201", # print statements
|
||||
]
|
||||
Reference in New Issue
Block a user