inventory tests: use containers by default

This commit is contained in:
DavHau
2025-04-23 18:55:59 +07:00
parent 502563a96a
commit 8b00edad4a
13 changed files with 44 additions and 39 deletions

View File

@@ -0,0 +1,121 @@
{
hostPkgs,
lib,
config,
...
}:
let
testDriver = hostPkgs.python3.pkgs.callPackage ./package.nix {
inherit (config) extraPythonPackages;
inherit (hostPkgs.pkgs) util-linux systemd nix;
};
containers =
testScript:
map (m: [
m.system.build.toplevel
(hostPkgs.closureInfo {
rootPaths = [
m.system.build.toplevel
(hostPkgs.writeText "testScript" testScript)
];
})
]) (lib.attrValues config.nodes);
pythonizeName =
name:
let
head = lib.substring 0 1 name;
tail = lib.substring 1 (-1) name;
in
(if builtins.match "[A-z_]" head == null then "_" else head)
+ lib.stringAsChars (c: if builtins.match "[A-z0-9_]" c == null then "_" else c) tail;
nodeHostNames =
let
nodesList = map (c: c.system.name) (lib.attrValues config.nodes);
in
nodesList ++ lib.optional (lib.length nodesList == 1 && !lib.elem "machine" nodesList) "machine";
machineNames = map (name: "${name}: Machine;") pythonizedNames;
pythonizedNames = map pythonizeName nodeHostNames;
in
{
defaults.imports = [
./nixos-module.nix
];
driver = lib.mkForce (
hostPkgs.runCommand "nixos-test-driver-${config.name}"
{
nativeBuildInputs = [
hostPkgs.makeWrapper
] ++ lib.optionals (!config.skipTypeCheck) [ hostPkgs.mypy ];
buildInputs = [ testDriver ];
testScript = config.testScriptString;
preferLocalBuild = true;
passthru = config.passthru;
meta = config.meta // {
mainProgram = "nixos-test-driver";
};
}
''
mkdir -p $out/bin
${lib.optionalString (!config.skipTypeCheck) ''
# prepend type hints so the test script can be type checked with mypy
cat "${./test-script-prepend.py}" >> testScriptWithTypes
echo "${builtins.toString machineNames}" >> testScriptWithTypes
echo -n "$testScript" >> testScriptWithTypes
echo "Running type check (enable/disable: config.skipTypeCheck)"
echo "See https://nixos.org/manual/nixos/stable/#test-opt-skipTypeCheck"
mypy --no-implicit-optional \
--pretty \
--no-color-output \
testScriptWithTypes
''}
echo -n "$testScript" >> $out/test-script
ln -s ${testDriver}/bin/nixos-test-driver $out/bin/nixos-test-driver
wrapProgram $out/bin/nixos-test-driver \
${
lib.concatStringsSep " " (
map (container: "--add-flags '--container ${builtins.toString container}'") (
containers config.testScriptString
)
)
} \
--add-flags "--test-script '$out/test-script'"
''
);
test = lib.mkForce (
lib.lazyDerivation {
# lazyDerivation improves performance when only passthru items and/or meta are used.
derivation = hostPkgs.stdenv.mkDerivation {
name = "vm-test-run-${config.name}";
requiredSystemFeatures = [ "uid-range" ];
buildCommand = ''
mkdir -p $out
# effectively mute the XMLLogger
export LOGFILE=/dev/null
${config.driver}/bin/nixos-test-driver -o $out
'';
nativeBuildInputs = [
hostPkgs.util-linux
hostPkgs.coreutils
hostPkgs.iproute2
];
passthru = config.passthru;
meta = config.meta;
};
inherit (config) passthru meta;
}
);
}

View File

