From 044edc9d062419f87fdcd57b6edeba75b8b457ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Fri, 27 Jun 2025 18:34:50 +0200 Subject: [PATCH] container-test-driver: implement wait_for_open_port --- lib/test/container-test-driver/test_driver/__init__.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/test/container-test-driver/test_driver/__init__.py b/lib/test/container-test-driver/test_driver/__init__.py index 0c92b70b5..3d403dff2 100644 --- a/lib/test/container-test-driver/test_driver/__init__.py +++ b/lib/test/container-test-driver/test_driver/__init__.py @@ -2,6 +2,7 @@ import argparse import ctypes import os import re +import shlex import shutil import subprocess import time @@ -303,6 +304,15 @@ class Machine: retry(check_success, timeout) return output + def wait_for_open_port( + self, port: int, addr: str = "localhost", timeout: int = 900 + ) -> None: + """ + Wait for a port to be open on the given address. + """ + command = f"nc -z {shlex.quote(addr)} {port}" + self.wait_until_succeeds(command, timeout=timeout) + def wait_for_unit(self, unit: str, timeout: int = 900) -> None: """ Wait for a systemd unit to get into "active" state.