# NixOS container test for the clan_lib service_runner package.
#
# Boots a single node with a lingering normal user (uid 1000), waits for that
# user's systemd instance, and then runs the service_runner pytest suite as
# that user.
{ self, pkgs, ... }:

let

  # clan-cli build for the platform the test runs on.
  cli = self.packages.${pkgs.hostPlatform.system}.clan-cli-full;
in
{
  name = "systemd-abstraction";

  nodes = {
    peer1 = {

      # The uid is pinned to 1000 because the runtime-dir paths, the
      # user@1000.service unit and the journal file names used below all
      # hard-code it. linger = true is set so the user manager is available
      # without an interactive login.
      users.users.text-user = {
        isNormalUser = true;
        linger = true;
        uid = 1000;
        extraGroups = [ "systemd-journal" ];
      };

      # Set environment variables for user systemd
      environment.extraInit = ''
        if [ "$(id -u)" = "1000" ]; then
          export XDG_RUNTIME_DIR="/run/user/1000"
          export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus"
        fi
      '';

      # Enable PAM for user systemd sessions
      security.pam.services.systemd-user = {
        startSession = true;
        # Workaround for containers - use pam_permit to avoid helper binary issues
        text = pkgs.lib.mkForce ''
          account required pam_permit.so
          session required pam_permit.so
          session required pam_env.so conffile=/etc/pam/environment readenv=0
          session required ${pkgs.systemd}/lib/security/pam_systemd.so
        '';
      };

      # The CLI itself plus a Python environment (built from the CLI's own
      # runtime) that provides pytest for the test script below.
      environment.systemPackages = [
        cli
        (cli.pythonRuntime.withPackages (
          ps: with ps; [
            pytest
            pytest-xdist
          ]
        ))
      ];
    };
  };

  # The suite is run from the installed site-packages directory of the CLI,
  # with xdist disabled (-n0) and output capturing off (-s).
  testScript =
    { ... }:
    ''
      start_all()

      peer1.wait_for_unit("multi-user.target")
      peer1.wait_for_unit("user@1000.service")

      # Fix user journal permissions so text-user can read their own logs
      peer1.succeed("chown text-user:systemd-journal /var/log/journal/*/user-1000.journal*")
      peer1.succeed("chmod 640 /var/log/journal/*/user-1000.journal*")

      # Run tests as text-user (environment variables are set automatically)
      peer1.succeed("su - text-user -c 'pytest -s -n0 ${cli}/${cli.pythonRuntime.sitePackages}/clan_lib/service_runner'")
    '';
}
#!/usr/bin/env python3
"""Test suite for service runner with group functionality.

These tests talk to a real ``systemd --user`` instance and are therefore
marked ``service_runner``. State changes in systemd are asynchronous, so the
tests poll (with a bounded timeout) for the expected state instead of relying
on a fixed sleep.
"""

# Allow assert statements and magic values in test code

import time
from collections.abc import Callable, Generator
from contextlib import suppress
from pathlib import Path

import pytest

from clan_lib.errors import ClanError
from clan_lib.service_runner import create_service_manager
from clan_lib.service_runner.protocols import ServiceManagerProtocol

# Upper bound (seconds) for waiting on an asynchronous systemd state change.
WAIT_TIMEOUT = 5.0
# Delay (seconds) between two polls while waiting.
WAIT_INTERVAL = 0.1


def _wait_until(
    condition: Callable[[], bool],
    timeout: float = WAIT_TIMEOUT,
    interval: float = WAIT_INTERVAL,
) -> bool:
    """Poll *condition* until it returns true or *timeout* seconds elapse.

    Returns:
        True if the condition became true in time, False otherwise. Callers
        re-check the state afterwards so that a failing assert reports the
        actual value.

    """
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


def _wait_for_status(
    manager: ServiceManagerProtocol, name: str, *expected: str
) -> str:
    """Wait until service *name* reports one of *expected*; return the last status."""
    _wait_until(lambda: manager.get_status(name) in expected)
    return manager.get_status(name)