@@ -0,0 +1,26 @@
{ pkgs, lib, ... }:
{
boot.isContainer = true;
# needed since nixpkgs 7fb2f407c01b017737eafc26b065d7f56434a992 removed the getty unit by default
console.enable = true;
# undo qemu stuff
system.build.initialRamdisk = "";
virtualisation.sharedDirectories = lib.mkForce { };
networking.useDHCP = false;
# We use networkd to assign static ip addresses
networking.useNetworkd = true;
services.resolved.enable = false;
# Rename the host0 interface to eth0 to match what we expect in VM tests.
system.activationScripts.renameInterface = ''
${pkgs.iproute2}/bin/ip link set dev host0 name eth1
'';
systemd.services.backdoor.enable = false;
# we don't have permission to set cpu scheduler in our container
systemd.services.nix-daemon.serviceConfig.CPUSchedulingPolicy = lib.mkForce "";
}

View File

@@ -0,0 +1,40 @@
{
extraPythonPackages ? (_ps: [ ]),
python3Packages,
python3,
buildPythonApplication,
setuptools,
util-linux,
systemd,
nix,
colorama,
junit-xml,
mkShell,
}:
let
package = buildPythonApplication {
pname = "test-driver";
version = "0.0.1";
propagatedBuildInputs = [
util-linux
systemd
colorama
junit-xml
nix
] ++ extraPythonPackages python3Packages;
nativeBuildInputs = [ setuptools ];
format = "pyproject";
src = ./.;
passthru.devShell = mkShell {
packages = [
(python3.withPackages (_ps: package.propagatedBuildInputs))
package.propagatedBuildInputs
python3.pkgs.pytest
];
shellHook = ''
export PYTHONPATH="$(realpath .):$PYTHONPATH"
'';
};
};
in
package

View File

@@ -0,0 +1,22 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "nixos-test-driver"
version = "0.0.0"
[project.scripts]
nixos-test-driver = "test_driver:main"
[tool.setuptools.packages]
find = {}
[tool.setuptools.package-data]
test_driver = ["py.typed"]
[tool.mypy]
python_version = "3.12"
warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true
no_implicit_optional = true

View File

@@ -0,0 +1,9 @@
# This file contains type hints that can be prepended to Nix test scripts so they can be type
# checked.
from collections.abc import Callable
from test_driver import Machine
start_all: Callable[[], None]
machines: list[Machine]

View File

