clan_lib: init platform independent service_runner

This commit is contained in:
Qubasa
2025-10-04 02:22:50 +02:00
parent 8803343ae1
commit c6b0b114c5
4 changed files with 761 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
"""Systemd user service runner abstraction.
This module provides a simple interface for managing systemd user services
based on command arrays. Each service is identified by a hash of the command,
allowing you to start/stop services using the same command that was used to create them.
"""
from .protocols import create_service_manager
from .systemd_user import SystemdUserService
__all__ = ["SystemdUserService", "create_service_manager"]

View File

@@ -0,0 +1,139 @@
"""Protocol definitions for platform-independent service management."""
import platform
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from clan_lib.errors import ClanError
from .systemd_user import ServiceStatus
@runtime_checkable
class ServiceManagerProtocol(Protocol):
"""Protocol for platform-independent service management backends."""
def start_service(
self,
name: str,
command: list[str],
working_dir: Path | None = None,
extra_env_vars: dict[str, str] | None = None,
description: str | None = None,
autostart: bool = False,
) -> str:
"""Start a service with the given configuration.
Args:
name: Service identifier
command: Command and arguments to run
working_dir: Working directory for the service
extra_env_vars: Additional environment variables
description: Human-readable service description
autostart: Whether to enable service on boot
Returns:
Service name/identifier
Raises:
ClanError: If service creation or start fails
"""
...
def stop_service(self, name: str) -> bool:
"""Stop and remove a service.
Args:
name: Service identifier
Returns:
True if successful, False otherwise
Raises:
ClanError: If name is empty
"""
...
def get_status(self, name: str) -> ServiceStatus:
"""Get the current status of a service.
Args:
name: Service identifier
Returns:
Current service status
Raises:
ClanError: If name is empty
"""
...
def restart_service(self, name: str) -> bool:
"""Restart a service.
Args:
name: Service identifier
Returns:
True if successful, False otherwise
Raises:
ClanError: If name is empty
"""
...
def get_service_logs(self, name: str, lines: int = 50) -> str:
"""Get recent logs for a service.
Args:
name: Service identifier
lines: Number of recent lines to retrieve
Returns:
Service logs as string
Raises:
ClanError: If name is empty
"""
...
def list_running_services(self) -> list[dict[str, Any]]:
"""List all services managed by this backend.
Returns:
List of service information dictionaries
"""
...
def create_service_manager() -> ServiceManagerProtocol:
"""Create a platform-appropriate service manager.
Returns:
Service manager implementation for current platform
Raises:
ClanError: If platform is not supported
"""
system = platform.system().lower()
if system == "linux":
from .systemd_user import SystemdUserService # noqa: PLC0415
return SystemdUserService(
user_systemd_dir=Path.home() / ".config" / "systemd" / "user"
)
supported_platforms = ["linux"]
msg = (
f"Platform '{system}' is not supported. "
f"Supported platforms: {', '.join(supported_platforms)}"
)
raise ClanError(msg)

View File