@pytest.fixture
def service_manager() -> Generator[ServiceManagerProtocol]:
    """Create a service manager and ensure cleanup after test."""
    manager = create_service_manager()

    # List of services that might be created during tests
    test_services = [
        "simple-service",
        "nginx-service",
        "api-service",
        "postgres-service",
        "autostart-service",
        "log-test",
        "restart-test",
    ]

    # Test groups that might be created
    test_groups = ["web", "database"]

    # Yield the manager to the test
    yield manager

    # Cleanup after test (runs even if test fails)
    for service in test_services:
        with suppress(ClanError):
            manager.stop_service(service)

    for group in test_groups:
        with suppress(ClanError):
            manager.stop_services_by_group(group)


@pytest.mark.service_runner
def test_transient_service(service_manager: ServiceManagerProtocol) -> None:
    """Test transient service (no autostart, uses systemd-run)."""
    # Start a transient service
    name = service_manager.start_service(
        name="simple-service",
        command=["sleep", "300"],
        description="A simple transient service",
        autostart=False,
    )
    assert name == "simple-service", f"Expected 'simple-service', got {name}"

    # Check status (polls until systemd reports the unit as active)
    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running", f"Expected 'running', got {status}"

    # Verify it's listed
    services = service_manager.list_running_services()
    service_names = [s["service_name"] for s in services]
    assert "service-runner-simple-service" in service_names, "Service not in list"

    # Check it's marked as transient (no unit file)
    simple_service = next(
        s for s in services if s["service_name"] == "service-runner-simple-service"
    )
    assert simple_service["unit_file"] == "(transient)", (
        f"Should be transient, got {simple_service['unit_file']!r}"
    )

    # Stop the service
    service_manager.stop_service("simple-service")

    # Verify it's stopped
    status = _wait_for_status(service_manager, "simple-service", "stopped", "unknown")
    assert status in ("stopped", "unknown"), f"Expected stopped/unknown, got {status}"


@pytest.mark.service_runner
def test_autostart_service(service_manager: ServiceManagerProtocol) -> None:
    """Test autostart service (creates persistent unit file)."""
    # Start an autostart service
    service_manager.start_service(
        name="autostart-service",
        command=["sleep", "300"],
        description="An autostart service",
        autostart=True,
    )

    # Check status
    status = _wait_for_status(service_manager, "autostart-service", "running")
    assert status == "running", f"Expected 'running', got {status}"

    # Verify it has a unit file (not transient)
    services = service_manager.list_running_services()
    autostart_service = next(
        s for s in services if s["service_name"] == "service-runner-autostart-service"
    )
    assert autostart_service["unit_file"] != "(transient)", "Should have unit file"
    assert autostart_service["unit_file"].endswith(".service"), (
        "Should be .service file"
    )

    # Verify unit file exists
    unit_file = Path(autostart_service["unit_file"])
    assert unit_file.exists(), f"Unit file should exist: {unit_file}"

    # Stop and verify unit file is removed
    service_manager.stop_service("autostart-service")

    _wait_until(lambda: not unit_file.exists())
    assert not unit_file.exists(), f"Unit file should be removed: {unit_file}"