@@ -0,0 +1,549 @@
import argparse
import ctypes
import os
import re
import subprocess
import time
import types
from collections.abc import Callable
from contextlib import _GeneratorContextManager
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any
from .logger import AbstractLogger, CompositeLogger, TerminalLogger
# Load the C library
libc = ctypes.CDLL("libc.so.6", use_errno=True)
# Define the mount function
libc.mount.argtypes = [
ctypes.c_char_p, # source
ctypes.c_char_p, # target
ctypes.c_char_p, # filesystemtype
ctypes.c_ulong, # mountflags
ctypes.c_void_p, # data
]
libc.mount.restype = ctypes.c_int
MS_BIND = 0x1000
MS_REC = 0x4000
def mount(
source: Path,
target: Path,
filesystemtype: str,
mountflags: int = 0,
data: str | None = None,
) -> None:
"""
A Python wrapper for the mount system call.
:param source: The source of the file system (e.g., device name, remote filesystem).
:param target: The mount point (an existing directory).
:param filesystemtype: The filesystem type (e.g., "ext4", "nfs").
:param mountflags: Mount options flags.
:param data: File system-specific data (e.g., options like "rw").
:raises OSError: If the mount system call fails.
"""
# Convert Python strings to C-compatible strings
source_c = ctypes.c_char_p(str(source).encode("utf-8"))
target_c = ctypes.c_char_p(str(target).encode("utf-8"))
fstype_c = ctypes.c_char_p(filesystemtype.encode("utf-8"))
data_c = ctypes.c_char_p(data.encode("utf-8")) if data else None
# Call the mount system call
result = libc.mount(
source_c, target_c, fstype_c, ctypes.c_ulong(mountflags), data_c
)
if result != 0:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
class Error(Exception):
pass
def prepare_machine_root(machinename: str, root: Path) -> None:
root.mkdir(parents=True, exist_ok=True)
root.joinpath("etc").mkdir(parents=True, exist_ok=True)
root.joinpath(".env").write_text(
"\n".join(f"{k}={v}" for k, v in os.environ.items())
)
def pythonize_name(name: str) -> str:
return re.sub(r"^[^A-z_]|[^A-z0-9_]", "_", name)
def retry(fn: Callable, timeout: int = 900) -> None:
"""Call the given function repeatedly, with 1 second intervals,
until it returns True or a timeout is reached.
"""
for _ in range(timeout):
if fn(False):
return
time.sleep(1)
if not fn(True):
msg = f"action timed out after {timeout} seconds"
raise Error(msg)
class Machine:
def __init__(
self,
name: str,
toplevel: Path,
logger: AbstractLogger,
rootdir: Path,
out_dir: str,
) -> None:
self.name = name
self.toplevel = toplevel
self.out_dir = out_dir
self.process: subprocess.Popen | None = None
self.rootdir: Path = rootdir
self.logger = logger
@cached_property
def container_pid(self) -> int:
return self.get_systemd_process()
def start(self) -> None:
prepare_machine_root(self.name, self.rootdir)
cmd = [
"systemd-nspawn",
"--keep-unit",
"-M",
self.name,
"-D",
self.rootdir,
"--register=no",
"--resolv-conf=off",
f"--bind=/.containers/{self.name}/nix:/nix",
"--bind=/proc:/run/host/proc",
"--bind=/sys:/run/host/sys",
"--private-network",
"--network-bridge=br0",
self.toplevel.joinpath("init"),
]
env = os.environ.copy()
env["SYSTEMD_NSPAWN_UNIFIED_HIERARCHY"] = "1"
self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True, env=env)
def get_systemd_process(self) -> int:
assert self.process is not None, "Machine not started"
assert self.process.stdout is not None, "Machine has no stdout"
for line in self.process.stdout:
print(line, end="")
if (
line.startswith("systemd[1]: Startup finished in")
or "Welcome to NixOS" in line
):
break
else:
msg = f"Failed to start container {self.name}"
raise RuntimeError(msg)
childs = (
Path(f"/proc/{self.process.pid}/task/{self.process.pid}/children")
.read_text()
.split()
)
assert len(childs) == 1, (
f"Expected exactly one child process for systemd-nspawn, got {childs}"
)
try:
return int(childs[0])
except ValueError as e:
msg = f"Failed to parse child process id {childs[0]}"
raise RuntimeError(msg) from e
def get_unit_info(self, unit: str) -> dict[str, str]:
proc = self.systemctl(f'--no-pager show "{unit}"')
if proc.returncode != 0:
msg = (
f'retrieving systemctl info for unit "{unit}"'
f" failed with exit code {proc.returncode}"
)
raise Error(msg)
line_pattern = re.compile(r"^([^=]+)=(.*)$")
def tuple_from_line(line: str) -> tuple[str, str]:
match = line_pattern.match(line)
assert match is not None
return match[1], match[2]
return dict(
tuple_from_line(line)
for line in proc.stdout.split("\n")
if line_pattern.match(line)
)
def execute(
self,
command: str,
check_return: bool = True,
check_output: bool = True,
timeout: int | None = 900,
) -> subprocess.CompletedProcess:
"""
Execute a shell command, returning a list `(status, stdout)`.
Commands are run with `set -euo pipefail` set:
- If several commands are separated by `;` and one fails, the
command as a whole will fail.
- For pipelines, the last non-zero exit status will be returned
(if there is one; otherwise zero will be returned).
- Dereferencing unset variables fails the command.
- It will wait for stdout to be closed.
If the command detaches, it must close stdout, as `execute` will wait
for this to consume all output reliably. This can be achieved by
redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or
a file. Examples of detaching commands are `sleep 365d &`, where the
shell forks a new process that can write to stdout and `xclip -i`, where
the `xclip` command itself forks without closing stdout.
Takes an optional parameter `check_return` that defaults to `True`.
Setting this parameter to `False` will not check for the return code
and return -1 instead. This can be used for commands that shut down
the VM and would therefore break the pipe that would be used for
retrieving the return code.
A timeout for the command can be specified (in seconds) using the optional
`timeout` parameter, e.g., `execute(cmd, timeout=10)` or
`execute(cmd, timeout=None)`. The default is 900 seconds.
"""
# Always run command with shell opts
command = f"set -eo pipefail; source /etc/profile; set -u; {command}"
proc = subprocess.run(
[
"nsenter",
"--target",
str(self.container_pid),
"--mount",
"--uts",
"--ipc",
"--net",
"--pid",
"--cgroup",
"/bin/sh",
"-c",
command,
],
timeout=timeout,
check=False,
stdout=subprocess.PIPE,
text=True,
)
return proc
def nested(
self, msg: str, attrs: dict[str, str] | None = None
) -> _GeneratorContextManager:
if attrs is None:
attrs = {}
my_attrs = {"machine": self.name}
my_attrs.update(attrs)
return self.logger.nested(msg, my_attrs)
def systemctl(self, q: str) -> subprocess.CompletedProcess:
"""
Runs `systemctl` commands with optional support for
`systemctl --user`
```py
# run `systemctl list-jobs --no-pager`
machine.systemctl("list-jobs --no-pager")
# spawn a shell for `any-user` and run
# `systemctl --user list-jobs --no-pager`
machine.systemctl("list-jobs --no-pager", "any-user")
```
"""
return self.execute(f"systemctl {q}")
def wait_until_succeeds(self, command: str, timeout: int = 900) -> str:
"""
Repeat a shell command with 1-second intervals until it succeeds.
Has a default timeout of 900 seconds which can be modified, e.g.
`wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on
command execution.
Throws an exception on timeout.
"""
output = ""
def check_success(_: Any) -> bool:
nonlocal output
result = self.execute(command, timeout=timeout)
return result.returncode == 0
with self.nested(f"waiting for success: {command}"):
retry(check_success, timeout)
return output
def wait_for_unit(self, unit: str, timeout: int = 900) -> None:
"""
Wait for a systemd unit to get into "active" state.
Throws exceptions on "failed" and "inactive" states as well as after
timing out.
"""
def check_active(_: bool) -> bool:
info = self.get_unit_info(unit)
state = info["ActiveState"]
if state == "failed":
proc = self.systemctl(f"--lines 0 status {unit}")
journal = self.execute(f"journalctl -u {unit} --no-pager")
msg = f'unit "{unit}" reached state "{state}":\n{proc.stdout}\n{journal.stdout}'
raise Error(msg)
if state == "inactive":
proc = self.systemctl("list-jobs --full 2>&1")
if "No jobs" in proc.stdout:
info = self.get_unit_info(unit)
if info["ActiveState"] == state:
msg = f'unit "{unit}" is inactive and there are no pending jobs'
raise Error(msg)
return state == "active"
retry(check_active, timeout)
def succeed(self, command: str, timeout: int | None = None) -> str:
res = self.execute(command, timeout=timeout)
if res.returncode != 0:
msg = f"Failed to run command {command}\n"
msg += f"Exit code: {res.returncode}\n"
msg += f"Stdout: {res.stdout}"
raise RuntimeError(msg)
return res.stdout
def shutdown(self) -> None:
"""
Shut down the machine, waiting for the VM to exit.
"""
if self.process:
self.process.terminate()
self.process.wait()
self.process = None
def release(self) -> None:
self.shutdown()
@dataclass
class ContainerInfo:
toplevel: Path
closure_info: Path
@cached_property
def name(self) -> str:
name_match = re.match(r".*-nixos-system-(.+)-(.+)", self.toplevel.name)
if not name_match:
msg = f"Unable to extract hostname from {self.toplevel.name}"
raise Error(msg)
return name_match.group(1)
@property
def root_dir(self) -> Path:
return Path(f"/.containers/{self.name}")
@property
def nix_store_dir(self) -> Path:
return self.root_dir / "nix" / "store"
@property
def etc_dir(self) -> Path:
return self.root_dir / "etc"
def setup_filesystems(container: ContainerInfo) -> None:
# We don't care about cleaning up the mount points, since we're running in a nix sandbox.
Path("/run").mkdir(parents=True, exist_ok=True)
subprocess.run(["mount", "-t", "tmpfs", "none", "/run"], check=True)
subprocess.run(["mount", "-t", "cgroup2", "none", "/sys/fs/cgroup"], check=True)
container.etc_dir.mkdir(parents=True)
Path("/etc/os-release").touch()
Path("/etc/machine-id").write_text("a5ea3f98dedc0278b6f3cc8c37eeaeac")
container.nix_store_dir.mkdir(parents=True)
# Read /proc/mounts and replicate every bind mount
with Path("/proc/self/mounts").open() as f:
for line in f:
columns = line.split(" ")
source = Path(columns[1])
if source.parent != Path("/nix/store/"):
continue
target = container.nix_store_dir / source.name
if source.is_dir():
target.mkdir()
else:
target.touch()
try:
if "acl" in target.name:
print(f"mount({source}, {target})")
mount(source, target, "none", MS_BIND)
except OSError as e:
msg = f"mount({source}, {target}) failed"
raise Error(msg) from e
def load_nix_db(container: ContainerInfo) -> None:
with (container.closure_info / "registration").open() as f:
subprocess.run(
["nix-store", "--load-db", "--store", str(container.root_dir)],
stdin=f,
check=True,
text=True,
)
class Driver:
logger: AbstractLogger
def __init__(
self,
containers: list[ContainerInfo],
logger: AbstractLogger,
testscript: str,
out_dir: str,
) -> None:
self.containers = containers
self.testscript = testscript
self.out_dir = out_dir
self.logger = logger
self.tempdir = TemporaryDirectory()
tempdir_path = Path(self.tempdir.name)
self.machines = []
for container in containers:
setup_filesystems(container)
load_nix_db(container)
self.machines.append(
Machine(
name=container.name,
toplevel=container.toplevel,
rootdir=tempdir_path / container.name,
out_dir=self.out_dir,
logger=self.logger,
)
)
def start_all(self) -> None:
# child
# create bridge
subprocess.run(
["ip", "link", "add", "br0", "type", "bridge"], check=True, text=True
)
subprocess.run(["ip", "link", "set", "br0", "up"], check=True, text=True)
for machine in self.machines:
print(f"Starting {machine.name}")
machine.start()
def test_symbols(self) -> dict[str, Any]:
general_symbols = {
"start_all": self.start_all,
"machines": self.machines,
"driver": self,
"Machine": Machine, # for typing
}
machine_symbols = {pythonize_name(m.name): m for m in self.machines}
# If there's exactly one machine, make it available under the name
# "machine", even if it's not called that.
if len(self.machines) == 1:
(machine_symbols["machine"],) = self.machines
print(
"additionally exposed symbols:\n "
+ ", ".join(m.name for m in self.machines)
+ ",\n "
+ ", ".join(list(general_symbols.keys()))
)
return {**general_symbols, **machine_symbols}
def test_script(self) -> None:
"""Run the test script"""
exec(self.testscript, self.test_symbols(), None)
def run_tests(self) -> None:
"""Run the test script (for non-interactive test runs)"""
self.test_script()
def __enter__(self) -> "Driver":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: types.TracebackType | None,
) -> None:
for machine in self.machines:
machine.release()
def writeable_dir(arg: str) -> Path:
"""Raises an ArgumentTypeError if the given argument isn't a writeable directory
Note: We want to fail as early as possible if a directory isn't writeable,
since an executed nixos-test could fail (very late) because of the test-driver
writing in a directory without proper permissions.
"""
path = Path(arg)
if not path.is_dir():
msg = f"{path} is not a directory"
raise argparse.ArgumentTypeError(msg)
if not os.access(path, os.W_OK):
msg = f"{path} is not a writeable directory"
raise argparse.ArgumentTypeError(msg)
return path
def main() -> None:
arg_parser = argparse.ArgumentParser(prog="nixos-test-driver")
arg_parser.add_argument(
"--containers",
nargs=2,
action="append",
type=Path,
metavar=("TOPLEVEL_STORE_DIR", "CLOSURE_INFO"),
help="container system toplevel store dir and closure info",
)
arg_parser.add_argument(
"--test-script",
help="the test script to run",
type=Path,
)
arg_parser.add_argument(
"-o",
"--output-directory",
default=Path.cwd(),
help="the directory to bind to /run/test-results",
type=writeable_dir,
)
args = arg_parser.parse_args()
logger = CompositeLogger([TerminalLogger()])
with Driver(
containers=[
ContainerInfo(toplevel, closure_info)
for toplevel, closure_info in args.containers
],
testscript=args.test_script.read_text(),
out_dir=args.output_directory.resolve(),
logger=logger,
) as driver:
driver.run_tests()

