inventory tests: use containers by default
This commit is contained in:
121
lib/test/container-test-driver/driver-module.nix
Normal file
121
lib/test/container-test-driver/driver-module.nix
Normal file
@@ -0,0 +1,121 @@
|
||||
{
|
||||
hostPkgs,
|
||||
lib,
|
||||
config,
|
||||
...
|
||||
}:
|
||||
let
|
||||
testDriver = hostPkgs.python3.pkgs.callPackage ./package.nix {
|
||||
inherit (config) extraPythonPackages;
|
||||
inherit (hostPkgs.pkgs) util-linux systemd nix;
|
||||
};
|
||||
containers =
|
||||
testScript:
|
||||
map (m: [
|
||||
m.system.build.toplevel
|
||||
(hostPkgs.closureInfo {
|
||||
rootPaths = [
|
||||
m.system.build.toplevel
|
||||
(hostPkgs.writeText "testScript" testScript)
|
||||
];
|
||||
})
|
||||
]) (lib.attrValues config.nodes);
|
||||
pythonizeName =
|
||||
name:
|
||||
let
|
||||
head = lib.substring 0 1 name;
|
||||
tail = lib.substring 1 (-1) name;
|
||||
in
|
||||
(if builtins.match "[A-z_]" head == null then "_" else head)
|
||||
+ lib.stringAsChars (c: if builtins.match "[A-z0-9_]" c == null then "_" else c) tail;
|
||||
nodeHostNames =
|
||||
let
|
||||
nodesList = map (c: c.system.name) (lib.attrValues config.nodes);
|
||||
in
|
||||
nodesList ++ lib.optional (lib.length nodesList == 1 && !lib.elem "machine" nodesList) "machine";
|
||||
machineNames = map (name: "${name}: Machine;") pythonizedNames;
|
||||
pythonizedNames = map pythonizeName nodeHostNames;
|
||||
in
|
||||
{
|
||||
defaults.imports = [
|
||||
./nixos-module.nix
|
||||
];
|
||||
driver = lib.mkForce (
|
||||
hostPkgs.runCommand "nixos-test-driver-${config.name}"
|
||||
{
|
||||
nativeBuildInputs = [
|
||||
hostPkgs.makeWrapper
|
||||
] ++ lib.optionals (!config.skipTypeCheck) [ hostPkgs.mypy ];
|
||||
buildInputs = [ testDriver ];
|
||||
testScript = config.testScriptString;
|
||||
preferLocalBuild = true;
|
||||
passthru = config.passthru;
|
||||
meta = config.meta // {
|
||||
mainProgram = "nixos-test-driver";
|
||||
};
|
||||
}
|
||||
''
|
||||
mkdir -p $out/bin
|
||||
|
||||
${lib.optionalString (!config.skipTypeCheck) ''
|
||||
# prepend type hints so the test script can be type checked with mypy
|
||||
cat "${./test-script-prepend.py}" >> testScriptWithTypes
|
||||
echo "${builtins.toString machineNames}" >> testScriptWithTypes
|
||||
echo -n "$testScript" >> testScriptWithTypes
|
||||
|
||||
echo "Running type check (enable/disable: config.skipTypeCheck)"
|
||||
echo "See https://nixos.org/manual/nixos/stable/#test-opt-skipTypeCheck"
|
||||
|
||||
mypy --no-implicit-optional \
|
||||
--pretty \
|
||||
--no-color-output \
|
||||
testScriptWithTypes
|
||||
''}
|
||||
|
||||
echo -n "$testScript" >> $out/test-script
|
||||
|
||||
ln -s ${testDriver}/bin/nixos-test-driver $out/bin/nixos-test-driver
|
||||
|
||||
wrapProgram $out/bin/nixos-test-driver \
|
||||
${
|
||||
lib.concatStringsSep " " (
|
||||
map (container: "--add-flags '--container ${builtins.toString container}'") (
|
||||
containers config.testScriptString
|
||||
)
|
||||
)
|
||||
} \
|
||||
--add-flags "--test-script '$out/test-script'"
|
||||
''
|
||||
);
|
||||
|
||||
test = lib.mkForce (
|
||||
lib.lazyDerivation {
|
||||
# lazyDerivation improves performance when only passthru items and/or meta are used.
|
||||
derivation = hostPkgs.stdenv.mkDerivation {
|
||||
name = "vm-test-run-${config.name}";
|
||||
|
||||
requiredSystemFeatures = [ "uid-range" ];
|
||||
|
||||
buildCommand = ''
|
||||
mkdir -p $out
|
||||
|
||||
# effectively mute the XMLLogger
|
||||
export LOGFILE=/dev/null
|
||||
|
||||
${config.driver}/bin/nixos-test-driver -o $out
|
||||
'';
|
||||
|
||||
nativeBuildInputs = [
|
||||
hostPkgs.util-linux
|
||||
hostPkgs.coreutils
|
||||
hostPkgs.iproute2
|
||||
];
|
||||
|
||||
passthru = config.passthru;
|
||||
|
||||
meta = config.meta;
|
||||
};
|
||||
inherit (config) passthru meta;
|
||||
}
|
||||
);
|
||||
}
|
||||
26
lib/test/container-test-driver/nixos-module.nix
Normal file
26
lib/test/container-test-driver/nixos-module.nix
Normal file
@@ -0,0 +1,26 @@
|
||||
{ pkgs, lib, ... }:
|
||||
{
|
||||
boot.isContainer = true;
|
||||
|
||||
# needed since nixpkgs 7fb2f407c01b017737eafc26b065d7f56434a992 removed the getty unit by default
|
||||
console.enable = true;
|
||||
|
||||
# undo qemu stuff
|
||||
system.build.initialRamdisk = "";
|
||||
virtualisation.sharedDirectories = lib.mkForce { };
|
||||
networking.useDHCP = false;
|
||||
|
||||
# We use networkd to assign static ip addresses
|
||||
networking.useNetworkd = true;
|
||||
services.resolved.enable = false;
|
||||
|
||||
# Rename the host0 interface to eth0 to match what we expect in VM tests.
|
||||
system.activationScripts.renameInterface = ''
|
||||
${pkgs.iproute2}/bin/ip link set dev host0 name eth1
|
||||
'';
|
||||
|
||||
systemd.services.backdoor.enable = false;
|
||||
|
||||
# we don't have permission to set cpu scheduler in our container
|
||||
systemd.services.nix-daemon.serviceConfig.CPUSchedulingPolicy = lib.mkForce "";
|
||||
}
|
||||
40
lib/test/container-test-driver/package.nix
Normal file
40
lib/test/container-test-driver/package.nix
Normal file
@@ -0,0 +1,40 @@
|
||||
{
|
||||
extraPythonPackages ? (_ps: [ ]),
|
||||
python3Packages,
|
||||
python3,
|
||||
buildPythonApplication,
|
||||
setuptools,
|
||||
util-linux,
|
||||
systemd,
|
||||
nix,
|
||||
colorama,
|
||||
junit-xml,
|
||||
mkShell,
|
||||
}:
|
||||
let
|
||||
package = buildPythonApplication {
|
||||
pname = "test-driver";
|
||||
version = "0.0.1";
|
||||
propagatedBuildInputs = [
|
||||
util-linux
|
||||
systemd
|
||||
colorama
|
||||
junit-xml
|
||||
nix
|
||||
] ++ extraPythonPackages python3Packages;
|
||||
nativeBuildInputs = [ setuptools ];
|
||||
format = "pyproject";
|
||||
src = ./.;
|
||||
passthru.devShell = mkShell {
|
||||
packages = [
|
||||
(python3.withPackages (_ps: package.propagatedBuildInputs))
|
||||
package.propagatedBuildInputs
|
||||
python3.pkgs.pytest
|
||||
];
|
||||
shellHook = ''
|
||||
export PYTHONPATH="$(realpath .):$PYTHONPATH"
|
||||
'';
|
||||
};
|
||||
};
|
||||
in
|
||||
package
|
||||
22
lib/test/container-test-driver/pyproject.toml
Normal file
22
lib/test/container-test-driver/pyproject.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[build-system]
|
||||
requires = ["setuptools"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "nixos-test-driver"
|
||||
version = "0.0.0"
|
||||
|
||||
[project.scripts]
|
||||
nixos-test-driver = "test_driver:main"
|
||||
|
||||
[tool.setuptools.packages]
|
||||
find = {}
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
test_driver = ["py.typed"]
|
||||
[tool.mypy]
|
||||
python_version = "3.12"
|
||||
warn_redundant_casts = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
9
lib/test/container-test-driver/test-script-prepend.py
Normal file
9
lib/test/container-test-driver/test-script-prepend.py
Normal file
@@ -0,0 +1,9 @@
|
||||
# This file contains type hints that can be prepended to Nix test scripts so they can be type
|
||||
# checked.
|
||||
|
||||
from collections.abc import Callable
|
||||
|
||||
from test_driver import Machine
|
||||
|
||||
start_all: Callable[[], None]
|
||||
machines: list[Machine]
|
||||
549
lib/test/container-test-driver/test_driver/__init__.py
Normal file
549
lib/test/container-test-driver/test_driver/__init__.py
Normal file
@@ -0,0 +1,549 @@
|
||||
import argparse
|
||||
import ctypes
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
import types
|
||||
from collections.abc import Callable
|
||||
from contextlib import _GeneratorContextManager
|
||||
from dataclasses import dataclass
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any
|
||||
|
||||
from .logger import AbstractLogger, CompositeLogger, TerminalLogger
|
||||
|
||||
# Load the C library
|
||||
libc = ctypes.CDLL("libc.so.6", use_errno=True)
|
||||
|
||||
# Define the mount function
|
||||
libc.mount.argtypes = [
|
||||
ctypes.c_char_p, # source
|
||||
ctypes.c_char_p, # target
|
||||
ctypes.c_char_p, # filesystemtype
|
||||
ctypes.c_ulong, # mountflags
|
||||
ctypes.c_void_p, # data
|
||||
]
|
||||
libc.mount.restype = ctypes.c_int
|
||||
|
||||
MS_BIND = 0x1000
|
||||
MS_REC = 0x4000
|
||||
|
||||
|
||||
def mount(
|
||||
source: Path,
|
||||
target: Path,
|
||||
filesystemtype: str,
|
||||
mountflags: int = 0,
|
||||
data: str | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
A Python wrapper for the mount system call.
|
||||
|
||||
:param source: The source of the file system (e.g., device name, remote filesystem).
|
||||
:param target: The mount point (an existing directory).
|
||||
:param filesystemtype: The filesystem type (e.g., "ext4", "nfs").
|
||||
:param mountflags: Mount options flags.
|
||||
:param data: File system-specific data (e.g., options like "rw").
|
||||
:raises OSError: If the mount system call fails.
|
||||
"""
|
||||
# Convert Python strings to C-compatible strings
|
||||
source_c = ctypes.c_char_p(str(source).encode("utf-8"))
|
||||
target_c = ctypes.c_char_p(str(target).encode("utf-8"))
|
||||
fstype_c = ctypes.c_char_p(filesystemtype.encode("utf-8"))
|
||||
data_c = ctypes.c_char_p(data.encode("utf-8")) if data else None
|
||||
|
||||
# Call the mount system call
|
||||
result = libc.mount(
|
||||
source_c, target_c, fstype_c, ctypes.c_ulong(mountflags), data_c
|
||||
)
|
||||
|
||||
if result != 0:
|
||||
errno = ctypes.get_errno()
|
||||
raise OSError(errno, os.strerror(errno))
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def prepare_machine_root(machinename: str, root: Path) -> None:
|
||||
root.mkdir(parents=True, exist_ok=True)
|
||||
root.joinpath("etc").mkdir(parents=True, exist_ok=True)
|
||||
root.joinpath(".env").write_text(
|
||||
"\n".join(f"{k}={v}" for k, v in os.environ.items())
|
||||
)
|
||||
|
||||
|
||||
def pythonize_name(name: str) -> str:
|
||||
return re.sub(r"^[^A-z_]|[^A-z0-9_]", "_", name)
|
||||
|
||||
|
||||
def retry(fn: Callable, timeout: int = 900) -> None:
|
||||
"""Call the given function repeatedly, with 1 second intervals,
|
||||
until it returns True or a timeout is reached.
|
||||
"""
|
||||
|
||||
for _ in range(timeout):
|
||||
if fn(False):
|
||||
return
|
||||
time.sleep(1)
|
||||
|
||||
if not fn(True):
|
||||
msg = f"action timed out after {timeout} seconds"
|
||||
raise Error(msg)
|
||||
|
||||
|
||||
class Machine:
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
toplevel: Path,
|
||||
logger: AbstractLogger,
|
||||
rootdir: Path,
|
||||
out_dir: str,
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.toplevel = toplevel
|
||||
self.out_dir = out_dir
|
||||
self.process: subprocess.Popen | None = None
|
||||
self.rootdir: Path = rootdir
|
||||
self.logger = logger
|
||||
|
||||
@cached_property
|
||||
def container_pid(self) -> int:
|
||||
return self.get_systemd_process()
|
||||
|
||||
def start(self) -> None:
|
||||
prepare_machine_root(self.name, self.rootdir)
|
||||
cmd = [
|
||||
"systemd-nspawn",
|
||||
"--keep-unit",
|
||||
"-M",
|
||||
self.name,
|
||||
"-D",
|
||||
self.rootdir,
|
||||
"--register=no",
|
||||
"--resolv-conf=off",
|
||||
f"--bind=/.containers/{self.name}/nix:/nix",
|
||||
"--bind=/proc:/run/host/proc",
|
||||
"--bind=/sys:/run/host/sys",
|
||||
"--private-network",
|
||||
"--network-bridge=br0",
|
||||
self.toplevel.joinpath("init"),
|
||||
]
|
||||
env = os.environ.copy()
|
||||
env["SYSTEMD_NSPAWN_UNIFIED_HIERARCHY"] = "1"
|
||||
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, env=env)
|
||||
|
||||
def get_systemd_process(self) -> int:
|
||||
assert self.process is not None, "Machine not started"
|
||||
assert self.process.stdout is not None, "Machine has no stdout"
|
||||
for line in self.process.stdout:
|
||||
print(line, end="")
|
||||
if (
|
||||
line.startswith("systemd[1]: Startup finished in")
|
||||
or "Welcome to NixOS" in line
|
||||
):
|
||||
break
|
||||
else:
|
||||
msg = f"Failed to start container {self.name}"
|
||||
raise RuntimeError(msg)
|
||||
childs = (
|
||||
Path(f"/proc/{self.process.pid}/task/{self.process.pid}/children")
|
||||
.read_text()
|
||||
.split()
|
||||
)
|
||||
assert len(childs) == 1, (
|
||||
f"Expected exactly one child process for systemd-nspawn, got {childs}"
|
||||
)
|
||||
try:
|
||||
return int(childs[0])
|
||||
except ValueError as e:
|
||||
msg = f"Failed to parse child process id {childs[0]}"
|
||||
raise RuntimeError(msg) from e
|
||||
|
||||
def get_unit_info(self, unit: str) -> dict[str, str]:
|
||||
proc = self.systemctl(f'--no-pager show "{unit}"')
|
||||
if proc.returncode != 0:
|
||||
msg = (
|
||||
f'retrieving systemctl info for unit "{unit}"'
|
||||
f" failed with exit code {proc.returncode}"
|
||||
)
|
||||
raise Error(msg)
|
||||
|
||||
line_pattern = re.compile(r"^([^=]+)=(.*)$")
|
||||
|
||||
def tuple_from_line(line: str) -> tuple[str, str]:
|
||||
match = line_pattern.match(line)
|
||||
assert match is not None
|
||||
return match[1], match[2]
|
||||
|
||||
return dict(
|
||||
tuple_from_line(line)
|
||||
for line in proc.stdout.split("\n")
|
||||
if line_pattern.match(line)
|
||||
)
|
||||
|
||||
def execute(
|
||||
self,
|
||||
command: str,
|
||||
check_return: bool = True,
|
||||
check_output: bool = True,
|
||||
timeout: int | None = 900,
|
||||
) -> subprocess.CompletedProcess:
|
||||
"""
|
||||
Execute a shell command, returning a list `(status, stdout)`.
|
||||
|
||||
Commands are run with `set -euo pipefail` set:
|
||||
|
||||
- If several commands are separated by `;` and one fails, the
|
||||
command as a whole will fail.
|
||||
|
||||
- For pipelines, the last non-zero exit status will be returned
|
||||
(if there is one; otherwise zero will be returned).
|
||||
|
||||
- Dereferencing unset variables fails the command.
|
||||
|
||||
- It will wait for stdout to be closed.
|
||||
|
||||
If the command detaches, it must close stdout, as `execute` will wait
|
||||
for this to consume all output reliably. This can be achieved by
|
||||
redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
|
||||
a file. Examples of detaching commands are `sleep 365d &`, where the
|
||||
shell forks a new process that can write to stdout and `xclip -i`, where
|
||||
the `xclip` command itself forks without closing stdout.
|
||||
|
||||
Takes an optional parameter `check_return` that defaults to `True`.
|
||||
Setting this parameter to `False` will not check for the return code
|
||||
and return -1 instead. This can be used for commands that shut down
|
||||
the VM and would therefore break the pipe that would be used for
|
||||
retrieving the return code.
|
||||
|
||||
A timeout for the command can be specified (in seconds) using the optional
|
||||
`timeout` parameter, e.g., `execute(cmd, timeout=10)` or
|
||||
`execute(cmd, timeout=None)`. The default is 900 seconds.
|
||||
"""
|
||||
|
||||
# Always run command with shell opts
|
||||
command = f"set -eo pipefail; source /etc/profile; set -u; {command}"
|
||||
|
||||
proc = subprocess.run(
|
||||
[
|
||||
"nsenter",
|
||||
"--target",
|
||||
str(self.container_pid),
|
||||
"--mount",
|
||||
"--uts",
|
||||
"--ipc",
|
||||
"--net",
|
||||
"--pid",
|
||||
"--cgroup",
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
command,
|
||||
],
|
||||
timeout=timeout,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
text=True,
|
||||
)
|
||||
return proc
|
||||
|
||||
def nested(
|
||||
self, msg: str, attrs: dict[str, str] | None = None
|
||||
) -> _GeneratorContextManager:
|
||||
if attrs is None:
|
||||
attrs = {}
|
||||
my_attrs = {"machine": self.name}
|
||||
my_attrs.update(attrs)
|
||||
return self.logger.nested(msg, my_attrs)
|
||||
|
||||
def systemctl(self, q: str) -> subprocess.CompletedProcess:
|
||||
"""
|
||||
Runs `systemctl` commands with optional support for
|
||||
`systemctl --user`
|
||||
|
||||
```py
|
||||
# run `systemctl list-jobs --no-pager`
|
||||
machine.systemctl("list-jobs --no-pager")
|
||||
|
||||
# spawn a shell for `any-user` and run
|
||||
# `systemctl --user list-jobs --no-pager`
|
||||
machine.systemctl("list-jobs --no-pager", "any-user")
|
||||
```
|
||||
"""
|
||||
return self.execute(f"systemctl {q}")
|
||||
|
||||
def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
|
||||
"""
|
||||
Repeat a shell command with 1-second intervals until it succeeds.
|
||||
Has a default timeout of 900 seconds which can be modified, e.g.
|
||||
`wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
|
||||
command execution.
|
||||
Throws an exception on timeout.
|
||||
"""
|
||||
output = ""
|
||||
|
||||
def check_success(_: Any) -> bool:
|
||||
nonlocal output
|
||||
result = self.execute(command, timeout=timeout)
|
||||
return result.returncode == 0
|
||||
|
||||
with self.nested(f"waiting for success: {command}"):
|
||||
retry(check_success, timeout)
|
||||
return output
|
||||
|
||||
def wait_for_unit(self, unit: str, timeout: int = 900) -> None:
|
||||
"""
|
||||
Wait for a systemd unit to get into "active" state.
|
||||
Throws exceptions on "failed" and "inactive" states as well as after
|
||||
timing out.
|
||||
"""
|
||||
|
||||
def check_active(_: bool) -> bool:
|
||||
info = self.get_unit_info(unit)
|
||||
state = info["ActiveState"]
|
||||
if state == "failed":
|
||||
proc = self.systemctl(f"--lines 0 status {unit}")
|
||||
journal = self.execute(f"journalctl -u {unit} --no-pager")
|
||||
msg = f'unit "{unit}" reached state "{state}":\n{proc.stdout}\n{journal.stdout}'
|
||||
raise Error(msg)
|
||||
|
||||
if state == "inactive":
|
||||
proc = self.systemctl("list-jobs --full 2>&1")
|
||||
if "No jobs" in proc.stdout:
|
||||
info = self.get_unit_info(unit)
|
||||
if info["ActiveState"] == state:
|
||||
msg = f'unit "{unit}" is inactive and there are no pending jobs'
|
||||
raise Error(msg)
|
||||
|
||||
return state == "active"
|
||||
|
||||
retry(check_active, timeout)
|
||||
|
||||
def succeed(self, command: str, timeout: int | None = None) -> str:
|
||||
res = self.execute(command, timeout=timeout)
|
||||
if res.returncode != 0:
|
||||
msg = f"Failed to run command {command}\n"
|
||||
msg += f"Exit code: {res.returncode}\n"
|
||||
msg += f"Stdout: {res.stdout}"
|
||||
raise RuntimeError(msg)
|
||||
return res.stdout
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""
|
||||
Shut down the machine, waiting for the VM to exit.
|
||||
"""
|
||||
if self.process:
|
||||
self.process.terminate()
|
||||
self.process.wait()
|
||||
self.process = None
|
||||
|
||||
def release(self) -> None:
|
||||
self.shutdown()
|
||||
|
||||
|
||||
@dataclass
|
||||
class ContainerInfo:
|
||||
toplevel: Path
|
||||
closure_info: Path
|
||||
|
||||
@cached_property
|
||||
def name(self) -> str:
|
||||
name_match = re.match(r".*-nixos-system-(.+)-(.+)", self.toplevel.name)
|
||||
if not name_match:
|
||||
msg = f"Unable to extract hostname from {self.toplevel.name}"
|
||||
raise Error(msg)
|
||||
return name_match.group(1)
|
||||
|
||||
@property
|
||||
def root_dir(self) -> Path:
|
||||
return Path(f"/.containers/{self.name}")
|
||||
|
||||
@property
|
||||
def nix_store_dir(self) -> Path:
|
||||
return self.root_dir / "nix" / "store"
|
||||
|
||||
@property
|
||||
def etc_dir(self) -> Path:
|
||||
return self.root_dir / "etc"
|
||||
|
||||
|
||||
def setup_filesystems(container: ContainerInfo) -> None:
|
||||
# We don't care about cleaning up the mount points, since we're running in a nix sandbox.
|
||||
Path("/run").mkdir(parents=True, exist_ok=True)
|
||||
subprocess.run(["mount", "-t", "tmpfs", "none", "/run"], check=True)
|
||||
subprocess.run(["mount", "-t", "cgroup2", "none", "/sys/fs/cgroup"], check=True)
|
||||
container.etc_dir.mkdir(parents=True)
|
||||
Path("/etc/os-release").touch()
|
||||
Path("/etc/machine-id").write_text("a5ea3f98dedc0278b6f3cc8c37eeaeac")
|
||||
container.nix_store_dir.mkdir(parents=True)
|
||||
# Read /proc/mounts and replicate every bind mount
|
||||
with Path("/proc/self/mounts").open() as f:
|
||||
for line in f:
|
||||
columns = line.split(" ")
|
||||
source = Path(columns[1])
|
||||
if source.parent != Path("/nix/store/"):
|
||||
continue
|
||||
target = container.nix_store_dir / source.name
|
||||
if source.is_dir():
|
||||
target.mkdir()
|
||||
else:
|
||||
target.touch()
|
||||
try:
|
||||
if "acl" in target.name:
|
||||
print(f"mount({source}, {target})")
|
||||
mount(source, target, "none", MS_BIND)
|
||||
except OSError as e:
|
||||
msg = f"mount({source}, {target}) failed"
|
||||
raise Error(msg) from e
|
||||
|
||||
|
||||
def load_nix_db(container: ContainerInfo) -> None:
|
||||
with (container.closure_info / "registration").open() as f:
|
||||
subprocess.run(
|
||||
["nix-store", "--load-db", "--store", str(container.root_dir)],
|
||||
stdin=f,
|
||||
check=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
|
||||
class Driver:
|
||||
logger: AbstractLogger
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
containers: list[ContainerInfo],
|
||||
logger: AbstractLogger,
|
||||
testscript: str,
|
||||
out_dir: str,
|
||||
) -> None:
|
||||
self.containers = containers
|
||||
self.testscript = testscript
|
||||
self.out_dir = out_dir
|
||||
self.logger = logger
|
||||
|
||||
self.tempdir = TemporaryDirectory()
|
||||
tempdir_path = Path(self.tempdir.name)
|
||||
|
||||
self.machines = []
|
||||
for container in containers:
|
||||
setup_filesystems(container)
|
||||
load_nix_db(container)
|
||||
self.machines.append(
|
||||
Machine(
|
||||
name=container.name,
|
||||
toplevel=container.toplevel,
|
||||
rootdir=tempdir_path / container.name,
|
||||
out_dir=self.out_dir,
|
||||
logger=self.logger,
|
||||
)
|
||||
)
|
||||
|
||||
def start_all(self) -> None:
|
||||
# child
|
||||
# create bridge
|
||||
subprocess.run(
|
||||
["ip", "link", "add", "br0", "type", "bridge"], check=True, text=True
|
||||
)
|
||||
subprocess.run(["ip", "link", "set", "br0", "up"], check=True, text=True)
|
||||
|
||||
for machine in self.machines:
|
||||
print(f"Starting {machine.name}")
|
||||
machine.start()
|
||||
|
||||
def test_symbols(self) -> dict[str, Any]:
|
||||
general_symbols = {
|
||||
"start_all": self.start_all,
|
||||
"machines": self.machines,
|
||||
"driver": self,
|
||||
"Machine": Machine, # for typing
|
||||
}
|
||||
machine_symbols = {pythonize_name(m.name): m for m in self.machines}
|
||||
# If there's exactly one machine, make it available under the name
|
||||
# "machine", even if it's not called that.
|
||||
if len(self.machines) == 1:
|
||||
(machine_symbols["machine"],) = self.machines
|
||||
print(
|
||||
"additionally exposed symbols:\n "
|
||||
+ ", ".join(m.name for m in self.machines)
|
||||
+ ",\n "
|
||||
+ ", ".join(list(general_symbols.keys()))
|
||||
)
|
||||
return {**general_symbols, **machine_symbols}
|
||||
|
||||
def test_script(self) -> None:
|
||||
"""Run the test script"""
|
||||
exec(self.testscript, self.test_symbols(), None)
|
||||
|
||||
def run_tests(self) -> None:
|
||||
"""Run the test script (for non-interactive test runs)"""
|
||||
self.test_script()
|
||||
|
||||
def __enter__(self) -> "Driver":
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: type[BaseException] | None,
|
||||
exc_value: BaseException | None,
|
||||
traceback: types.TracebackType | None,
|
||||
) -> None:
|
||||
for machine in self.machines:
|
||||
machine.release()
|
||||
|
||||
|
||||
def writeable_dir(arg: str) -> Path:
|
||||
"""Raises an ArgumentTypeError if the given argument isn't a writeable directory
|
||||
Note: We want to fail as early as possible if a directory isn't writeable,
|
||||
since an executed nixos-test could fail (very late) because of the test-driver
|
||||
writing in a directory without proper permissions.
|
||||
"""
|
||||
path = Path(arg)
|
||||
if not path.is_dir():
|
||||
msg = f"{path} is not a directory"
|
||||
raise argparse.ArgumentTypeError(msg)
|
||||
if not os.access(path, os.W_OK):
|
||||
msg = f"{path} is not a writeable directory"
|
||||
raise argparse.ArgumentTypeError(msg)
|
||||
return path
|
||||
|
||||
|
||||
def main() -> None:
|
||||
arg_parser = argparse.ArgumentParser(prog="nixos-test-driver")
|
||||
arg_parser.add_argument(
|
||||
"--containers",
|
||||
nargs=2,
|
||||
action="append",
|
||||
type=Path,
|
||||
metavar=("TOPLEVEL_STORE_DIR", "CLOSURE_INFO"),
|
||||
help="container system toplevel store dir and closure info",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"--test-script",
|
||||
help="the test script to run",
|
||||
type=Path,
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"-o",
|
||||
"--output-directory",
|
||||
default=Path.cwd(),
|
||||
help="the directory to bind to /run/test-results",
|
||||
type=writeable_dir,
|
||||
)
|
||||
args = arg_parser.parse_args()
|
||||
logger = CompositeLogger([TerminalLogger()])
|
||||
with Driver(
|
||||
containers=[
|
||||
ContainerInfo(toplevel, closure_info)
|
||||
for toplevel, closure_info in args.containers
|
||||
],
|
||||
testscript=args.test_script.read_text(),
|
||||
out_dir=args.output_directory.resolve(),
|
||||
logger=logger,
|
||||
) as driver:
|
||||
driver.run_tests()
|
||||
335
lib/test/container-test-driver/test_driver/logger.py
Normal file
335
lib/test/container-test-driver/test_driver/logger.py
Normal file
@@ -0,0 +1,335 @@
|
||||
import atexit
|
||||
import codecs
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import unicodedata
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Iterator
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from pathlib import Path
|
||||
from queue import Empty, Queue
|
||||
from typing import Any
|
||||
from xml.sax.saxutils import XMLGenerator
|
||||
from xml.sax.xmlreader import AttributesImpl
|
||||
|
||||
from colorama import Fore, Style
|
||||
from junit_xml import TestCase, TestSuite
|
||||
|
||||
|
||||
class AbstractLogger(ABC):
|
||||
@abstractmethod
|
||||
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
@contextmanager
|
||||
def subtest(
|
||||
self, name: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
@contextmanager
|
||||
def nested(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def log_serial(self, message: str, machine: str) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def print_serial_logs(self, enable: bool) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class JunitXMLLogger(AbstractLogger):
|
||||
class TestCaseState:
|
||||
def __init__(self) -> None:
|
||||
self.stdout = ""
|
||||
self.stderr = ""
|
||||
self.failure = False
|
||||
|
||||
def __init__(self, outfile: Path) -> None:
|
||||
self.tests: dict[str, JunitXMLLogger.TestCaseState] = {
|
||||
"main": self.TestCaseState()
|
||||
}
|
||||
self.currentSubtest = "main"
|
||||
self.outfile: Path = outfile
|
||||
self._print_serial_logs = True
|
||||
atexit.register(self.close)
|
||||
|
||||
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
|
||||
self.tests[self.currentSubtest].stdout += message + os.linesep
|
||||
|
||||
@contextmanager
|
||||
def subtest(
|
||||
self, name: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
old_test = self.currentSubtest
|
||||
self.tests.setdefault(name, self.TestCaseState())
|
||||
self.currentSubtest = name
|
||||
|
||||
yield
|
||||
|
||||
self.currentSubtest = old_test
|
||||
|
||||
@contextmanager
|
||||
def nested(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
self.log(message)
|
||||
yield
|
||||
|
||||
def info(self, *args: Any, **kwargs: Any) -> None:
|
||||
self.tests[self.currentSubtest].stdout += args[0] + os.linesep
|
||||
|
||||
def warning(self, *args: Any, **kwargs: Any) -> None:
|
||||
self.tests[self.currentSubtest].stdout += args[0] + os.linesep
|
||||
|
||||
def error(self, *args: Any, **kwargs: Any) -> None:
|
||||
self.tests[self.currentSubtest].stderr += args[0] + os.linesep
|
||||
self.tests[self.currentSubtest].failure = True
|
||||
|
||||
def log_serial(self, message: str, machine: str) -> None:
|
||||
if not self._print_serial_logs:
|
||||
return
|
||||
|
||||
self.log(f"{machine} # {message}")
|
||||
|
||||
def print_serial_logs(self, enable: bool) -> None:
|
||||
self._print_serial_logs = enable
|
||||
|
||||
def close(self) -> None:
|
||||
with Path.open(self.outfile, "w") as f:
|
||||
test_cases = []
|
||||
for name, test_case_state in self.tests.items():
|
||||
tc = TestCase(
|
||||
name,
|
||||
stdout=test_case_state.stdout,
|
||||
stderr=test_case_state.stderr,
|
||||
)
|
||||
if test_case_state.failure:
|
||||
tc.add_failure_info("test case failed")
|
||||
|
||||
test_cases.append(tc)
|
||||
ts = TestSuite("NixOS integration test", test_cases)
|
||||
f.write(TestSuite.to_xml_string([ts]))
|
||||
|
||||
|
||||
class CompositeLogger(AbstractLogger):
|
||||
def __init__(self, logger_list: list[AbstractLogger]) -> None:
|
||||
self.logger_list = logger_list
|
||||
|
||||
def add_logger(self, logger: AbstractLogger) -> None:
|
||||
self.logger_list.append(logger)
|
||||
|
||||
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
|
||||
for logger in self.logger_list:
|
||||
logger.log(message, attributes)
|
||||
|
||||
@contextmanager
|
||||
def subtest(
|
||||
self, name: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
with ExitStack() as stack:
|
||||
for logger in self.logger_list:
|
||||
stack.enter_context(logger.subtest(name, attributes))
|
||||
yield
|
||||
|
||||
@contextmanager
|
||||
def nested(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
with ExitStack() as stack:
|
||||
for logger in self.logger_list:
|
||||
stack.enter_context(logger.nested(message, attributes))
|
||||
yield
|
||||
|
||||
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
for logger in self.logger_list:
|
||||
logger.info(*args, **kwargs)
|
||||
|
||||
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
for logger in self.logger_list:
|
||||
logger.warning(*args, **kwargs)
|
||||
|
||||
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
for logger in self.logger_list:
|
||||
logger.error(*args, **kwargs)
|
||||
sys.exit(1)
|
||||
|
||||
def print_serial_logs(self, enable: bool) -> None:
|
||||
for logger in self.logger_list:
|
||||
logger.print_serial_logs(enable)
|
||||
|
||||
def log_serial(self, message: str, machine: str) -> None:
|
||||
for logger in self.logger_list:
|
||||
logger.log_serial(message, machine)
|
||||
|
||||
|
||||
class TerminalLogger(AbstractLogger):
|
||||
def __init__(self) -> None:
|
||||
self._print_serial_logs = True
|
||||
|
||||
def maybe_prefix(self, message: str, attributes: dict[str, str] | None) -> str:
|
||||
if attributes and "machine" in attributes:
|
||||
return f"{attributes['machine']}: {message}"
|
||||
return message
|
||||
|
||||
@staticmethod
|
||||
def _eprint(*args: object, **kwargs: Any) -> None:
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
|
||||
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
|
||||
self._eprint(self.maybe_prefix(message, attributes))
|
||||
|
||||
@contextmanager
|
||||
def subtest(
|
||||
self, name: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
with self.nested("subtest: " + name, attributes):
|
||||
yield
|
||||
|
||||
@contextmanager
|
||||
def nested(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
self._eprint(
|
||||
self.maybe_prefix(
|
||||
Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes
|
||||
)
|
||||
)
|
||||
|
||||
tic = time.time()
|
||||
yield
|
||||
toc = time.time()
|
||||
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
|
||||
|
||||
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def print_serial_logs(self, enable: bool) -> None:
|
||||
self._print_serial_logs = enable
|
||||
|
||||
def log_serial(self, message: str, machine: str) -> None:
|
||||
if not self._print_serial_logs:
|
||||
return
|
||||
|
||||
self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
|
||||
|
||||
|
||||
class XMLLogger(AbstractLogger):
|
||||
def __init__(self, outfile: str) -> None:
|
||||
self.logfile_handle = codecs.open(outfile, "wb") # noqa: SIM115
|
||||
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
|
||||
self.queue: Queue[dict[str, str]] = Queue()
|
||||
|
||||
self._print_serial_logs = True
|
||||
|
||||
self.xml.startDocument()
|
||||
self.xml.startElement("logfile", attrs=AttributesImpl({}))
|
||||
|
||||
def close(self) -> None:
|
||||
self.xml.endElement("logfile")
|
||||
self.xml.endDocument()
|
||||
self.logfile_handle.close()
|
||||
|
||||
def sanitise(self, message: str) -> str:
|
||||
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
|
||||
|
||||
def maybe_prefix(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> str:
|
||||
if attributes and "machine" in attributes:
|
||||
return f"{attributes['machine']}: {message}"
|
||||
return message
|
||||
|
||||
def log_line(self, message: str, attributes: dict[str, str]) -> None:
|
||||
self.xml.startElement("line", attrs=AttributesImpl(attributes))
|
||||
self.xml.characters(message)
|
||||
self.xml.endElement("line")
|
||||
|
||||
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
|
||||
self.log(*args, **kwargs)
|
||||
|
||||
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
|
||||
if attributes is None:
|
||||
attributes = {}
|
||||
self.drain_log_queue()
|
||||
self.log_line(message, attributes)
|
||||
|
||||
def print_serial_logs(self, enable: bool) -> None:
|
||||
self._print_serial_logs = enable
|
||||
|
||||
def log_serial(self, message: str, machine: str) -> None:
|
||||
if not self._print_serial_logs:
|
||||
return
|
||||
|
||||
self.enqueue({"msg": message, "machine": machine, "type": "serial"})
|
||||
|
||||
def enqueue(self, item: dict[str, str]) -> None:
|
||||
self.queue.put(item)
|
||||
|
||||
def drain_log_queue(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
item = self.queue.get_nowait()
|
||||
msg = self.sanitise(item["msg"])
|
||||
del item["msg"]
|
||||
self.log_line(msg, item)
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
def subtest(
|
||||
self, name: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
with self.nested("subtest: " + name, attributes):
|
||||
yield
|
||||
|
||||
@contextmanager
|
||||
def nested(
|
||||
self, message: str, attributes: dict[str, str] | None = None
|
||||
) -> Iterator[None]:
|
||||
if attributes is None:
|
||||
attributes = {}
|
||||
self.xml.startElement("nest", attrs=AttributesImpl({}))
|
||||
self.xml.startElement("head", attrs=AttributesImpl(attributes))
|
||||
self.xml.characters(message)
|
||||
self.xml.endElement("head")
|
||||
|
||||
tic = time.time()
|
||||
self.drain_log_queue()
|
||||
yield
|
||||
self.drain_log_queue()
|
||||
toc = time.time()
|
||||
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
|
||||
|
||||
self.xml.endElement("nest")
|
||||
0
lib/test/container-test-driver/test_driver/py.typed
Normal file
0
lib/test/container-test-driver/test_driver/py.typed
Normal file
@@ -4,6 +4,7 @@ let
|
||||
mkOption
|
||||
types
|
||||
;
|
||||
|
||||
in
|
||||
{
|
||||
flakeModules = clanLib.callLib ./flakeModules.nix { };
|
||||
@@ -17,6 +18,7 @@ in
|
||||
nixosTest,
|
||||
pkgs,
|
||||
self,
|
||||
useContainers ? true,
|
||||
...
|
||||
}:
|
||||
let
|
||||
@@ -28,7 +30,9 @@ in
|
||||
clanFlakeResult = config.clan;
|
||||
in
|
||||
{
|
||||
imports = [ nixosTest ];
|
||||
imports = [
|
||||
nixosTest
|
||||
] ++ lib.optionals (useContainers) [ ./container-test-driver/driver-module.nix ];
|
||||
options = {
|
||||
clanSettings = mkOption {
|
||||
default = { };
|
||||
|
||||
Reference in New Issue
Block a user