diff --git a/checks/flake-module.nix b/checks/flake-module.nix index 3a42db111..a93036fa0 100644 --- a/checks/flake-module.nix +++ b/checks/flake-module.nix @@ -86,6 +86,7 @@ in # Container Tests nixos-test-container = self.clanLib.test.containerTest ./container nixosTestArgs; + nixos-systemd-abstraction = self.clanLib.test.containerTest ./systemd-abstraction nixosTestArgs; nixos-test-user-firewall-iptables = self.clanLib.test.containerTest ./user-firewall/iptables.nix nixosTestArgs; nixos-test-user-firewall-nftables = self.clanLib.test.containerTest ./user-firewall/nftables.nix nixosTestArgs; nixos-test-extra-python-packages = self.clanLib.test.containerTest ./test-extra-python-packages nixosTestArgs; diff --git a/checks/systemd-abstraction/default.nix b/checks/systemd-abstraction/default.nix new file mode 100644 index 000000000..464b66c7e --- /dev/null +++ b/checks/systemd-abstraction/default.nix @@ -0,0 +1,67 @@ +{ self, pkgs, ... }: + +let + + cli = self.packages.${pkgs.hostPlatform.system}.clan-cli-full; +in +{ + name = "systemd-abstraction"; + + nodes = { + peer1 = { + + users.users.text-user = { + isNormalUser = true; + linger = true; + uid = 1000; + extraGroups = [ "systemd-journal" ]; + }; + + # Set environment variables for user systemd + environment.extraInit = '' + if [ "$(id -u)" = "1000" ]; then + export XDG_RUNTIME_DIR="/run/user/1000" + export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus" + fi + ''; + + # Enable PAM for user systemd sessions + security.pam.services.systemd-user = { + startSession = true; + # Workaround for containers - use pam_permit to avoid helper binary issues + text = pkgs.lib.mkForce '' + account required pam_permit.so + session required pam_permit.so + session required pam_env.so conffile=/etc/pam/environment readenv=0 + session required ${pkgs.systemd}/lib/security/pam_systemd.so + ''; + }; + + environment.systemPackages = [ + cli + (cli.pythonRuntime.withPackages ( + ps: with ps; [ + pytest + pytest-xdist + ] + )) + ]; + }; + }; + + testScript = + { ... }: + '' + start_all() + + peer1.wait_for_unit("multi-user.target") + peer1.wait_for_unit("user@1000.service") + + # Fix user journal permissions so text-user can read their own logs + peer1.succeed("chown text-user:systemd-journal /var/log/journal/*/user-1000.journal*") + peer1.succeed("chmod 640 /var/log/journal/*/user-1000.journal*") + + # Run tests as text-user (environment variables are set automatically) + peer1.succeed("su - text-user -c 'pytest -s -n0 ${cli}/${cli.pythonRuntime.sitePackages}/clan_lib/service_runner'") + ''; +} diff --git a/pkgs/clan-cli/clan_lib/service_runner/container_test.py b/pkgs/clan-cli/clan_lib/service_runner/container_test.py new file mode 100755 index 000000000..8d2e8f937 --- /dev/null +++ b/pkgs/clan-cli/clan_lib/service_runner/container_test.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python3 +"""Test suite for service runner with group functionality.""" + +# Allow assert statements and magic values in test code + +import time +from collections.abc import Generator +from contextlib import suppress +from pathlib import Path + +import pytest + +from clan_lib.errors import ClanError +from clan_lib.service_runner import create_service_manager +from clan_lib.service_runner.protocols import ServiceManagerProtocol + + +@pytest.fixture +def service_manager() -> Generator[ServiceManagerProtocol]: + """Create a service manager and ensure cleanup after test.""" + manager = create_service_manager() + + # List of services that might be created during tests + test_services = [ + "simple-service", + "nginx-service", + "api-service", + "postgres-service", + "autostart-service", + "log-test", + "restart-test", + ] + + # Test groups that might be created + test_groups = ["web", "database"] + + # Yield the manager to the test + yield manager + + # Cleanup after test (runs even if test fails) + for service in test_services: + with suppress(ClanError): + manager.stop_service(service) + + for group in test_groups: + with suppress(ClanError): + manager.stop_services_by_group(group) + + +@pytest.mark.service_runner +def test_transient_service(service_manager: ServiceManagerProtocol) -> None: + """Test transient service (no autostart, uses systemd-run).""" + # Start a transient service + name = service_manager.start_service( + name="simple-service", + command=["sleep", "300"], + description="A simple transient service", + autostart=False, + ) + assert name == "simple-service", f"Expected 'simple-service', got {name}" + + # Give systemd time to start the service + time.sleep(0.5) + + # Check status + status = service_manager.get_status("simple-service") + assert status == "running", f"Expected 'running', got {status}" + + # Verify it's listed + services = service_manager.list_running_services() + service_names = [s["service_name"] for s in services] + assert "service-runner-simple-service" in service_names, "Service not in list" + + # Check it's marked as transient (no unit file) + simple_service = next( + s for s in services if s["service_name"] == "service-runner-simple-service" + ) + assert simple_service["unit_file"] == "(transient)", ( + f"Should be transient, got {simple_service['unit_file']!r}" + ) + + # Stop the service + service_manager.stop_service("simple-service") + + # Verify it's stopped + time.sleep(0.5) + status = service_manager.get_status("simple-service") + assert status in ("stopped", "unknown"), f"Expected stopped/unknown, got {status}" + + +@pytest.mark.service_runner +def test_autostart_service(service_manager: ServiceManagerProtocol) -> None: + """Test autostart service (creates persistent unit file).""" + # Start an autostart service + service_manager.start_service( + name="autostart-service", + command=["sleep", "300"], + description="An autostart service", + autostart=True, + ) + + time.sleep(0.5) + + # Check status + status = service_manager.get_status("autostart-service") + assert status == "running", f"Expected 'running', got {status}" + + # Verify it has a unit file (not transient) + services = service_manager.list_running_services() + autostart_service = next( + s for s in services if s["service_name"] == "service-runner-autostart-service" + ) + assert autostart_service["unit_file"] != "(transient)", "Should have unit file" + assert autostart_service["unit_file"].endswith(".service"), ( + "Should be .service file" + ) + + # Verify unit file exists + unit_file = Path(autostart_service["unit_file"]) + assert unit_file.exists(), f"Unit file should exist: {unit_file}" + + # Stop and verify unit file is removed + service_manager.stop_service("autostart-service") + + time.sleep(0.5) + assert not unit_file.exists(), f"Unit file should be removed: {unit_file}" + + +@pytest.mark.service_runner +def test_grouped_services(service_manager: ServiceManagerProtocol) -> None: + """Test services with groups.""" + # Start services in the "web" group + service_manager.start_service( + name="nginx-service", + command=["sleep", "300"], + description="Web server", + autostart=True, + group="web", + ) + + service_manager.start_service( + name="api-service", + command=["sleep", "300"], + description="API server", + autostart=True, + group="web", + ) + + # Start service in "database" group + service_manager.start_service( + name="postgres-service", + command=["sleep", "300"], + description="Database server", + autostart=True, + group="database", + ) + + time.sleep(0.5) + + # Verify all services are running + all_services = service_manager.list_running_services() + service_names = {s["service_name"] for s in all_services} + assert "service-runner-nginx-service" in service_names + assert "service-runner-api-service" in service_names + assert "service-runner-postgres-service" in service_names + + # List services by group + web_services = service_manager.list_services_by_group("web") + assert len(web_services) == 2, f"Expected 2 web services, got {len(web_services)}" + web_service_names = {s["service_name"] for s in web_services} + assert "service-runner-nginx-service" in web_service_names + assert "service-runner-api-service" in web_service_names + + db_services = service_manager.list_services_by_group("database") + assert len(db_services) == 1, f"Expected 1 db service, got {len(db_services)}" + assert db_services[0]["service_name"] == "service-runner-postgres-service" + assert db_services[0]["group"] == "database" + + # Verify all grouped services have unit files + for service in web_services + db_services: + assert service["unit_file"] != "(transient)", ( + f"{service['service_name']} should have unit file" + ) + assert service["status"] == "active", ( + f"{service['service_name']} should be active" + ) + + # Stop services by group + service_manager.stop_services_by_group("web") + + time.sleep(0.5) + + # Verify web services are stopped + web_services_after = service_manager.list_services_by_group("web") + assert len(web_services_after) == 0, "Web services should be stopped" + + # Verify database service is still running + db_services_after = service_manager.list_services_by_group("database") + assert len(db_services_after) == 1, "Database service should still be running" + + # Clean up database group + service_manager.stop_services_by_group("database") + + time.sleep(0.5) + db_services_final = service_manager.list_services_by_group("database") + assert len(db_services_final) == 0, "Database services should be stopped" + + +@pytest.mark.service_runner +def test_service_logs(service_manager: ServiceManagerProtocol) -> None: + """Test retrieving service logs.""" + # Start a service + service_manager.start_service( + name="log-test", + command=["sleep", "300"], + description="Log test service", + autostart=False, + ) + + time.sleep(0.5) + + # Get logs - just verify we can retrieve them (may be empty) + logs = service_manager.get_service_logs("log-test", lines=20) + assert isinstance(logs, str), "Logs should be a string" + + # Clean up + service_manager.stop_service("log-test") + + +@pytest.mark.service_runner +def test_nonexistent_group(service_manager: ServiceManagerProtocol) -> None: + """Test listing services in nonexistent group.""" + # List services in nonexistent group + services = service_manager.list_services_by_group("nonexistent-group") + assert services == [], f"Expected empty list, got {services}" + + +@pytest.mark.service_runner +def test_restart_service(service_manager: ServiceManagerProtocol) -> None: + """Test restarting a service.""" + # Start a service + service_manager.start_service( + name="restart-test", + command=["sleep", "300"], + description="Restart test service", + autostart=False, + ) + + time.sleep(0.5) + + # Verify it's running + status = service_manager.get_status("restart-test") + assert status == "running", f"Expected 'running', got {status}" + + # Restart it + service_manager.restart_service("restart-test") + + time.sleep(0.5) + + # Verify it's still running + status = service_manager.get_status("restart-test") + assert status == "running", f"Expected 'running' after restart, got {status}" + + # Clean up + service_manager.stop_service("restart-test") + + +@pytest.mark.service_runner +def test_cleanup_on_failure(service_manager: ServiceManagerProtocol) -> None: + """Test that services are cleaned up even when test fails.""" + # Start a service + service_manager.start_service( + name="simple-service", + command=["sleep", "300"], + autostart=False, + ) + + time.sleep(0.5) + + # Verify it's running + status = service_manager.get_status("simple-service") + assert status == "running" + + # Service will be cleaned up by fixture even if we don't explicitly stop it + # This test passes, demonstrating that cleanup happens automatically + + +@pytest.mark.service_runner +def test_start_service_twice_transient(service_manager: ServiceManagerProtocol) -> None: + """Test starting the same transient service twice (should fail or replace).""" + # Start a transient service + service_manager.start_service( + name="simple-service", + command=["sleep", "300"], + autostart=False, + ) + + time.sleep(0.5) + + # Verify it's running + status = service_manager.get_status("simple-service") + assert status == "running" + + # Try to start the same service again - this should fail + # systemd won't allow starting a unit with the same name + with pytest.raises(ClanError, match="Failed to start service"): + service_manager.start_service( + name="simple-service", + command=["sleep", "300"], + autostart=False, + ) + + # Original service should still be running + status = service_manager.get_status("simple-service") + assert status == "running" + + +@pytest.mark.service_runner +def test_start_service_twice_autostart(service_manager: ServiceManagerProtocol) -> None: + """Test starting the same autostart service twice (just restarts it).""" + # Start an autostart service + service_manager.start_service( + name="autostart-service", + command=["sleep", "300"], + autostart=True, + ) + + time.sleep(0.5) + + # Verify it's running + status = service_manager.get_status("autostart-service") + assert status == "running" + + # Try to start the same service again + # For autostart services, systemd will just restart the service + # (unlike transient services which fail) + service_manager.start_service( + name="autostart-service", + command=["sleep", "300"], + autostart=True, + ) + + time.sleep(0.5) + + # Service should still be running after "restart" + status = service_manager.get_status("autostart-service") + assert status == "running" + + +@pytest.mark.service_runner +def test_start_stopped_service_again(service_manager: ServiceManagerProtocol) -> None: + """Test starting a service, stopping it, then starting it again.""" + # Start a service + service_manager.start_service( + name="simple-service", + command=["sleep", "300"], + autostart=False, + ) + + time.sleep(0.5) + status = service_manager.get_status("simple-service") + assert status == "running" + + # Stop the service + service_manager.stop_service("simple-service") + time.sleep(0.5) + + status = service_manager.get_status("simple-service") + assert status in ("stopped", "unknown") + + # Start the service again with a different command - this should work + service_manager.start_service( + name="simple-service", + command=["sleep", "600"], + description="Restarted service with different command", + autostart=False, + ) + + time.sleep(0.5) + status = service_manager.get_status("simple-service") + assert status == "running" diff --git a/pkgs/clan-cli/clan_lib/service_runner/protocols.py b/pkgs/clan-cli/clan_lib/service_runner/protocols.py index 79ebc94d1..5f3c8e44f 100644 --- a/pkgs/clan-cli/clan_lib/service_runner/protocols.py +++ b/pkgs/clan-cli/clan_lib/service_runner/protocols.py @@ -2,11 +2,11 @@ import platform from pathlib import Path -from typing import Any, Protocol, runtime_checkable +from typing import Protocol, runtime_checkable from clan_lib.errors import ClanError -from .systemd_user import ServiceStatus +from .systemd_user import GroupedServiceInfo, ServiceInfo, ServiceStatus @runtime_checkable @@ -18,9 +18,10 @@ class ServiceManagerProtocol(Protocol): name: str, command: list[str], working_dir: Path | None = None, - extra_env_vars: dict[str, str] | None = None, + env_vars: dict[str, str] | None = None, description: str | None = None, autostart: bool = False, + group: str | None = None, ) -> str: """Start a service with the given configuration. @@ -28,9 +29,10 @@ class ServiceManagerProtocol(Protocol): name: Service identifier command: Command and arguments to run working_dir: Working directory for the service - extra_env_vars: Additional environment variables + env_vars: Environment variables for the service description: Human-readable service description autostart: Whether to enable service on boot + group: Optional group name for service grouping Returns: Service name/identifier @@ -41,17 +43,14 @@ class ServiceManagerProtocol(Protocol): """ ... - def stop_service(self, name: str) -> bool: + def stop_service(self, name: str) -> None: """Stop and remove a service. Args: name: Service identifier - Returns: - True if successful, False otherwise - Raises: - ClanError: If name is empty + ClanError: If name is empty or operation fails """ ... @@ -71,17 +70,14 @@ class ServiceManagerProtocol(Protocol): """ ... - def restart_service(self, name: str) -> bool: + def restart_service(self, name: str) -> None: """Restart a service. Args: name: Service identifier - Returns: - True if successful, False otherwise - Raises: - ClanError: If name is empty + ClanError: If name is empty or operation fails """ ... @@ -102,7 +98,7 @@ class ServiceManagerProtocol(Protocol): """ ... - def list_running_services(self) -> list[dict[str, Any]]: + def list_running_services(self) -> list[ServiceInfo]: """List all services managed by this backend. Returns: @@ -111,6 +107,33 @@ class ServiceManagerProtocol(Protocol): """ ... + def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]: + """List all services in the specified group. + + Args: + group: Group name to filter by + + Returns: + List of service information dictionaries for the group + + Raises: + ClanError: If group name is empty + + """ + ... + + def stop_services_by_group(self, group: str) -> None: + """Stop all services in the specified group. + + Args: + group: Group name to stop services for + + Raises: + ClanError: If group name is empty or operation fails + + """ + ... + def create_service_manager() -> ServiceManagerProtocol: """Create a platform-appropriate service manager. diff --git a/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py b/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py index 73837db64..62f180a54 100644 --- a/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py +++ b/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py @@ -1,12 +1,9 @@ -import os import shlex import shutil import textwrap -from collections.abc import Generator -from contextlib import contextmanager, suppress from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Literal, TypedDict from clan_lib.cmd import RunOpts, run from clan_lib.errors import ClanError @@ -17,9 +14,28 @@ if TYPE_CHECKING: ServiceStatus = Literal["running", "stopped", "failed", "unknown"] +class ServiceInfo(TypedDict): + """Information about a running service.""" + + service_name: str + status: str + command: str + unit_file: str + + +class GroupedServiceInfo(TypedDict): + """Information about a service in a group.""" + + service_name: str + status: str + command: str + unit_file: str + group: str + + @dataclass(frozen=True) class SystemdUserService: - """Manages systemd user services by name""" + """Manages systemd user services using systemd-run for transient units.""" user_systemd_dir: Path @@ -27,27 +43,125 @@ class SystemdUserService: self.user_systemd_dir.mkdir(parents=True, exist_ok=True) def _service_name(self, name: str) -> str: - """Generate service name from given name""" return f"service-runner-{name}" + def _target_name(self, group: str) -> str: + return f"service-runner-{group}" + + def _target_file_path(self, group: str) -> Path: + return self.user_systemd_dir / f"{self._target_name(group)}.target" + def _unit_file_path(self, name: str) -> Path: - """Get the path to the systemd unit file for this service name""" - service_name = self._service_name(name) - return self.user_systemd_dir / f"{service_name}.service" + return self.user_systemd_dir / f"{self._service_name(name)}.service" - @contextmanager - def _cleanup_on_error(self, unit_file: Path) -> Generator[None]: - """Context manager to clean up created files if an exception occurs""" - try: - yield - except Exception: - # Clean up the unit file if it was created - if unit_file.exists(): - with suppress(OSError): - unit_file.unlink() - raise + def _validate_name(self, name: str, type_name: str = "Service") -> None: + if not name: + msg = f"{type_name} name cannot be empty" + raise ClanError(msg) - def _create_unit_file( + def _check_executable(self, command: list[str]) -> str: + executable = shutil.which(command[0]) + if not executable: + msg = f"Executable not found: {command[0]}" + raise ClanError(msg) + return executable + + def _systemctl(self, action: str, service_name: str) -> "CmdOut": + """Run systemctl command with --user flag.""" + return run( + ["systemctl", "--user", action, f"{service_name}.service"], + RunOpts(check=False), + ) + + def _get_property(self, service_name: str, prop: str) -> str: + """Get a systemd unit property value.""" + result = run( + [ + "systemctl", + "--user", + "show", + f"{service_name}.service", + f"--property={prop}", + "--no-pager", + ], + RunOpts(check=False), + ) + prefix = f"{prop}=" + for line in result.stdout.split("\n"): + if line.startswith(prefix): + return line[len(prefix) :].strip() + return "" + + def _create_target_file(self, group: str) -> None: + """Create systemd target file for a group if it doesn't exist.""" + target_file = self._target_file_path(group) + if target_file.exists(): + return + + content = textwrap.dedent( + f""" + [Unit] + Description=Service runner group: {group} + After=multi-user.target + """ + ) + target_file.touch(exist_ok=True) + target_file.chmod(0o600) + target_file.write_text(content) + run(["systemctl", "--user", "daemon-reload"]) + + def _create_autostart_unit( + self, + name: str, + command: list[str], + working_dir: Path | None, + env_vars: dict[str, str] | None, + description: str | None, + group: str | None, + ) -> None: + """Create persistent unit file for autostart services.""" + executable = self._check_executable(command) + exec_start = f"{executable} {' '.join(shlex.quote(arg) for arg in command[1:])}" + description = description or f"Service runner for {command[0]}" + + content = textwrap.dedent( + f""" + [Unit] + Description={description} + After=multi-user.target + """ + ) + + if group: + content += f"PartOf={self._target_name(group)}.target\n" + + content += textwrap.dedent( + f""" + [Service] + Type=simple + ExecStart={exec_start} + """ + ) + + if working_dir: + content += f"WorkingDirectory={working_dir}\n" + + for key, value in (env_vars or {}).items(): + content += f"Environment={key}={shlex.quote(value)}\n" + + content += textwrap.dedent( + f""" + [Install] + WantedBy={self._target_name(group) if group else "default"}.target + """ + ) + + unit_file = self._unit_file_path(name) + unit_file.touch(exist_ok=True) + unit_file.chmod(0o600) + unit_file.write_text(content) + + def start_service( self, name: str, command: list[str], @@ -55,253 +169,238 @@ class SystemdUserService: env_vars: dict[str, str] | None = None, description: str | None = None, autostart: bool = False, - ) -> Path: - """Create systemd unit file for the given command""" - unit_file = self._unit_file_path(name) - - with self._cleanup_on_error(unit_file): - executable = shutil.which(command[0]) - if not executable: - msg = f"Executable not found: {command[0]}" - raise ClanError(msg) - exec_start = f"{executable} {' '.join(command[1:])}" - - if not description: - description = f"Service runner for {shlex.quote(command[0])}" - - unit_content = textwrap.dedent( - f""" - [Unit] - Description="{description}" - After=multi-user.target - - [Service] - Type=simple - ExecStart={exec_start} - """ - ) - - if working_dir: - unit_content += f"WorkingDirectory={working_dir}\n" - - if env_vars: - for key, value in env_vars.items(): - # Properly quote the value for systemd - quoted_value = shlex.quote(value) - unit_content += f"Environment={key}={quoted_value}\n" - - if autostart: - unit_content += textwrap.dedent( - """ - [Install] - WantedBy=default.target - """ - ) - - unit_file.touch(exist_ok=True) - unit_file.chmod(0o600) - with unit_file.open("w") as f: - f.write(unit_content) - - return unit_file - - def _run_systemctl(self, action: str, service_name: str) -> "CmdOut": - """Run systemctl command with --user flag""" - cmd = ["systemctl", "--user", action, f"{service_name}.service"] - return run(cmd, RunOpts(check=False)) - - def start_service( - self, - name: str, - command: list[str], - working_dir: Path | None = None, - extra_env_vars: dict[str, str] | None = None, - description: str | None = None, - autostart: bool = False, + group: str | None = None, ) -> str: - """Start a systemd user service for the given command. - Returns the service name. + """Start a systemd user service. + + autostart=False: Uses systemd-run (transient, no files). + autostart=True: Creates unit files (persistent across reboots). """ + self._validate_name(name) if not command: msg = "Command cannot be empty" raise ClanError(msg) - if not name: - msg = "Service name cannot be empty" - raise ClanError(msg) service_name = self._service_name(name) + self._check_executable(command) - # Collect essential environment variables for user services - env_vars = {} - - # Essential variables that user services typically need - essential_vars = [ - "PATH", - "HOME", - "USER", - "LOGNAME", - "XDG_CONFIG_HOME", - "XDG_DATA_HOME", - "XDG_CACHE_HOME", - "XDG_RUNTIME_DIR", - "XDG_SESSION_ID", - "XDG_SESSION_TYPE", - "DBUS_SESSION_BUS_ADDRESS", - "SSH_AUTH_SOCK", - "SSH_AGENT_PID", - "GPG_AGENT_INFO", - "GNUPGHOME", - ] - - # Add essential vars if they exist in the current environment - for var in essential_vars: - value = os.environ.get(var) - if value is not None: - env_vars[var] = value - - # Allow extra_env_vars to override defaults - env_vars.update(extra_env_vars or {}) - - # Create the unit file - self._create_unit_file( - name, command, working_dir, env_vars, description, autostart - ) - - run(["systemctl", "--user", "daemon-reload"]) - - # Enable the service only if autostart is True if autostart: - result = self._run_systemctl("enable", service_name) + if group: + self._create_target_file(group) + self._create_autostart_unit( + name, command, working_dir, env_vars, description, group + ) + run(["systemctl", "--user", "daemon-reload"]) + + result = self._systemctl("enable", service_name) if result.returncode != 0: msg = f"Failed to enable service: {result.stderr}" raise ClanError(msg) - # Start the service - result = self._run_systemctl("start", service_name) - if result.returncode != 0: - msg = f"Failed to start service: {result.stderr}" - raise ClanError(msg) + result = self._systemctl("start", service_name) + if result.returncode != 0: + msg = f"Failed to start service: {result.stderr}" + raise ClanError(msg) + else: + # Use systemd-run for transient services + desc = description or f"Service runner for {command[0]}" + cmd = [ + "systemd-run", + "--user", + f"--unit={service_name}", + f"--description={desc}", + ] + + if working_dir: + cmd.append(f"--working-directory={working_dir}") + + for key, value in (env_vars or {}).items(): + cmd.append(f"--setenv={key}={value}") + + if group: + self._create_target_file(group) + cmd.append(f"--property=PartOf={self._target_name(group)}.target") + + cmd.extend(command) + + result = run(cmd, RunOpts(check=False)) + if result.returncode != 0: + msg = f"Failed to start service: {result.stderr}" + raise ClanError(msg) return name - def stop_service(self, name: str) -> bool: - """Stop the systemd user service for the given name. - Returns True if successful, False otherwise. - """ - if not name: - msg = "Service name cannot be empty" - raise ClanError(msg) - + def stop_service(self, name: str) -> None: + """Stop a systemd user service.""" + self._validate_name(name) service_name = self._service_name(name) - # Stop the service - result = self._run_systemctl("stop", service_name) - if result.returncode != 0: - return False + result = self._systemctl("stop", service_name) + if result.returncode != 0 and "not loaded" not in result.stderr.lower(): + msg = f"Failed to stop service: {result.stderr}" + raise ClanError(msg) - # Disable the service - result = self._run_systemctl("disable", service_name) - if result.returncode != 0: - return False + self._systemctl("disable", service_name) # Ignore errors for transient units - # Remove the unit file unit_file = self._unit_file_path(name) - try: + if unit_file.exists(): unit_file.unlink(missing_ok=True) - except OSError: - return False - - run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) - - return True + run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) def get_status(self, name: str) -> ServiceStatus: - """Get the status of the service for the given name""" - if not name: - msg = "Service name cannot be empty" + """Get the status of a service.""" + self._validate_name(name) + result = self._systemctl("is-active", self._service_name(name)) + status_map: dict[str, ServiceStatus] = { + "active": "running", + "inactive": "stopped", + "failed": "failed", + } + return status_map.get(result.stdout.strip(), "unknown") + + def restart_service(self, name: str) -> None: + """Restart a service.""" + self._validate_name(name) + result = self._systemctl("restart", self._service_name(name)) + if result.returncode != 0: + msg = f"Failed to restart service: {result.stderr}" raise ClanError(msg) - service_name = self._service_name(name) - - # Check if unit file exists - unit_file = self._unit_file_path(name) - if not unit_file.exists(): - return "unknown" - - result = self._run_systemctl("is-active", service_name) - status_output = result.stdout.strip() - - if status_output == "active": - return "running" - if status_output == "inactive": - return "stopped" - if status_output == "failed": - return "failed" - return "unknown" - - def restart_service(self, name: str) -> bool: - """Restart the service for the given name""" - if not name: - msg = "Service name cannot be empty" - raise ClanError(msg) - - service_name = self._service_name(name) - - result = self._run_systemctl("restart", service_name) - return result.returncode == 0 - def get_service_logs(self, name: str, lines: int = 50) -> str: - """Get recent logs for the service""" - if not name: - msg = "Service name cannot be empty" - raise ClanError(msg) + """Get recent logs for a service.""" + self._validate_name(name) + result = run( + [ + "journalctl", + "--user", + "-u", + f"{self._service_name(name)}.service", + "-n", + str(lines), + "--no-pager", + ] + ) + return result.stdout - service_name = self._service_name(name) + def _get_service_info(self, unit_name: str) -> tuple[str, str, str]: + """Get status, command, and unit file for a service.""" + status = self._get_property(unit_name, "ActiveState") + command = self._get_property(unit_name, "ExecStart") + fragment_path = self._get_property(unit_name, "FragmentPath") - cmd = [ - "journalctl", - "--user", - "-u", - f"{service_name}.service", - "-n", - str(lines), - "--no-pager", - ] - result = run(cmd, RunOpts(check=False)) - if result.returncode == 0: - return result.stdout - return f"Failed to get logs: {result.stderr}" + # Transient units are stored in /run/user/.../systemd/transient/ + if not fragment_path or "/transient/" in fragment_path: + unit_file = "(transient)" + else: + unit_file = fragment_path - def list_running_services(self) -> list[dict[str, Any]]: - """List all running service-runner services""" - services = [] + return status, command, unit_file - # Get all service files - for unit_file in self.user_systemd_dir.glob("service-runner-*.service"): - service_name = unit_file.stem + def list_running_services(self) -> list[ServiceInfo]: + """List all service-runner services.""" + result = run( + [ + "systemctl", + "--user", + "list-units", + "service-runner-*.service", + "--all", + "--no-legend", + "--no-pager", + "--plain", + ], + RunOpts(check=False), + ) - # Get status - result = self._run_systemctl("is-active", service_name) - status = result.stdout.strip() - - # Try to extract command from unit file - try: - with unit_file.open() as f: - content = f.read() - # Simple parsing - look for ExecStart line - for line in content.split("\n"): - if line.startswith("ExecStart="): - exec_start = line[10:] # Remove "ExecStart=" - services.append( - { - "service_name": service_name, - "status": status, - "command": exec_start, - "unit_file": str(unit_file), - } - ) - break - except OSError: + services: list[ServiceInfo] = [] + # systemctl list-units format: UNIT LOAD ACTIVE SUB DESCRIPTION + min_required_fields = 4 + for line in result.stdout.strip().split("\n"): + if not line.strip(): continue + parts = line.split(None, 4) + if len(parts) < min_required_fields: + continue + + unit_name = parts[0].replace(".service", "") + if not unit_name.startswith("service-runner-"): + continue + + status, command, unit_file = self._get_service_info(unit_name) + services.append( + { + "service_name": unit_name, + "status": status, + "command": command, + "unit_file": unit_file, + } + ) + return services + + def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]: + """List all services in a group.""" + self._validate_name(group, "Group") + + if not self._target_file_path(group).exists(): + return [] + + result = run( + [ + "systemctl", + "--user", + "list-dependencies", + f"{self._target_name(group)}.target", + "--plain", + ], + RunOpts(check=False), + ) + + services: list[GroupedServiceInfo] = [] + for raw_line in result.stdout.split("\n"): + line = raw_line.strip() + if not (line.endswith(".service") and line.startswith("service-runner-")): + continue + + service_name = line.replace(".service", "") + status, command, unit_file = self._get_service_info(service_name) + services.append( + { + "service_name": service_name, + "status": status, + "command": command, + "unit_file": unit_file, + "group": group, + } + ) + + return services + + def stop_services_by_group(self, group: str) -> None: + """Stop all services in a group.""" + self._validate_name(group, "Group") + + target_file = self._target_file_path(group) + if not target_file.exists(): + return + + services = self.list_services_by_group(group) + + # Stop the target (stops all PartOf services) + result = run( + ["systemctl", "--user", "stop", f"{self._target_name(group)}.target"], + RunOpts(check=False), + ) + if result.returncode != 0: + msg = f"Failed to stop target: {result.stderr}" + raise ClanError(msg) + + # Disable and remove unit files for non-transient services + for service in services: + if service["unit_file"] != "(transient)": + self._systemctl("disable", service["service_name"]) + Path(service["unit_file"]).unlink(missing_ok=True) + + target_file.unlink(missing_ok=True) + run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) diff --git a/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py b/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py deleted file mode 100644 index cb851296f..000000000 --- a/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py +++ /dev/null @@ -1,304 +0,0 @@ -import shutil -from pathlib import Path -from unittest.mock import MagicMock, patch - -import pytest - -from clan_lib.errors import ClanError - -from .systemd_user import SystemdUserService - - -@pytest.fixture -def service_runner(temporary_home: Path) -> SystemdUserService: - """Create a ServiceRunner instance with temporary home directory""" - systemd_dir = temporary_home / ".config" / "systemd" / "user" - return SystemdUserService(user_systemd_dir=systemd_dir) - - -@pytest.fixture -def systemd_service(temporary_home: Path) -> SystemdUserService: - """Create a SystemdUserService instance with temporary home directory""" - systemd_dir = temporary_home / ".config" / "systemd" / "user" - return SystemdUserService(user_systemd_dir=systemd_dir) - - -class TestSystemdUserService: - def test_service_name_generation(self, systemd_service: SystemdUserService) -> None: - """Test service name generation from name""" - name = "test-service" - service_name = systemd_service._service_name(name) - - assert service_name == "service-runner-test-service" - - def test_unit_file_path( - self, systemd_service: SystemdUserService, temporary_home: Path - ) -> None: - """Test unit file path generation""" - name = "test-service" - unit_file = systemd_service._unit_file_path(name) - - expected_dir = temporary_home / ".config" / "systemd" / "user" - assert unit_file.parent == expected_dir - assert unit_file.suffix == ".service" - assert unit_file.name == "service-runner-test-service.service" - - def test_create_unit_file( - self, systemd_service: SystemdUserService, temporary_home: Path - ) -> None: - """Test systemd unit file creation""" - name = "test-service" - command = ["python3", "-c", "print('test')"] - working_dir = temporary_home - env_vars = {"TEST_VAR": "test_value", "ANOTHER": "value"} - description = "Test service" - - unit_file = systemd_service._create_unit_file( - name, command, working_dir, env_vars, description - ) - - assert unit_file.exists() - content = unit_file.read_text() - - # Check basic structure - assert "[Unit]" in content - assert "[Service]" in content - - # Check specific values - assert f'Description="{description}"' in content - assert f"WorkingDirectory={working_dir}" in content - assert "Environment=TEST_VAR=test_value" in content - assert "Environment=ANOTHER=value" in content - - def test_create_unit_file_with_spaces( - self, systemd_service: SystemdUserService - ) -> None: - """Test unit file creation with commands containing spaces""" - name = "test-service" - command = ["python3", "-c", "print('hello world')"] - - unit_file = systemd_service._create_unit_file(name, command) - content = unit_file.read_text() - executable = shutil.which(command[0]) - expect = f"ExecStart={executable} -c print('hello world')" - # Should properly escape arguments with spaces - assert expect in content - - @patch("clan_lib.service_runner.systemd_user.run") - def test_run_systemctl( - self, mock_run: MagicMock, systemd_service: SystemdUserService - ) -> None: - """Test systemctl command execution""" - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "active" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - result = systemd_service._run_systemctl("status", "test-service") - - mock_run.assert_called_once() - assert result.returncode == 0 - - @patch("clan_lib.service_runner.systemd_user.run") - def test_get_status_running( - self, mock_run: MagicMock, systemd_service: SystemdUserService - ) -> None: - """Test status detection for running service""" - # Mock unit file existence - name = "test-service" - unit_file = systemd_service._unit_file_path(name) - unit_file.parent.mkdir(parents=True, exist_ok=True) - unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test") - - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "active" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - status = systemd_service.get_status(name) - assert status == "running" - - @patch("clan_lib.service_runner.systemd_user.run") - def test_get_status_stopped( - self, mock_run: MagicMock, systemd_service: SystemdUserService - ) -> None: - """Test status detection for stopped service""" - # Mock unit file existence - name = "test-service" - unit_file = systemd_service._unit_file_path(name) - unit_file.parent.mkdir(parents=True, exist_ok=True) - unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test") - - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "inactive" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - status = systemd_service.get_status(name) - assert status == "stopped" - - def test_get_status_unknown_no_unit_file( - self, systemd_service: SystemdUserService - ) -> None: - """Test status detection when no unit file exists""" - name = "nonexistent" - - status = systemd_service.get_status(name) - assert status == "unknown" - - -class TestServiceRunner: - def test_empty_name_raises_error(self, service_runner: SystemdUserService) -> None: - """Test that empty service name raises ClanError""" - with pytest.raises(ClanError, match="Service name cannot be empty"): - service_runner.start_service("", ["echo", "test"]) - - def test_empty_command_raises_error( - self, service_runner: SystemdUserService - ) -> None: - """Test that empty command raises ClanError""" - with pytest.raises(ClanError, match="Command cannot be empty"): - service_runner.start_service("test-service", []) - - @patch("clan_lib.service_runner.systemd_user.run") - def test_start_service_mocked( - self, mock_run: MagicMock, service_runner: SystemdUserService - ) -> None: - """Test service start with mocked systemctl calls""" - # Mock successful systemctl calls - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - name = "test-service" - command = ["echo", "test"] - service_name = service_runner.start_service( - name, command, description="Test service" - ) - - assert service_name == name - - # Verify systemctl calls were made - assert mock_run.call_count >= 2 # At least daemon-reload, enable, start - - @patch("clan_lib.service_runner.systemd_user.run") - def test_stop_service_mocked( - self, mock_run: MagicMock, service_runner: SystemdUserService - ) -> None: - """Test service stop with mocked systemctl calls""" - # First create a unit file - name = "test-service" - command = ["echo", "test"] - unit_file = service_runner._create_unit_file(name, command) - - # Mock successful systemctl calls - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - success = service_runner.stop_service(name) - assert success is True - - # Check unit file was removed - assert not unit_file.exists() - - @patch("clan_lib.service_runner.systemd_user.run") - def test_restart_service_mocked( - self, mock_run: MagicMock, service_runner: SystemdUserService - ) -> None: - """Test service restart with mocked systemctl calls""" - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - name = "test-service" - success = service_runner.restart_service(name) - - assert success is True - - @patch("clan_lib.service_runner.systemd_user.run") - def test_logs_service_mocked( - self, mock_run: MagicMock, service_runner: SystemdUserService - ) -> None: - """Test getting service logs with mocked journalctl""" - expected_logs = "Test log output\nAnother log line" - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = expected_logs - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - name = "test-service" - logs = service_runner.get_service_logs(name, lines=25) - - assert logs == expected_logs - mock_run.assert_called_once() - # Check journalctl command structure - call_args = mock_run.call_args[0][0] - assert "journalctl" in call_args - assert "--user" in call_args - assert "-n" in call_args - assert "25" in call_args - - def test_list_services_empty(self, service_runner: SystemdUserService) -> None: - """Test listing services when none exist""" - services = service_runner.list_running_services() - assert services == [] - - def test_list_services_with_unit_files( - self, service_runner: SystemdUserService - ) -> None: - """Test listing services when unit files exist""" - # Create some mock unit files - systemd_dir = service_runner.user_systemd_dir - - unit1 = systemd_dir / "service-runner-test1.service" - unit1.write_text("""[Unit] -Description=Test Service 1 - -[Service] -ExecStart=echo test1 - -[Install] -WantedBy=default.target -""") - - unit2 = systemd_dir / "service-runner-test2.service" - unit2.write_text("""[Unit] -Description=Test Service 2 - -[Service] -ExecStart=python3 -c "print('test')" - -[Install] -WantedBy=default.target -""") - - with patch("clan_lib.service_runner.systemd_user.run") as mock_run: - mock_cmd_out = MagicMock() - mock_cmd_out.returncode = 0 - mock_cmd_out.stdout = "inactive" - mock_cmd_out.stderr = "" - mock_run.return_value = mock_cmd_out - - services = service_runner.list_running_services() - - assert len(services) == 2 - - service_names = [s["service_name"] for s in services] - assert "service-runner-test1" in service_names - assert "service-runner-test2" in service_names - - # Check command extraction - commands = [s["command"] for s in services] - assert "echo test1" in commands - assert "python3 -c \"print('test')\"" in commands diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index 525cebaca..bb40d5e21 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -97,12 +97,13 @@ let sourceWithoutTests = cliSource ( nixFilter.filter { root = ./.; - include = [ + exclude = [ + # exclude if ( _root: path: _type: - (builtins.match ".*/test_[^/]+\.py" path) == null - && (builtins.match ".*/[^/]+_test\.py" path) == null - # && (builtins.match ".*/tests/.+" path) == null + (builtins.match ".*/test_[^/]+\.py" path) != null # matches test_*.py + && (builtins.match ".*/[^/]+_test\.py" path) != null # matches *_test.py + && (builtins.match ".*/container_test\.py" path) == null # doesn't match container_test.py ) ]; } @@ -194,7 +195,7 @@ pythonRuntime.pkgs.buildPythonApplication { # limit build cores to 16 jobs="$((NIX_BUILD_CORES>16 ? 16 : NIX_BUILD_CORES))" - python -m pytest -m "not impure and not with_core" -n "$jobs" \ + python -m pytest -m "not service_runner and not impure and not with_core" -n "$jobs" \ ./clan_cli \ ./clan_lib \ --cov ./clan_cli \ @@ -281,7 +282,7 @@ pythonRuntime.pkgs.buildPythonApplication { jobs="$((NIX_BUILD_CORES>16 ? 16 : NIX_BUILD_CORES))" # Run all tests with core marker - python -m pytest -m "not impure and with_core" -n "$jobs" \ + python -m pytest -m "not service_runner and not impure and with_core" -n "$jobs" \ ./clan_cli \ ./clan_lib \ --cov ./clan_cli \ diff --git a/pkgs/clan-cli/pyproject.toml b/pkgs/clan-cli/pyproject.toml index efbdde9b6..fae5b61bf 100644 --- a/pkgs/clan-cli/pyproject.toml +++ b/pkgs/clan-cli/pyproject.toml @@ -48,7 +48,7 @@ norecursedirs = ["clan_cli/tests/helpers", "clan_lib/nixpkgs"] # machines. In the CI pipeline we run these tests in a separate derivation # depending on clan-core. All other tests do not need to depend on clan-core # and can be cached more effectively. -markers = ["impure", "with_core"] +markers = ["impure", "with_core", "service_runner"] filterwarnings = "default::ResourceWarning" python_files = ["test_*.py", "*_test.py"]