@@ -0,0 +1,307 @@
import os
import shlex
import shutil
import textwrap
from collections.abc import Generator
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError
if TYPE_CHECKING:
from clan_lib.errors import CmdOut
ServiceStatus = Literal["running", "stopped", "failed", "unknown"]
@dataclass(frozen=True)
class SystemdUserService:
"""Manages systemd user services by name"""
user_systemd_dir: Path
def __post_init__(self) -> None:
self.user_systemd_dir.mkdir(parents=True, exist_ok=True)
def _service_name(self, name: str) -> str:
"""Generate service name from given name"""
return f"service-runner-{name}"
def _unit_file_path(self, name: str) -> Path:
"""Get the path to the systemd unit file for this service name"""
service_name = self._service_name(name)
return self.user_systemd_dir / f"{service_name}.service"
@contextmanager
def _cleanup_on_error(self, unit_file: Path) -> Generator[None]:
"""Context manager to clean up created files if an exception occurs"""
try:
yield
except Exception:
# Clean up the unit file if it was created
if unit_file.exists():
with suppress(OSError):
unit_file.unlink()
raise
def _create_unit_file(
self,
name: str,
command: list[str],
working_dir: Path | None = None,
env_vars: dict[str, str] | None = None,
description: str | None = None,
autostart: bool = False,
) -> Path:
"""Create systemd unit file for the given command"""
unit_file = self._unit_file_path(name)
with self._cleanup_on_error(unit_file):
executable = shutil.which(command[0])
if not executable:
msg = f"Executable not found: {command[0]}"
raise ClanError(msg)
exec_start = f"{executable} {' '.join(command[1:])}"
if not description:
description = f"Service runner for {shlex.quote(command[0])}"
unit_content = textwrap.dedent(
f"""
[Unit]
Description="{description}"
After=multi-user.target
[Service]
Type=simple
ExecStart={exec_start}
"""
)
if working_dir:
unit_content += f"WorkingDirectory={working_dir}\n"
if env_vars:
for key, value in env_vars.items():
# Properly quote the value for systemd
quoted_value = shlex.quote(value)
unit_content += f"Environment={key}={quoted_value}\n"
if autostart:
unit_content += textwrap.dedent(
"""
[Install]
WantedBy=default.target
"""
)
unit_file.touch(exist_ok=True)
unit_file.chmod(0o600)
with unit_file.open("w") as f:
f.write(unit_content)
return unit_file
def _run_systemctl(self, action: str, service_name: str) -> "CmdOut":
"""Run systemctl command with --user flag"""
cmd = ["systemctl", "--user", action, f"{service_name}.service"]
return run(cmd, RunOpts(check=False))
def start_service(
self,
name: str,
command: list[str],
working_dir: Path | None = None,
extra_env_vars: dict[str, str] | None = None,
description: str | None = None,
autostart: bool = False,
) -> str:
"""Start a systemd user service for the given command.
Returns the service name.
"""
if not command:
msg = "Command cannot be empty"
raise ClanError(msg)
if not name:
msg = "Service name cannot be empty"
raise ClanError(msg)
service_name = self._service_name(name)
# Collect essential environment variables for user services
env_vars = {}
# Essential variables that user services typically need
essential_vars = [
"PATH",
"HOME",
"USER",
"LOGNAME",
"XDG_CONFIG_HOME",
"XDG_DATA_HOME",
"XDG_CACHE_HOME",
"XDG_RUNTIME_DIR",
"XDG_SESSION_ID",
"XDG_SESSION_TYPE",
"DBUS_SESSION_BUS_ADDRESS",
"SSH_AUTH_SOCK",
"SSH_AGENT_PID",
"GPG_AGENT_INFO",
"GNUPGHOME",
]
# Add essential vars if they exist in the current environment
for var in essential_vars:
value = os.environ.get(var)
if value is not None:
env_vars[var] = value
# Allow extra_env_vars to override defaults
env_vars.update(extra_env_vars or {})
# Create the unit file
self._create_unit_file(
name, command, working_dir, env_vars, description, autostart
)
run(["systemctl", "--user", "daemon-reload"])
# Enable the service only if autostart is True
if autostart:
result = self._run_systemctl("enable", service_name)
if result.returncode != 0:
msg = f"Failed to enable service: {result.stderr}"
raise ClanError(msg)
# Start the service
result = self._run_systemctl("start", service_name)
if result.returncode != 0:
msg = f"Failed to start service: {result.stderr}"
raise ClanError(msg)
return name
def stop_service(self, name: str) -> bool:
"""Stop the systemd user service for the given name.
Returns True if successful, False otherwise.
"""
if not name:
msg = "Service name cannot be empty"
raise ClanError(msg)
service_name = self._service_name(name)
# Stop the service
result = self._run_systemctl("stop", service_name)
if result.returncode != 0:
return False
# Disable the service
result = self._run_systemctl("disable", service_name)
if result.returncode != 0:
return False
# Remove the unit file
unit_file = self._unit_file_path(name)
try:
unit_file.unlink(missing_ok=True)
except OSError:
return False
run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False))
return True
def get_status(self, name: str) -> ServiceStatus:
"""Get the status of the service for the given name"""
if not name:
msg = "Service name cannot be empty"
raise ClanError(msg)
service_name = self._service_name(name)
# Check if unit file exists
unit_file = self._unit_file_path(name)
if not unit_file.exists():
return "unknown"
result = self._run_systemctl("is-active", service_name)
status_output = result.stdout.strip()
if status_output == "active":
return "running"
if status_output == "inactive":
return "stopped"
if status_output == "failed":
return "failed"
return "unknown"
def restart_service(self, name: str) -> bool:
"""Restart the service for the given name"""
if not name:
msg = "Service name cannot be empty"
raise ClanError(msg)
service_name = self._service_name(name)
result = self._run_systemctl("restart", service_name)
return result.returncode == 0
def get_service_logs(self, name: str, lines: int = 50) -> str:
"""Get recent logs for the service"""
if not name:
msg = "Service name cannot be empty"
raise ClanError(msg)
service_name = self._service_name(name)
cmd = [
"journalctl",
"--user",
"-u",
f"{service_name}.service",
"-n",
str(lines),
"--no-pager",
]
result = run(cmd, RunOpts(check=False))
if result.returncode == 0:
return result.stdout
return f"Failed to get logs: {result.stderr}"
def list_running_services(self) -> list[dict[str, Any]]:
"""List all running service-runner services"""
services = []
# Get all service files
for unit_file in self.user_systemd_dir.glob("service-runner-*.service"):
service_name = unit_file.stem
# Get status
result = self._run_systemctl("is-active", service_name)
status = result.stdout.strip()
# Try to extract command from unit file
try:
with unit_file.open() as f:
content = f.read()
# Simple parsing - look for ExecStart line
for line in content.split("\n"):
if line.startswith("ExecStart="):
exec_start = line[10:] # Remove "ExecStart="
services.append(
{
"service_name": service_name,
"status": status,
"command": exec_start,
"unit_file": str(unit_file),
}
)
break
except OSError:
continue
return services