View File

@@ -0,0 +1,335 @@
import atexit
import codecs
import os
import sys
import time
import unicodedata
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager
from pathlib import Path
from queue import Empty, Queue
from typing import Any
from xml.sax.saxutils import XMLGenerator
from xml.sax.xmlreader import AttributesImpl
from colorama import Fore, Style
from junit_xml import TestCase, TestSuite
class AbstractLogger(ABC):
@abstractmethod
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
pass
@abstractmethod
@contextmanager
def subtest(
self, name: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
pass
@abstractmethod
@contextmanager
def nested(
self, message: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
pass
@abstractmethod
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
pass
@abstractmethod
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
pass
@abstractmethod
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
pass
@abstractmethod
def log_serial(self, message: str, machine: str) -> None:
pass
@abstractmethod
def print_serial_logs(self, enable: bool) -> None:
pass
class JunitXMLLogger(AbstractLogger):
class TestCaseState:
def __init__(self) -> None:
self.stdout = ""
self.stderr = ""
self.failure = False
def __init__(self, outfile: Path) -> None:
self.tests: dict[str, JunitXMLLogger.TestCaseState] = {
"main": self.TestCaseState()
}
self.currentSubtest = "main"
self.outfile: Path = outfile
self._print_serial_logs = True
atexit.register(self.close)
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
self.tests[self.currentSubtest].stdout += message + os.linesep
@contextmanager
def subtest(
self, name: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
old_test = self.currentSubtest
self.tests.setdefault(name, self.TestCaseState())
self.currentSubtest = name
yield
self.currentSubtest = old_test
@contextmanager
def nested(
self, message: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
self.log(message)
yield
def info(self, *args: Any, **kwargs: Any) -> None:
self.tests[self.currentSubtest].stdout += args[0] + os.linesep
def warning(self, *args: Any, **kwargs: Any) -> None:
self.tests[self.currentSubtest].stdout += args[0] + os.linesep
def error(self, *args: Any, **kwargs: Any) -> None:
self.tests[self.currentSubtest].stderr += args[0] + os.linesep
self.tests[self.currentSubtest].failure = True
def log_serial(self, message: str, machine: str) -> None:
if not self._print_serial_logs:
return
self.log(f"{machine} # {message}")
def print_serial_logs(self, enable: bool) -> None:
self._print_serial_logs = enable
def close(self) -> None:
with Path.open(self.outfile, "w") as f:
test_cases = []
for name, test_case_state in self.tests.items():
tc = TestCase(
name,
stdout=test_case_state.stdout,
stderr=test_case_state.stderr,
)
if test_case_state.failure:
tc.add_failure_info("test case failed")
test_cases.append(tc)
ts = TestSuite("NixOS integration test", test_cases)
f.write(TestSuite.to_xml_string([ts]))
class CompositeLogger(AbstractLogger):
def __init__(self, logger_list: list[AbstractLogger]) -> None:
self.logger_list = logger_list
def add_logger(self, logger: AbstractLogger) -> None:
self.logger_list.append(logger)
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
for logger in self.logger_list:
logger.log(message, attributes)
@contextmanager
def subtest(
self, name: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
with ExitStack() as stack:
for logger in self.logger_list:
stack.enter_context(logger.subtest(name, attributes))
yield
@contextmanager
def nested(
self, message: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
with ExitStack() as stack:
for logger in self.logger_list:
stack.enter_context(logger.nested(message, attributes))
yield
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
for logger in self.logger_list:
logger.info(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
for logger in self.logger_list:
logger.warning(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
for logger in self.logger_list:
logger.error(*args, **kwargs)
sys.exit(1)
def print_serial_logs(self, enable: bool) -> None:
for logger in self.logger_list:
logger.print_serial_logs(enable)
def log_serial(self, message: str, machine: str) -> None:
for logger in self.logger_list:
logger.log_serial(message, machine)
class TerminalLogger(AbstractLogger):
def __init__(self) -> None:
self._print_serial_logs = True
def maybe_prefix(self, message: str, attributes: dict[str, str] | None) -> str:
if attributes and "machine" in attributes:
return f"{attributes['machine']}: {message}"
return message
@staticmethod
def _eprint(*args: object, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
self._eprint(self.maybe_prefix(message, attributes))
@contextmanager
def subtest(
self, name: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
with self.nested("subtest: " + name, attributes):
yield
@contextmanager
def nested(
self, message: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
self._eprint(
self.maybe_prefix(
Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes
)
)
tic = time.time()
yield
toc = time.time()
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def print_serial_logs(self, enable: bool) -> None:
self._print_serial_logs = enable
def log_serial(self, message: str, machine: str) -> None:
if not self._print_serial_logs:
return
self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL)
class XMLLogger(AbstractLogger):
def __init__(self, outfile: str) -> None:
self.logfile_handle = codecs.open(outfile, "wb") # noqa: SIM115
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
self.queue: Queue[dict[str, str]] = Queue()
self._print_serial_logs = True
self.xml.startDocument()
self.xml.startElement("logfile", attrs=AttributesImpl({}))
def close(self) -> None:
self.xml.endElement("logfile")
self.xml.endDocument()
self.logfile_handle.close()
def sanitise(self, message: str) -> str:
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
def maybe_prefix(
self, message: str, attributes: dict[str, str] | None = None
) -> str:
if attributes and "machine" in attributes:
return f"{attributes['machine']}: {message}"
return message
def log_line(self, message: str, attributes: dict[str, str]) -> None:
self.xml.startElement("line", attrs=AttributesImpl(attributes))
self.xml.characters(message)
self.xml.endElement("line")
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
self.log(*args, **kwargs)
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:
if attributes is None:
attributes = {}
self.drain_log_queue()
self.log_line(message, attributes)
def print_serial_logs(self, enable: bool) -> None:
self._print_serial_logs = enable
def log_serial(self, message: str, machine: str) -> None:
if not self._print_serial_logs:
return
self.enqueue({"msg": message, "machine": machine, "type": "serial"})
def enqueue(self, item: dict[str, str]) -> None:
self.queue.put(item)
def drain_log_queue(self) -> None:
try:
while True:
item = self.queue.get_nowait()
msg = self.sanitise(item["msg"])
del item["msg"]
self.log_line(msg, item)
except Empty:
pass
@contextmanager
def subtest(
self, name: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
with self.nested("subtest: " + name, attributes):
yield
@contextmanager
def nested(
self, message: str, attributes: dict[str, str] | None = None
) -> Iterator[None]:
if attributes is None:
attributes = {}
self.xml.startElement("nest", attrs=AttributesImpl({}))
self.xml.startElement("head", attrs=AttributesImpl(attributes))
self.xml.characters(message)
self.xml.endElement("head")
tic = time.time()
self.drain_log_queue()
yield
self.drain_log_queue()
toc = time.time()
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
self.xml.endElement("nest")

View File

@@ -4,6 +4,7 @@ let
mkOption
types
;
in
{
flakeModules = clanLib.callLib ./flakeModules.nix { };
@@ -17,6 +18,7 @@ in
nixosTest,
pkgs,
self,
useContainers ? true,
...
}:
let
@@ -28,7 +30,9 @@ in
clanFlakeResult = config.clan;
in
{
imports = [ nixosTest ];
imports = [
nixosTest
] ++ lib.optionals (useContainers) [ ./container-test-driver/driver-module.nix ];
options = {
clanSettings = mkOption {
default = { };