From c6b0b114c51d01ee6c6016b5dd10c34055e47ed8 Mon Sep 17 00:00:00 2001 From: Qubasa Date: Sat, 4 Oct 2025 02:22:50 +0200 Subject: [PATCH] clan_lib: init platform independent service_runner --- .../clan_lib/service_runner/__init__.py | 11 + .../clan_lib/service_runner/protocols.py | 139 ++++++++ .../clan_lib/service_runner/systemd_user.py | 307 ++++++++++++++++++ .../service_runner/systemd_user_test.py | 304 +++++++++++++++++ 4 files changed, 761 insertions(+) create mode 100644 pkgs/clan-cli/clan_lib/service_runner/__init__.py create mode 100644 pkgs/clan-cli/clan_lib/service_runner/protocols.py create mode 100644 pkgs/clan-cli/clan_lib/service_runner/systemd_user.py create mode 100644 pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py diff --git a/pkgs/clan-cli/clan_lib/service_runner/__init__.py b/pkgs/clan-cli/clan_lib/service_runner/__init__.py new file mode 100644 index 000000000..56d8b0bbe --- /dev/null +++ b/pkgs/clan-cli/clan_lib/service_runner/__init__.py @@ -0,0 +1,11 @@ +"""Systemd user service runner abstraction. + +This module provides a simple interface for managing systemd user services +based on command arrays. Each service is identified by a hash of the command, +allowing you to start/stop services using the same command that was used to create them. +""" + +from .protocols import create_service_manager +from .systemd_user import SystemdUserService + +__all__ = ["SystemdUserService", "create_service_manager"] diff --git a/pkgs/clan-cli/clan_lib/service_runner/protocols.py b/pkgs/clan-cli/clan_lib/service_runner/protocols.py new file mode 100644 index 000000000..79ebc94d1 --- /dev/null +++ b/pkgs/clan-cli/clan_lib/service_runner/protocols.py @@ -0,0 +1,139 @@ +"""Protocol definitions for platform-independent service management.""" + +import platform +from pathlib import Path +from typing import Any, Protocol, runtime_checkable + +from clan_lib.errors import ClanError + +from .systemd_user import ServiceStatus + + +@runtime_checkable +class ServiceManagerProtocol(Protocol): + """Protocol for platform-independent service management backends.""" + + def start_service( + self, + name: str, + command: list[str], + working_dir: Path | None = None, + extra_env_vars: dict[str, str] | None = None, + description: str | None = None, + autostart: bool = False, + ) -> str: + """Start a service with the given configuration. + + Args: + name: Service identifier + command: Command and arguments to run + working_dir: Working directory for the service + extra_env_vars: Additional environment variables + description: Human-readable service description + autostart: Whether to enable service on boot + + Returns: + Service name/identifier + + Raises: + ClanError: If service creation or start fails + + """ + ... + + def stop_service(self, name: str) -> bool: + """Stop and remove a service. + + Args: + name: Service identifier + + Returns: + True if successful, False otherwise + + Raises: + ClanError: If name is empty + + """ + ... + + def get_status(self, name: str) -> ServiceStatus: + """Get the current status of a service. + + Args: + name: Service identifier + + Returns: + Current service status + + Raises: + ClanError: If name is empty + + """ + ... + + def restart_service(self, name: str) -> bool: + """Restart a service. + + Args: + name: Service identifier + + Returns: + True if successful, False otherwise + + Raises: + ClanError: If name is empty + + """ + ... + + def get_service_logs(self, name: str, lines: int = 50) -> str: + """Get recent logs for a service. + + Args: + name: Service identifier + lines: Number of recent lines to retrieve + + Returns: + Service logs as string + + Raises: + ClanError: If name is empty + + """ + ... + + def list_running_services(self) -> list[dict[str, Any]]: + """List all services managed by this backend. + + Returns: + List of service information dictionaries + + """ + ... + + +def create_service_manager() -> ServiceManagerProtocol: + """Create a platform-appropriate service manager. + + Returns: + Service manager implementation for current platform + + Raises: + ClanError: If platform is not supported + + """ + system = platform.system().lower() + + if system == "linux": + from .systemd_user import SystemdUserService # noqa: PLC0415 + + return SystemdUserService( + user_systemd_dir=Path.home() / ".config" / "systemd" / "user" + ) + + supported_platforms = ["linux"] + msg = ( + f"Platform '{system}' is not supported. " + f"Supported platforms: {', '.join(supported_platforms)}" + ) + raise ClanError(msg) diff --git a/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py b/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py new file mode 100644 index 000000000..73837db64 --- /dev/null +++ b/pkgs/clan-cli/clan_lib/service_runner/systemd_user.py @@ -0,0 +1,307 @@ +import os +import shlex +import shutil +import textwrap +from collections.abc import Generator +from contextlib import contextmanager, suppress +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any, Literal + +from clan_lib.cmd import RunOpts, run +from clan_lib.errors import ClanError + +if TYPE_CHECKING: + from clan_lib.errors import CmdOut + +ServiceStatus = Literal["running", "stopped", "failed", "unknown"] + + +@dataclass(frozen=True) +class SystemdUserService: + """Manages systemd user services by name""" + + user_systemd_dir: Path + + def __post_init__(self) -> None: + self.user_systemd_dir.mkdir(parents=True, exist_ok=True) + + def _service_name(self, name: str) -> str: + """Generate service name from given name""" + return f"service-runner-{name}" + + def _unit_file_path(self, name: str) -> Path: + """Get the path to the systemd unit file for this service name""" + service_name = self._service_name(name) + return self.user_systemd_dir / f"{service_name}.service" + + @contextmanager + def _cleanup_on_error(self, unit_file: Path) -> Generator[None]: + """Context manager to clean up created files if an exception occurs""" + try: + yield + except Exception: + # Clean up the unit file if it was created + if unit_file.exists(): + with suppress(OSError): + unit_file.unlink() + raise + + def _create_unit_file( + self, + name: str, + command: list[str], + working_dir: Path | None = None, + env_vars: dict[str, str] | None = None, + description: str | None = None, + autostart: bool = False, + ) -> Path: + """Create systemd unit file for the given command""" + unit_file = self._unit_file_path(name) + + with self._cleanup_on_error(unit_file): + executable = shutil.which(command[0]) + if not executable: + msg = f"Executable not found: {command[0]}" + raise ClanError(msg) + exec_start = f"{executable} {' '.join(command[1:])}" + + if not description: + description = f"Service runner for {shlex.quote(command[0])}" + + unit_content = textwrap.dedent( + f""" + [Unit] + Description="{description}" + After=multi-user.target + + [Service] + Type=simple + ExecStart={exec_start} + """ + ) + + if working_dir: + unit_content += f"WorkingDirectory={working_dir}\n" + + if env_vars: + for key, value in env_vars.items(): + # Properly quote the value for systemd + quoted_value = shlex.quote(value) + unit_content += f"Environment={key}={quoted_value}\n" + + if autostart: + unit_content += textwrap.dedent( + """ + [Install] + WantedBy=default.target + """ + ) + + unit_file.touch(exist_ok=True) + unit_file.chmod(0o600) + with unit_file.open("w") as f: + f.write(unit_content) + + return unit_file + + def _run_systemctl(self, action: str, service_name: str) -> "CmdOut": + """Run systemctl command with --user flag""" + cmd = ["systemctl", "--user", action, f"{service_name}.service"] + return run(cmd, RunOpts(check=False)) + + def start_service( + self, + name: str, + command: list[str], + working_dir: Path | None = None, + extra_env_vars: dict[str, str] | None = None, + description: str | None = None, + autostart: bool = False, + ) -> str: + """Start a systemd user service for the given command. + Returns the service name. + """ + if not command: + msg = "Command cannot be empty" + raise ClanError(msg) + if not name: + msg = "Service name cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + + # Collect essential environment variables for user services + env_vars = {} + + # Essential variables that user services typically need + essential_vars = [ + "PATH", + "HOME", + "USER", + "LOGNAME", + "XDG_CONFIG_HOME", + "XDG_DATA_HOME", + "XDG_CACHE_HOME", + "XDG_RUNTIME_DIR", + "XDG_SESSION_ID", + "XDG_SESSION_TYPE", + "DBUS_SESSION_BUS_ADDRESS", + "SSH_AUTH_SOCK", + "SSH_AGENT_PID", + "GPG_AGENT_INFO", + "GNUPGHOME", + ] + + # Add essential vars if they exist in the current environment + for var in essential_vars: + value = os.environ.get(var) + if value is not None: + env_vars[var] = value + + # Allow extra_env_vars to override defaults + env_vars.update(extra_env_vars or {}) + + # Create the unit file + self._create_unit_file( + name, command, working_dir, env_vars, description, autostart + ) + + run(["systemctl", "--user", "daemon-reload"]) + + # Enable the service only if autostart is True + if autostart: + result = self._run_systemctl("enable", service_name) + if result.returncode != 0: + msg = f"Failed to enable service: {result.stderr}" + raise ClanError(msg) + + # Start the service + result = self._run_systemctl("start", service_name) + if result.returncode != 0: + msg = f"Failed to start service: {result.stderr}" + raise ClanError(msg) + + return name + + def stop_service(self, name: str) -> bool: + """Stop the systemd user service for the given name. + Returns True if successful, False otherwise. + """ + if not name: + msg = "Service name cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + + # Stop the service + result = self._run_systemctl("stop", service_name) + if result.returncode != 0: + return False + + # Disable the service + result = self._run_systemctl("disable", service_name) + if result.returncode != 0: + return False + + # Remove the unit file + unit_file = self._unit_file_path(name) + try: + unit_file.unlink(missing_ok=True) + except OSError: + return False + + run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False)) + + return True + + def get_status(self, name: str) -> ServiceStatus: + """Get the status of the service for the given name""" + if not name: + msg = "Service name cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + + # Check if unit file exists + unit_file = self._unit_file_path(name) + if not unit_file.exists(): + return "unknown" + + result = self._run_systemctl("is-active", service_name) + status_output = result.stdout.strip() + + if status_output == "active": + return "running" + if status_output == "inactive": + return "stopped" + if status_output == "failed": + return "failed" + return "unknown" + + def restart_service(self, name: str) -> bool: + """Restart the service for the given name""" + if not name: + msg = "Service name cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + + result = self._run_systemctl("restart", service_name) + return result.returncode == 0 + + def get_service_logs(self, name: str, lines: int = 50) -> str: + """Get recent logs for the service""" + if not name: + msg = "Service name cannot be empty" + raise ClanError(msg) + + service_name = self._service_name(name) + + cmd = [ + "journalctl", + "--user", + "-u", + f"{service_name}.service", + "-n", + str(lines), + "--no-pager", + ] + result = run(cmd, RunOpts(check=False)) + if result.returncode == 0: + return result.stdout + return f"Failed to get logs: {result.stderr}" + + def list_running_services(self) -> list[dict[str, Any]]: + """List all running service-runner services""" + services = [] + + # Get all service files + for unit_file in self.user_systemd_dir.glob("service-runner-*.service"): + service_name = unit_file.stem + + # Get status + result = self._run_systemctl("is-active", service_name) + status = result.stdout.strip() + + # Try to extract command from unit file + try: + with unit_file.open() as f: + content = f.read() + # Simple parsing - look for ExecStart line + for line in content.split("\n"): + if line.startswith("ExecStart="): + exec_start = line[10:] # Remove "ExecStart=" + services.append( + { + "service_name": service_name, + "status": status, + "command": exec_start, + "unit_file": str(unit_file), + } + ) + break + except OSError: + continue + + return services diff --git a/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py b/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py new file mode 100644 index 000000000..cb851296f --- /dev/null +++ b/pkgs/clan-cli/clan_lib/service_runner/systemd_user_test.py @@ -0,0 +1,304 @@ +import shutil +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from clan_lib.errors import ClanError + +from .systemd_user import SystemdUserService + + +@pytest.fixture +def service_runner(temporary_home: Path) -> SystemdUserService: + """Create a ServiceRunner instance with temporary home directory""" + systemd_dir = temporary_home / ".config" / "systemd" / "user" + return SystemdUserService(user_systemd_dir=systemd_dir) + + +@pytest.fixture +def systemd_service(temporary_home: Path) -> SystemdUserService: + """Create a SystemdUserService instance with temporary home directory""" + systemd_dir = temporary_home / ".config" / "systemd" / "user" + return SystemdUserService(user_systemd_dir=systemd_dir) + + +class TestSystemdUserService: + def test_service_name_generation(self, systemd_service: SystemdUserService) -> None: + """Test service name generation from name""" + name = "test-service" + service_name = systemd_service._service_name(name) + + assert service_name == "service-runner-test-service" + + def test_unit_file_path( + self, systemd_service: SystemdUserService, temporary_home: Path + ) -> None: + """Test unit file path generation""" + name = "test-service" + unit_file = systemd_service._unit_file_path(name) + + expected_dir = temporary_home / ".config" / "systemd" / "user" + assert unit_file.parent == expected_dir + assert unit_file.suffix == ".service" + assert unit_file.name == "service-runner-test-service.service" + + def test_create_unit_file( + self, systemd_service: SystemdUserService, temporary_home: Path + ) -> None: + """Test systemd unit file creation""" + name = "test-service" + command = ["python3", "-c", "print('test')"] + working_dir = temporary_home + env_vars = {"TEST_VAR": "test_value", "ANOTHER": "value"} + description = "Test service" + + unit_file = systemd_service._create_unit_file( + name, command, working_dir, env_vars, description + ) + + assert unit_file.exists() + content = unit_file.read_text() + + # Check basic structure + assert "[Unit]" in content + assert "[Service]" in content + + # Check specific values + assert f'Description="{description}"' in content + assert f"WorkingDirectory={working_dir}" in content + assert "Environment=TEST_VAR=test_value" in content + assert "Environment=ANOTHER=value" in content + + def test_create_unit_file_with_spaces( + self, systemd_service: SystemdUserService + ) -> None: + """Test unit file creation with commands containing spaces""" + name = "test-service" + command = ["python3", "-c", "print('hello world')"] + + unit_file = systemd_service._create_unit_file(name, command) + content = unit_file.read_text() + executable = shutil.which(command[0]) + expect = f"ExecStart={executable} -c print('hello world')" + # Should properly escape arguments with spaces + assert expect in content + + @patch("clan_lib.service_runner.systemd_user.run") + def test_run_systemctl( + self, mock_run: MagicMock, systemd_service: SystemdUserService + ) -> None: + """Test systemctl command execution""" + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "active" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + result = systemd_service._run_systemctl("status", "test-service") + + mock_run.assert_called_once() + assert result.returncode == 0 + + @patch("clan_lib.service_runner.systemd_user.run") + def test_get_status_running( + self, mock_run: MagicMock, systemd_service: SystemdUserService + ) -> None: + """Test status detection for running service""" + # Mock unit file existence + name = "test-service" + unit_file = systemd_service._unit_file_path(name) + unit_file.parent.mkdir(parents=True, exist_ok=True) + unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test") + + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "active" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + status = systemd_service.get_status(name) + assert status == "running" + + @patch("clan_lib.service_runner.systemd_user.run") + def test_get_status_stopped( + self, mock_run: MagicMock, systemd_service: SystemdUserService + ) -> None: + """Test status detection for stopped service""" + # Mock unit file existence + name = "test-service" + unit_file = systemd_service._unit_file_path(name) + unit_file.parent.mkdir(parents=True, exist_ok=True) + unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test") + + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "inactive" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + status = systemd_service.get_status(name) + assert status == "stopped" + + def test_get_status_unknown_no_unit_file( + self, systemd_service: SystemdUserService + ) -> None: + """Test status detection when no unit file exists""" + name = "nonexistent" + + status = systemd_service.get_status(name) + assert status == "unknown" + + +class TestServiceRunner: + def test_empty_name_raises_error(self, service_runner: SystemdUserService) -> None: + """Test that empty service name raises ClanError""" + with pytest.raises(ClanError, match="Service name cannot be empty"): + service_runner.start_service("", ["echo", "test"]) + + def test_empty_command_raises_error( + self, service_runner: SystemdUserService + ) -> None: + """Test that empty command raises ClanError""" + with pytest.raises(ClanError, match="Command cannot be empty"): + service_runner.start_service("test-service", []) + + @patch("clan_lib.service_runner.systemd_user.run") + def test_start_service_mocked( + self, mock_run: MagicMock, service_runner: SystemdUserService + ) -> None: + """Test service start with mocked systemctl calls""" + # Mock successful systemctl calls + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + name = "test-service" + command = ["echo", "test"] + service_name = service_runner.start_service( + name, command, description="Test service" + ) + + assert service_name == name + + # Verify systemctl calls were made + assert mock_run.call_count >= 2 # At least daemon-reload, enable, start + + @patch("clan_lib.service_runner.systemd_user.run") + def test_stop_service_mocked( + self, mock_run: MagicMock, service_runner: SystemdUserService + ) -> None: + """Test service stop with mocked systemctl calls""" + # First create a unit file + name = "test-service" + command = ["echo", "test"] + unit_file = service_runner._create_unit_file(name, command) + + # Mock successful systemctl calls + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + success = service_runner.stop_service(name) + assert success is True + + # Check unit file was removed + assert not unit_file.exists() + + @patch("clan_lib.service_runner.systemd_user.run") + def test_restart_service_mocked( + self, mock_run: MagicMock, service_runner: SystemdUserService + ) -> None: + """Test service restart with mocked systemctl calls""" + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + name = "test-service" + success = service_runner.restart_service(name) + + assert success is True + + @patch("clan_lib.service_runner.systemd_user.run") + def test_logs_service_mocked( + self, mock_run: MagicMock, service_runner: SystemdUserService + ) -> None: + """Test getting service logs with mocked journalctl""" + expected_logs = "Test log output\nAnother log line" + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = expected_logs + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + name = "test-service" + logs = service_runner.get_service_logs(name, lines=25) + + assert logs == expected_logs + mock_run.assert_called_once() + # Check journalctl command structure + call_args = mock_run.call_args[0][0] + assert "journalctl" in call_args + assert "--user" in call_args + assert "-n" in call_args + assert "25" in call_args + + def test_list_services_empty(self, service_runner: SystemdUserService) -> None: + """Test listing services when none exist""" + services = service_runner.list_running_services() + assert services == [] + + def test_list_services_with_unit_files( + self, service_runner: SystemdUserService + ) -> None: + """Test listing services when unit files exist""" + # Create some mock unit files + systemd_dir = service_runner.user_systemd_dir + + unit1 = systemd_dir / "service-runner-test1.service" + unit1.write_text("""[Unit] +Description=Test Service 1 + +[Service] +ExecStart=echo test1 + +[Install] +WantedBy=default.target +""") + + unit2 = systemd_dir / "service-runner-test2.service" + unit2.write_text("""[Unit] +Description=Test Service 2 + +[Service] +ExecStart=python3 -c "print('test')" + +[Install] +WantedBy=default.target +""") + + with patch("clan_lib.service_runner.systemd_user.run") as mock_run: + mock_cmd_out = MagicMock() + mock_cmd_out.returncode = 0 + mock_cmd_out.stdout = "inactive" + mock_cmd_out.stderr = "" + mock_run.return_value = mock_cmd_out + + services = service_runner.list_running_services() + + assert len(services) == 2 + + service_names = [s["service_name"] for s in services] + assert "service-runner-test1" in service_names + assert "service-runner-test2" in service_names + + # Check command extraction + commands = [s["command"] for s in services] + assert "echo test1" in commands + assert "python3 -c \"print('test')\"" in commands