View File

@@ -0,0 +1,304 @@
import shutil
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from clan_lib.errors import ClanError
from .systemd_user import SystemdUserService
@pytest.fixture
def service_runner(temporary_home: Path) -> SystemdUserService:
"""Create a ServiceRunner instance with temporary home directory"""
systemd_dir = temporary_home / ".config" / "systemd" / "user"
return SystemdUserService(user_systemd_dir=systemd_dir)
@pytest.fixture
def systemd_service(temporary_home: Path) -> SystemdUserService:
"""Create a SystemdUserService instance with temporary home directory"""
systemd_dir = temporary_home / ".config" / "systemd" / "user"
return SystemdUserService(user_systemd_dir=systemd_dir)
class TestSystemdUserService:
def test_service_name_generation(self, systemd_service: SystemdUserService) -> None:
"""Test service name generation from name"""
name = "test-service"
service_name = systemd_service._service_name(name)
assert service_name == "service-runner-test-service"
def test_unit_file_path(
self, systemd_service: SystemdUserService, temporary_home: Path
) -> None:
"""Test unit file path generation"""
name = "test-service"
unit_file = systemd_service._unit_file_path(name)
expected_dir = temporary_home / ".config" / "systemd" / "user"
assert unit_file.parent == expected_dir
assert unit_file.suffix == ".service"
assert unit_file.name == "service-runner-test-service.service"
def test_create_unit_file(
self, systemd_service: SystemdUserService, temporary_home: Path
) -> None:
"""Test systemd unit file creation"""
name = "test-service"
command = ["python3", "-c", "print('test')"]
working_dir = temporary_home
env_vars = {"TEST_VAR": "test_value", "ANOTHER": "value"}
description = "Test service"
unit_file = systemd_service._create_unit_file(
name, command, working_dir, env_vars, description
)
assert unit_file.exists()
content = unit_file.read_text()
# Check basic structure
assert "[Unit]" in content
assert "[Service]" in content
# Check specific values
assert f'Description="{description}"' in content
assert f"WorkingDirectory={working_dir}" in content
assert "Environment=TEST_VAR=test_value" in content
assert "Environment=ANOTHER=value" in content
def test_create_unit_file_with_spaces(
self, systemd_service: SystemdUserService
) -> None:
"""Test unit file creation with commands containing spaces"""
name = "test-service"
command = ["python3", "-c", "print('hello world')"]
unit_file = systemd_service._create_unit_file(name, command)
content = unit_file.read_text()
executable = shutil.which(command[0])
expect = f"ExecStart={executable} -c print('hello world')"
# Should properly escape arguments with spaces
assert expect in content
@patch("clan_lib.service_runner.systemd_user.run")
def test_run_systemctl(
self, mock_run: MagicMock, systemd_service: SystemdUserService
) -> None:
"""Test systemctl command execution"""
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = "active"
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
result = systemd_service._run_systemctl("status", "test-service")
mock_run.assert_called_once()
assert result.returncode == 0
@patch("clan_lib.service_runner.systemd_user.run")
def test_get_status_running(
self, mock_run: MagicMock, systemd_service: SystemdUserService
) -> None:
"""Test status detection for running service"""
# Mock unit file existence
name = "test-service"
unit_file = systemd_service._unit_file_path(name)
unit_file.parent.mkdir(parents=True, exist_ok=True)
unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test")
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = "active"
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
status = systemd_service.get_status(name)
assert status == "running"
@patch("clan_lib.service_runner.systemd_user.run")
def test_get_status_stopped(
self, mock_run: MagicMock, systemd_service: SystemdUserService
) -> None:
"""Test status detection for stopped service"""
# Mock unit file existence
name = "test-service"
unit_file = systemd_service._unit_file_path(name)
unit_file.parent.mkdir(parents=True, exist_ok=True)
unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test")
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = "inactive"
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
status = systemd_service.get_status(name)
assert status == "stopped"
def test_get_status_unknown_no_unit_file(
self, systemd_service: SystemdUserService
) -> None:
"""Test status detection when no unit file exists"""
name = "nonexistent"
status = systemd_service.get_status(name)
assert status == "unknown"
class TestServiceRunner:
def test_empty_name_raises_error(self, service_runner: SystemdUserService) -> None:
"""Test that empty service name raises ClanError"""
with pytest.raises(ClanError, match="Service name cannot be empty"):
service_runner.start_service("", ["echo", "test"])
def test_empty_command_raises_error(
self, service_runner: SystemdUserService
) -> None:
"""Test that empty command raises ClanError"""
with pytest.raises(ClanError, match="Command cannot be empty"):
service_runner.start_service("test-service", [])
@patch("clan_lib.service_runner.systemd_user.run")
def test_start_service_mocked(
self, mock_run: MagicMock, service_runner: SystemdUserService
) -> None:
"""Test service start with mocked systemctl calls"""
# Mock successful systemctl calls
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = ""
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
name = "test-service"
command = ["echo", "test"]
service_name = service_runner.start_service(
name, command, description="Test service"
)
assert service_name == name
# Verify systemctl calls were made
assert mock_run.call_count >= 2 # At least daemon-reload, enable, start
@patch("clan_lib.service_runner.systemd_user.run")
def test_stop_service_mocked(
self, mock_run: MagicMock, service_runner: SystemdUserService
) -> None:
"""Test service stop with mocked systemctl calls"""
# First create a unit file
name = "test-service"
command = ["echo", "test"]
unit_file = service_runner._create_unit_file(name, command)
# Mock successful systemctl calls
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = ""
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
success = service_runner.stop_service(name)
assert success is True
# Check unit file was removed
assert not unit_file.exists()
@patch("clan_lib.service_runner.systemd_user.run")
def test_restart_service_mocked(
self, mock_run: MagicMock, service_runner: SystemdUserService
) -> None:
"""Test service restart with mocked systemctl calls"""
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = ""
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
name = "test-service"
success = service_runner.restart_service(name)
assert success is True
@patch("clan_lib.service_runner.systemd_user.run")
def test_logs_service_mocked(
self, mock_run: MagicMock, service_runner: SystemdUserService
) -> None:
"""Test getting service logs with mocked journalctl"""
expected_logs = "Test log output\nAnother log line"
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = expected_logs
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
name = "test-service"
logs = service_runner.get_service_logs(name, lines=25)
assert logs == expected_logs
mock_run.assert_called_once()
# Check journalctl command structure
call_args = mock_run.call_args[0][0]
assert "journalctl" in call_args
assert "--user" in call_args
assert "-n" in call_args
assert "25" in call_args
def test_list_services_empty(self, service_runner: SystemdUserService) -> None:
"""Test listing services when none exist"""
services = service_runner.list_running_services()
assert services == []
def test_list_services_with_unit_files(
self, service_runner: SystemdUserService
) -> None:
"""Test listing services when unit files exist"""
# Create some mock unit files
systemd_dir = service_runner.user_systemd_dir
unit1 = systemd_dir / "service-runner-test1.service"
unit1.write_text("""[Unit]
Description=Test Service 1
[Service]
ExecStart=echo test1
[Install]
WantedBy=default.target
""")
unit2 = systemd_dir / "service-runner-test2.service"
unit2.write_text("""[Unit]
Description=Test Service 2
[Service]
ExecStart=python3 -c "print('test')"
[Install]
WantedBy=default.target
""")
with patch("clan_lib.service_runner.systemd_user.run") as mock_run:
mock_cmd_out = MagicMock()
mock_cmd_out.returncode = 0
mock_cmd_out.stdout = "inactive"
mock_cmd_out.stderr = ""
mock_run.return_value = mock_cmd_out
services = service_runner.list_running_services()
assert len(services) == 2
service_names = [s["service_name"] for s in services]
assert "service-runner-test1" in service_names
assert "service-runner-test2" in service_names
# Check command extraction
commands = [s["command"] for s in services]
assert "echo test1" in commands
assert "python3 -c \"print('test')\"" in commands