@pytest.mark.service_runner
def test_grouped_services(service_manager: ServiceManagerProtocol) -> None:
    """Test services with groups."""
    # Start services in the "web" group
    service_manager.start_service(
        name="nginx-service",
        command=["sleep", "300"],
        description="Web server",
        autostart=True,
        group="web",
    )

    service_manager.start_service(
        name="api-service",
        command=["sleep", "300"],
        description="API server",
        autostart=True,
        group="web",
    )

    # Start service in "database" group
    service_manager.start_service(
        name="postgres-service",
        command=["sleep", "300"],
        description="Database server",
        autostart=True,
        group="database",
    )

    # Wait for all three to come up before inspecting the listings
    for started in ("nginx-service", "api-service", "postgres-service"):
        _wait_for_status(service_manager, started, "running")

    # Verify all services are running
    all_services = service_manager.list_running_services()
    service_names = {s["service_name"] for s in all_services}
    assert "service-runner-nginx-service" in service_names
    assert "service-runner-api-service" in service_names
    assert "service-runner-postgres-service" in service_names

    # List services by group
    web_services = service_manager.list_services_by_group("web")
    assert len(web_services) == 2, f"Expected 2 web services, got {len(web_services)}"
    web_service_names = {s["service_name"] for s in web_services}
    assert "service-runner-nginx-service" in web_service_names
    assert "service-runner-api-service" in web_service_names

    db_services = service_manager.list_services_by_group("database")
    assert len(db_services) == 1, f"Expected 1 db service, got {len(db_services)}"
    assert db_services[0]["service_name"] == "service-runner-postgres-service"
    assert db_services[0]["group"] == "database"

    # Verify all grouped services have unit files
    for service in web_services + db_services:
        assert service["unit_file"] != "(transient)", (
            f"{service['service_name']} should have unit file"
        )
        assert service["status"] == "active", (
            f"{service['service_name']} should be active"
        )

    # Stop services by group
    service_manager.stop_services_by_group("web")

    # Verify web services are stopped
    _wait_until(lambda: not service_manager.list_services_by_group("web"))
    web_services_after = service_manager.list_services_by_group("web")
    assert len(web_services_after) == 0, "Web services should be stopped"

    # Verify database service is still running
    db_services_after = service_manager.list_services_by_group("database")
    assert len(db_services_after) == 1, "Database service should still be running"

    # Clean up database group
    service_manager.stop_services_by_group("database")

    _wait_until(lambda: not service_manager.list_services_by_group("database"))
    db_services_final = service_manager.list_services_by_group("database")
    assert len(db_services_final) == 0, "Database services should be stopped"


@pytest.mark.service_runner
def test_service_logs(service_manager: ServiceManagerProtocol) -> None:
    """Test retrieving service logs."""
    # Start a service
    service_manager.start_service(
        name="log-test",
        command=["sleep", "300"],
        description="Log test service",
        autostart=False,
    )

    _wait_for_status(service_manager, "log-test", "running")

    # Get logs - just verify we can retrieve them (may be empty)
    logs = service_manager.get_service_logs("log-test", lines=20)
    assert isinstance(logs, str), "Logs should be a string"

    # Clean up
    service_manager.stop_service("log-test")


@pytest.mark.service_runner
def test_nonexistent_group(service_manager: ServiceManagerProtocol) -> None:
    """Test listing services in nonexistent group."""
    # List services in nonexistent group
    services = service_manager.list_services_by_group("nonexistent-group")
    assert services == [], f"Expected empty list, got {services}"


@pytest.mark.service_runner
def test_restart_service(service_manager: ServiceManagerProtocol) -> None:
    """Test restarting a service."""
    # Start a service
    service_manager.start_service(
        name="restart-test",
        command=["sleep", "300"],
        description="Restart test service",
        autostart=False,
    )

    # Verify it's running
    status = _wait_for_status(service_manager, "restart-test", "running")
    assert status == "running", f"Expected 'running', got {status}"

    # Restart it
    service_manager.restart_service("restart-test")

    # Verify it's still running
    status = _wait_for_status(service_manager, "restart-test", "running")
    assert status == "running", f"Expected 'running' after restart, got {status}"

    # Clean up
    service_manager.stop_service("restart-test")


@pytest.mark.service_runner
def test_cleanup_on_failure(service_manager: ServiceManagerProtocol) -> None:
    """Test that a service left running is cleaned up by the fixture."""
    # Start a service
    service_manager.start_service(
        name="simple-service",
        command=["sleep", "300"],
        autostart=False,
    )

    # Verify it's running
    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running"

    # The service is deliberately not stopped here: the fixture's teardown
    # stops it, so later tests can reuse the "simple-service" name.


@pytest.mark.service_runner
def test_start_service_twice_transient(service_manager: ServiceManagerProtocol) -> None:
    """Test that starting the same transient service twice replaces it."""
    # Start a transient service
    service_manager.start_service(
        name="simple-service",
        command=["sleep", "300"],
        autostart=False,
    )

    # Verify it's running
    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running"

    # Starting the same name again must not fail: start_service stops and
    # resets an existing unit of that name before starting the new one.
    service_manager.start_service(
        name="simple-service",
        command=["sleep", "300"],
        autostart=False,
    )

    # A service of that name should be running again
    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running"


