container-test-driver: implement wait_for_open_port

This commit is contained in:
Jörg Thalheim
2025-06-27 18:34:50 +02:00
parent 97e52a6946
commit 044edc9d06

View File

@@ -2,6 +2,7 @@ import argparse
import ctypes import ctypes
import os import os
import re import re
import shlex
import shutil import shutil
import subprocess import subprocess
import time import time
@@ -303,6 +304,15 @@ class Machine:
retry(check_success, timeout) retry(check_success, timeout)
return output return output
def wait_for_open_port(
self, port: int, addr: str = "localhost", timeout: int = 900
) -> None:
"""
Wait for a port to be open on the given address.
"""
command = f"nc -z {shlex.quote(addr)} {port}"
self.wait_until_succeeds(command, timeout=timeout)
def wait_for_unit(self, unit: str, timeout: int = 900) -> None: def wait_for_unit(self, unit: str, timeout: int = 900) -> None:
""" """
Wait for a systemd unit to get into "active" state. Wait for a systemd unit to get into "active" state.