@pytest.mark.service_runner
def test_start_service_twice_autostart(service_manager: ServiceManagerProtocol) -> None:
    """Test that starting the same autostart service twice restarts it."""
    # Start an autostart service
    service_manager.start_service(
        name="autostart-service",
        command=["sleep", "300"],
        autostart=True,
    )

    # Verify it's running
    status = _wait_for_status(service_manager, "autostart-service", "running")
    assert status == "running"

    # Start the same service again. As for transient services, the existing
    # unit is stopped first, the unit file is rewritten and the service is
    # started again.
    service_manager.start_service(
        name="autostart-service",
        command=["sleep", "300"],
        autostart=True,
    )

    # Service should still be running after "restart"
    status = _wait_for_status(service_manager, "autostart-service", "running")
    assert status == "running"


@pytest.mark.service_runner
def test_start_stopped_service_again(service_manager: ServiceManagerProtocol) -> None:
    """Test starting a service, stopping it, then starting it again."""
    # Start a service
    service_manager.start_service(
        name="simple-service",
        command=["sleep", "300"],
        autostart=False,
    )

    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running"

    # Stop the service
    service_manager.stop_service("simple-service")

    status = _wait_for_status(service_manager, "simple-service", "stopped", "unknown")
    assert status in ("stopped", "unknown")

    # Start the service again with a different command - this should work
    service_manager.start_service(
        name="simple-service",
        command=["sleep", "600"],
        description="Restarted service with different command",
        autostart=False,
    )

    status = _wait_for_status(service_manager, "simple-service", "running")
    assert status == "running"
"""Protocol definitions for platform-independent service management."""

import platform
from pathlib import Path
from typing import Protocol, runtime_checkable

from clan_lib.errors import ClanError

from .systemd_user import GroupedServiceInfo, ServiceInfo, ServiceStatus


@runtime_checkable
class ServiceManagerProtocol(Protocol):
    """Protocol for platform-independent service management backends.

    A backend starts, stops, inspects and groups long-running commands that
    are addressed by a caller-chosen ``name``. The protocol is structural:
    any class providing these methods satisfies it.
    """

    def start_service(
        self,
        name: str,
        command: list[str],
        working_dir: Path | None = None,
        env_vars: dict[str, str] | None = None,
        description: str | None = None,
        autostart: bool = False,
        group: str | None = None,
    ) -> str:
        """Start a service with the given configuration.

        Args:
            name: Service identifier
            command: Command and arguments to run
            working_dir: Working directory for the service
            env_vars: Environment variables for the service
            description: Human-readable service description
            autostart: Whether to enable service on boot
            group: Optional group name for service grouping

        Returns:
            Service name/identifier

        Raises:
            ClanError: If service creation or start fails

        """
        ...

    def stop_service(self, name: str) -> None:
        """Stop and remove a service.

        Args:
            name: Service identifier

        Raises:
            ClanError: If name is empty or operation fails

        """
        ...

    def get_status(self, name: str) -> ServiceStatus:
        """Get the current status of a service.

        Args:
            name: Service identifier

        Returns:
            Current service status: one of "running", "stopped", "failed"
            or "unknown"

        Raises:
            ClanError: If name is empty

        """
        ...

    def restart_service(self, name: str) -> None:
        """Restart a service.

        Args:
            name: Service identifier

        Raises:
            ClanError: If name is empty or operation fails

        """
        ...

    def get_service_logs(self, name: str, lines: int = 50) -> str:
        """Get recent logs for a service.

        Args:
            name: Service identifier
            lines: Number of recent lines to retrieve

        Returns:
            Service logs as string

        Raises:
            ClanError: If name is empty

        """
        ...

    def list_running_services(self) -> list[ServiceInfo]:
        """List all services managed by this backend.

        Returns:
            List of service information dictionaries

        """
        ...

    def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]:
        """List all services in the specified group.

        Args:
            group: Group name to filter by

        Returns:
            List of service information dictionaries for the group

        Raises:
            ClanError: If group name is empty

        """
        ...

    def stop_services_by_group(self, group: str) -> None:
        """Stop all services in the specified group.

        Args:
            group: Group name to stop services for

        Raises:
            ClanError: If group name is empty or operation fails

        """
        ...


def create_service_manager() -> ServiceManagerProtocol:
    """Create a platform-appropriate service manager.

    The platform is taken from ``platform.system()``. Only Linux is
    supported; there the manager is a ``SystemdUserService`` that keeps its
    unit files in ``~/.config/systemd/user``.

    Returns:
        Service manager implementation for current platform

    Raises:
        ClanError: If platform is not supported

    """
    system = platform.system().lower()

    if system == "linux":
        # NOTE(review): this module already imports from .systemd_user at the
        # top, so the function-local import does not defer loading that
        # module; it could be folded into the top-level import.
        from .systemd_user import SystemdUserService  # noqa: PLC0415

        return SystemdUserService(
            user_systemd_dir=Path.home() / ".config" / "systemd" / "user"
        )

    supported_platforms = ["linux"]
    msg = (
        f"Platform '{system}' is not supported. "
        f"Supported platforms: {', '.join(supported_platforms)}"
    )
    raise ClanError(msg)
f"Executable not found: {command[0]}" + raise ClanError(msg) + return executable + + def _systemctl(self, action: str, service_name: str) -> "CmdOut": + """Run systemctl command with --user flag.""" + return run( + ["systemctl", "--user", action, f"{service_name}.service"], + RunOpts(check=False), + ) + + def _get_property(self, service_name: str, prop: str) -> str: + """Get a systemd unit property value.""" + result = run( + [ + "systemctl", + "--user", + "show", + f"{service_name}.service", + f"--property={prop}", + "--no-pager", + ], + RunOpts(check=False), + ) + prefix = f"{prop}=" + for line in result.stdout.split("\n"): + if line.startswith(prefix): + return line[len(prefix) :].strip() + return "" + + def _create_target_file(self, group: str) -> None: + """Create systemd target file for a group if it doesn't exist.""" + target_file = self._target_file_path(group) + if target_file.exists(): + return + + content = textwrap.dedent( + f""" + [Unit] + Description=Service runner group: {group} + After=multi-user.target + """ + ) + target_file.touch(exist_ok=True) + target_file.chmod(0o600) + target_file.write_text(content) + run(["systemctl", "--user", "daemon-reload"]) + + def _create_autostart_unit( + self, + name: str, + command: list[str], + working_dir: Path | None, + env_vars: dict[str, str] | None, + description: str | None, + group: str | None, + ) -> None: + """Create persistent unit file for autostart services.""" + executable = self._check_executable(command) + exec_start = f"{executable} {' '.join(shlex.quote(arg) for arg in command[1:])}" + description = description or f"Service runner for {command[0]}" + + content = textwrap.dedent( + f""" + [Unit] + Description={description} + After=multi-user.target + """ + ) + + if group: + content += f"PartOf={self._target_name(group)}.target\n" + + content += textwrap.dedent( + f""" + [Service] + Type=simple + ExecStart={exec_start} + """ + ) + + if working_dir: + content += f"WorkingDirectory={working_dir}\n" + 
+ for key, value in (env_vars or {}).items(): + content += f"Environment={key}={shlex.quote(value)}\n" + + content += textwrap.dedent( + f""" + [Install] + WantedBy={self._target_name(group) if group else "default"}.target + """ + ) + + unit_file = self._unit_file_path(name) + unit_file.touch(exist_ok=True) + unit_file.chmod(0o600) + unit_file.write_text(content) + + def start_service( + self, + name: str, + command: list[str], + working_dir: Path | None = None, + env_vars: dict[str, str] | None = None, + description: str | None = None, + autostart: bool = False, + group: str | None = None, + ) -> str: + """Start a systemd user service. + + autostart=False: Uses systemd-run (transient, no files). + autostart=True: Creates unit files (persistent across reboots). + """ + self._validate_name(name) + if not command: + msg = "Command cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + self._check_executable(command) + + # Stop and reset any existing service (allows redefining failed/running services) + self._systemctl("stop", service_name) + run( + ["systemctl", "--user", "reset-failed", f"{service_name}.service"], + RunOpts(check=False), + ) + + if autostart: + if group: + self._create_target_file(group) + self._create_autostart_unit( + name, command, working_dir, env_vars, description, group + ) + run(["systemctl", "--user", "daemon-reload"]) + + result = self._systemctl("enable", service_name) + if result.returncode != 0: + msg = f"Failed to enable service: {result.stderr}" + raise ClanError(msg) + + result = self._systemctl("start", service_name) + if result.returncode != 0: + msg = f"Failed to start service: {result.stderr}" + raise ClanError(msg) + else: + # Use systemd-run for transient services + desc = description or f"Service runner for {command[0]}" + cmd = [ + "systemd-run", + "--user", + f"--unit={service_name}", + f"--description={desc}", + ] + + if working_dir: + cmd.append(f"--working-directory={working_dir}") + + for 
key, value in (env_vars or {}).items(): + cmd.append(f"--setenv={key}={value}") + + if group: + self._create_target_file(group) + cmd.append(f"--property=PartOf={self._target_name(group)}.target") + + cmd.extend(command) + + result = run(cmd, RunOpts(error_msg="Failed to start service")) + + return name + + def stop_service(self, name: str) -> None: + """Stop a systemd user service.""" + self._validate_name(name) + service_name = self._service_name(name) + + result = self._systemctl("stop", service_name) + if result.returncode != 0 and "not loaded" not in result.stderr.lower(): + msg = f"Failed to stop service: {result.stderr}" + raise ClanError(msg) + + self._systemctl("disable", service_name) # Ignore errors for transient units + + unit_file = self._unit_file_path(name) + if unit_file.exists(): + unit_file.unlink(missing_ok=True) + run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) + + def get_status(self, name: str) -> ServiceStatus: + """Get the status of a service.""" + self._validate_name(name) + result = self._systemctl("is-active", self._service_name(name)) + status_map: dict[str, ServiceStatus] = { + "active": "running", + "inactive": "stopped", + "failed": "failed", + } + return status_map.get(result.stdout.strip(), "unknown") + + def restart_service(self, name: str) -> None: + """Restart a service.""" + self._validate_name(name) + result = self._systemctl("restart", self._service_name(name)) + if result.returncode != 0: + msg = f"Failed to restart service: {result.stderr}" + raise ClanError(msg) + + def get_service_logs(self, name: str, lines: int = 50) -> str: + """Get recent logs for a service.""" + self._validate_name(name) + result = run( + [ + "journalctl", + "--user", + "-u", + f"{self._service_name(name)}.service", + "-n", + str(lines), + "--no-pager", + ] + ) + return result.stdout + + def _get_service_info(self, unit_name: str) -> tuple[str, str, str]: + """Get status, command, and unit file for a service.""" + status = 
self._get_property(unit_name, "ActiveState") + command = self._get_property(unit_name, "ExecStart") + fragment_path = self._get_property(unit_name, "FragmentPath") + + # Transient units are stored in /run/user/.../systemd/transient/ + if not fragment_path or "/transient/" in fragment_path: + unit_file = "(transient)" + else: + unit_file = fragment_path + + return status, command, unit_file + + def list_running_services(self) -> list[ServiceInfo]: + """List all service-runner services.""" + result = run( + [ + "systemctl", + "--user", + "list-units", + "service-runner-*.service", + "--all", + "--no-legend", + "--no-pager", + "--plain", + ], + RunOpts(check=False), + ) + + services: list[ServiceInfo] = [] + # systemctl list-units format: UNIT LOAD ACTIVE SUB DESCRIPTION + min_required_fields = 4 + for line in result.stdout.strip().split("\n"): + if not line.strip(): + continue + + parts = line.split(None, 4) + if len(parts) < min_required_fields: + continue + + unit_name = parts[0].replace(".service", "") + if not unit_name.startswith("service-runner-"): + continue + + status, command, unit_file = self._get_service_info(unit_name) + services.append( + { + "service_name": unit_name, + "status": status, + "command": command, + "unit_file": unit_file, + } + ) + + return services + + def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]: + """List all services in a group.""" + self._validate_name(group, "Group") + + if not self._target_file_path(group).exists(): + return [] + + result = run( + [ + "systemctl", + "--user", + "list-dependencies", + f"{self._target_name(group)}.target", + "--plain", + ], + RunOpts(check=False), + ) + + services: list[GroupedServiceInfo] = [] + for raw_line in result.stdout.split("\n"): + line = raw_line.strip() + if not (line.endswith(".service") and line.startswith("service-runner-")): + continue + + service_name = line.replace(".service", "") + status, command, unit_file = self._get_service_info(service_name) + 
services.append( + { + "service_name": service_name, + "status": status, + "command": command, + "unit_file": unit_file, + "group": group, + } + ) + + return services + + def stop_services_by_group(self, group: str) -> None: + """Stop all services in a group.""" + self._validate_name(group, "Group") + + target_file = self._target_file_path(group) + if not target_file.exists(): + return + + services = self.list_services_by_group(group) + + # Stop the target (stops all PartOf services) + result = run( + ["systemctl", "--user", "stop", f"{self._target_name(group)}.target"], + RunOpts(check=False), + ) + if result.returncode != 0: + msg = f"Failed to stop target: {result.stderr}" + raise ClanError(msg) + + # Disable and remove unit files for non-transient services + for service in services: + if service["unit_file"] != "(transient)": + self._systemctl("disable", service["service_name"]) + Path(service["unit_file"]).unlink(missing_ok=True) + + target_file.unlink(missing_ok=True) + run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index 525cebaca..bb40d5e21 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -97,12 +97,13 @@ let sourceWithoutTests = cliSource ( nixFilter.filter { root = ./.; - include = [ + exclude = [ + # exclude if ( _root: path: _type: - (builtins.match ".*/test_[^/]+\.py" path) == null - && (builtins.match ".*/[^/]+_test\.py" path) == null - # && (builtins.match ".*/tests/.+" path) == null + (builtins.match ".*/test_[^/]+\.py" path) != null # matches test_*.py + && (builtins.match ".*/[^/]+_test\.py" path) != null # matches *_test.py + && (builtins.match ".*/container_test\.py" path) == null # doesn't match container_test.py ) ]; } @@ -194,7 +195,7 @@ pythonRuntime.pkgs.buildPythonApplication { # limit build cores to 16 jobs="$((NIX_BUILD_CORES>16 ? 
16 : NIX_BUILD_CORES))" - python -m pytest -m "not impure and not with_core" -n "$jobs" \ + python -m pytest -m "not service_runner and not impure and not with_core" -n "$jobs" \ ./clan_cli \ ./clan_lib \ --cov ./clan_cli \ @@ -281,7 +282,7 @@ pythonRuntime.pkgs.buildPythonApplication { jobs="$((NIX_BUILD_CORES>16 ? 16 : NIX_BUILD_CORES))" # Run all tests with core marker - python -m pytest -m "not impure and with_core" -n "$jobs" \ + python -m pytest -m "not service_runner and not impure and with_core" -n "$jobs" \ ./clan_cli \ ./clan_lib \ --cov ./clan_cli \ diff --git a/pkgs/clan-cli/pyproject.toml b/pkgs/clan-cli/pyproject.toml index efbdde9b6..fae5b61bf 100644 --- a/pkgs/clan-cli/pyproject.toml +++ b/pkgs/clan-cli/pyproject.toml @@ -48,7 +48,7 @@ norecursedirs = ["clan_cli/tests/helpers", "clan_lib/nixpkgs"] # machines. In the CI pipeline we run these tests in a separate derivation # depending on clan-core. All other tests do not need to depend on clan-core # and can be cached more effectively. -markers = ["impure", "with_core"] +markers = ["impure", "with_core", "service_runner"] filterwarnings = "default::ResourceWarning" python_files = ["test_*.py", "*_test.py"]