service_runner: add grouping feature
This commit is contained in:
@@ -86,6 +86,7 @@ in
|
||||
|
||||
# Container Tests
|
||||
nixos-test-container = self.clanLib.test.containerTest ./container nixosTestArgs;
|
||||
nixos-systemd-abstraction = self.clanLib.test.containerTest ./systemd-abstraction nixosTestArgs;
|
||||
nixos-test-user-firewall-iptables = self.clanLib.test.containerTest ./user-firewall/iptables.nix nixosTestArgs;
|
||||
nixos-test-user-firewall-nftables = self.clanLib.test.containerTest ./user-firewall/nftables.nix nixosTestArgs;
|
||||
nixos-test-extra-python-packages = self.clanLib.test.containerTest ./test-extra-python-packages nixosTestArgs;
|
||||
|
||||
67
checks/systemd-abstraction/default.nix
Normal file
67
checks/systemd-abstraction/default.nix
Normal file
@@ -0,0 +1,67 @@
|
||||
{ self, pkgs, ... }:
|
||||
|
||||
let
|
||||
|
||||
cli = self.packages.${pkgs.hostPlatform.system}.clan-cli-full;
|
||||
in
|
||||
{
|
||||
name = "systemd-abstraction";
|
||||
|
||||
nodes = {
|
||||
peer1 = {
|
||||
|
||||
users.users.text-user = {
|
||||
isNormalUser = true;
|
||||
linger = true;
|
||||
uid = 1000;
|
||||
extraGroups = [ "systemd-journal" ];
|
||||
};
|
||||
|
||||
# Set environment variables for user systemd
|
||||
environment.extraInit = ''
|
||||
if [ "$(id -u)" = "1000" ]; then
|
||||
export XDG_RUNTIME_DIR="/run/user/1000"
|
||||
export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus"
|
||||
fi
|
||||
'';
|
||||
|
||||
# Enable PAM for user systemd sessions
|
||||
security.pam.services.systemd-user = {
|
||||
startSession = true;
|
||||
# Workaround for containers - use pam_permit to avoid helper binary issues
|
||||
text = pkgs.lib.mkForce ''
|
||||
account required pam_permit.so
|
||||
session required pam_permit.so
|
||||
session required pam_env.so conffile=/etc/pam/environment readenv=0
|
||||
session required ${pkgs.systemd}/lib/security/pam_systemd.so
|
||||
'';
|
||||
};
|
||||
|
||||
environment.systemPackages = [
|
||||
cli
|
||||
(cli.pythonRuntime.withPackages (
|
||||
ps: with ps; [
|
||||
pytest
|
||||
pytest-xdist
|
||||
]
|
||||
))
|
||||
];
|
||||
};
|
||||
};
|
||||
|
||||
testScript =
|
||||
{ ... }:
|
||||
''
|
||||
start_all()
|
||||
|
||||
peer1.wait_for_unit("multi-user.target")
|
||||
peer1.wait_for_unit("user@1000.service")
|
||||
|
||||
# Fix user journal permissions so text-user can read their own logs
|
||||
peer1.succeed("chown text-user:systemd-journal /var/log/journal/*/user-1000.journal*")
|
||||
peer1.succeed("chmod 640 /var/log/journal/*/user-1000.journal*")
|
||||
|
||||
# Run tests as text-user (environment variables are set automatically)
|
||||
peer1.succeed("su - text-user -c 'pytest -s -n0 ${cli}/${cli.pythonRuntime.sitePackages}/clan_lib/service_runner'")
|
||||
'';
|
||||
}
|
||||
381
pkgs/clan-cli/clan_lib/service_runner/container_test.py
Executable file
381
pkgs/clan-cli/clan_lib/service_runner/container_test.py
Executable file
@@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Test suite for service runner with group functionality."""
|
||||
|
||||
# Allow assert statements and magic values in test code
|
||||
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
from contextlib import suppress
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from clan_lib.errors import ClanError
|
||||
from clan_lib.service_runner import create_service_manager
|
||||
from clan_lib.service_runner.protocols import ServiceManagerProtocol
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service_manager() -> Generator[ServiceManagerProtocol]:
|
||||
"""Create a service manager and ensure cleanup after test."""
|
||||
manager = create_service_manager()
|
||||
|
||||
# List of services that might be created during tests
|
||||
test_services = [
|
||||
"simple-service",
|
||||
"nginx-service",
|
||||
"api-service",
|
||||
"postgres-service",
|
||||
"autostart-service",
|
||||
"log-test",
|
||||
"restart-test",
|
||||
]
|
||||
|
||||
# Test groups that might be created
|
||||
test_groups = ["web", "database"]
|
||||
|
||||
# Yield the manager to the test
|
||||
yield manager
|
||||
|
||||
# Cleanup after test (runs even if test fails)
|
||||
for service in test_services:
|
||||
with suppress(ClanError):
|
||||
manager.stop_service(service)
|
||||
|
||||
for group in test_groups:
|
||||
with suppress(ClanError):
|
||||
manager.stop_services_by_group(group)
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_transient_service(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test transient service (no autostart, uses systemd-run)."""
|
||||
# Start a transient service
|
||||
name = service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "300"],
|
||||
description="A simple transient service",
|
||||
autostart=False,
|
||||
)
|
||||
assert name == "simple-service", f"Expected 'simple-service', got {name}"
|
||||
|
||||
# Give systemd time to start the service
|
||||
time.sleep(0.5)
|
||||
|
||||
# Check status
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running", f"Expected 'running', got {status}"
|
||||
|
||||
# Verify it's listed
|
||||
services = service_manager.list_running_services()
|
||||
service_names = [s["service_name"] for s in services]
|
||||
assert "service-runner-simple-service" in service_names, "Service not in list"
|
||||
|
||||
# Check it's marked as transient (no unit file)
|
||||
simple_service = next(
|
||||
s for s in services if s["service_name"] == "service-runner-simple-service"
|
||||
)
|
||||
assert simple_service["unit_file"] == "(transient)", (
|
||||
f"Should be transient, got {simple_service['unit_file']!r}"
|
||||
)
|
||||
|
||||
# Stop the service
|
||||
service_manager.stop_service("simple-service")
|
||||
|
||||
# Verify it's stopped
|
||||
time.sleep(0.5)
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status in ("stopped", "unknown"), f"Expected stopped/unknown, got {status}"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_autostart_service(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test autostart service (creates persistent unit file)."""
|
||||
# Start an autostart service
|
||||
service_manager.start_service(
|
||||
name="autostart-service",
|
||||
command=["sleep", "300"],
|
||||
description="An autostart service",
|
||||
autostart=True,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Check status
|
||||
status = service_manager.get_status("autostart-service")
|
||||
assert status == "running", f"Expected 'running', got {status}"
|
||||
|
||||
# Verify it has a unit file (not transient)
|
||||
services = service_manager.list_running_services()
|
||||
autostart_service = next(
|
||||
s for s in services if s["service_name"] == "service-runner-autostart-service"
|
||||
)
|
||||
assert autostart_service["unit_file"] != "(transient)", "Should have unit file"
|
||||
assert autostart_service["unit_file"].endswith(".service"), (
|
||||
"Should be .service file"
|
||||
)
|
||||
|
||||
# Verify unit file exists
|
||||
unit_file = Path(autostart_service["unit_file"])
|
||||
assert unit_file.exists(), f"Unit file should exist: {unit_file}"
|
||||
|
||||
# Stop and verify unit file is removed
|
||||
service_manager.stop_service("autostart-service")
|
||||
|
||||
time.sleep(0.5)
|
||||
assert not unit_file.exists(), f"Unit file should be removed: {unit_file}"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_grouped_services(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test services with groups."""
|
||||
# Start services in the "web" group
|
||||
service_manager.start_service(
|
||||
name="nginx-service",
|
||||
command=["sleep", "300"],
|
||||
description="Web server",
|
||||
autostart=True,
|
||||
group="web",
|
||||
)
|
||||
|
||||
service_manager.start_service(
|
||||
name="api-service",
|
||||
command=["sleep", "300"],
|
||||
description="API server",
|
||||
autostart=True,
|
||||
group="web",
|
||||
)
|
||||
|
||||
# Start service in "database" group
|
||||
service_manager.start_service(
|
||||
name="postgres-service",
|
||||
command=["sleep", "300"],
|
||||
description="Database server",
|
||||
autostart=True,
|
||||
group="database",
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify all services are running
|
||||
all_services = service_manager.list_running_services()
|
||||
service_names = {s["service_name"] for s in all_services}
|
||||
assert "service-runner-nginx-service" in service_names
|
||||
assert "service-runner-api-service" in service_names
|
||||
assert "service-runner-postgres-service" in service_names
|
||||
|
||||
# List services by group
|
||||
web_services = service_manager.list_services_by_group("web")
|
||||
assert len(web_services) == 2, f"Expected 2 web services, got {len(web_services)}"
|
||||
web_service_names = {s["service_name"] for s in web_services}
|
||||
assert "service-runner-nginx-service" in web_service_names
|
||||
assert "service-runner-api-service" in web_service_names
|
||||
|
||||
db_services = service_manager.list_services_by_group("database")
|
||||
assert len(db_services) == 1, f"Expected 1 db service, got {len(db_services)}"
|
||||
assert db_services[0]["service_name"] == "service-runner-postgres-service"
|
||||
assert db_services[0]["group"] == "database"
|
||||
|
||||
# Verify all grouped services have unit files
|
||||
for service in web_services + db_services:
|
||||
assert service["unit_file"] != "(transient)", (
|
||||
f"{service['service_name']} should have unit file"
|
||||
)
|
||||
assert service["status"] == "active", (
|
||||
f"{service['service_name']} should be active"
|
||||
)
|
||||
|
||||
# Stop services by group
|
||||
service_manager.stop_services_by_group("web")
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify web services are stopped
|
||||
web_services_after = service_manager.list_services_by_group("web")
|
||||
assert len(web_services_after) == 0, "Web services should be stopped"
|
||||
|
||||
# Verify database service is still running
|
||||
db_services_after = service_manager.list_services_by_group("database")
|
||||
assert len(db_services_after) == 1, "Database service should still be running"
|
||||
|
||||
# Clean up database group
|
||||
service_manager.stop_services_by_group("database")
|
||||
|
||||
time.sleep(0.5)
|
||||
db_services_final = service_manager.list_services_by_group("database")
|
||||
assert len(db_services_final) == 0, "Database services should be stopped"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_service_logs(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test retrieving service logs."""
|
||||
# Start a service
|
||||
service_manager.start_service(
|
||||
name="log-test",
|
||||
command=["sleep", "300"],
|
||||
description="Log test service",
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Get logs - just verify we can retrieve them (may be empty)
|
||||
logs = service_manager.get_service_logs("log-test", lines=20)
|
||||
assert isinstance(logs, str), "Logs should be a string"
|
||||
|
||||
# Clean up
|
||||
service_manager.stop_service("log-test")
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_nonexistent_group(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test listing services in nonexistent group."""
|
||||
# List services in nonexistent group
|
||||
services = service_manager.list_services_by_group("nonexistent-group")
|
||||
assert services == [], f"Expected empty list, got {services}"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_restart_service(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test restarting a service."""
|
||||
# Start a service
|
||||
service_manager.start_service(
|
||||
name="restart-test",
|
||||
command=["sleep", "300"],
|
||||
description="Restart test service",
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify it's running
|
||||
status = service_manager.get_status("restart-test")
|
||||
assert status == "running", f"Expected 'running', got {status}"
|
||||
|
||||
# Restart it
|
||||
service_manager.restart_service("restart-test")
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify it's still running
|
||||
status = service_manager.get_status("restart-test")
|
||||
assert status == "running", f"Expected 'running' after restart, got {status}"
|
||||
|
||||
# Clean up
|
||||
service_manager.stop_service("restart-test")
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_cleanup_on_failure(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test that services are cleaned up even when test fails."""
|
||||
# Start a service
|
||||
service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify it's running
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running"
|
||||
|
||||
# Service will be cleaned up by fixture even if we don't explicitly stop it
|
||||
# This test passes, demonstrating that cleanup happens automatically
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_start_service_twice_transient(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test starting the same transient service twice (should fail or replace)."""
|
||||
# Start a transient service
|
||||
service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify it's running
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running"
|
||||
|
||||
# Try to start the same service again - this should fail
|
||||
# systemd won't allow starting a unit with the same name
|
||||
with pytest.raises(ClanError, match="Failed to start service"):
|
||||
service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
# Original service should still be running
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_start_service_twice_autostart(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test starting the same autostart service twice (just restarts it)."""
|
||||
# Start an autostart service
|
||||
service_manager.start_service(
|
||||
name="autostart-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=True,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Verify it's running
|
||||
status = service_manager.get_status("autostart-service")
|
||||
assert status == "running"
|
||||
|
||||
# Try to start the same service again
|
||||
# For autostart services, systemd will just restart the service
|
||||
# (unlike transient services which fail)
|
||||
service_manager.start_service(
|
||||
name="autostart-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=True,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
# Service should still be running after "restart"
|
||||
status = service_manager.get_status("autostart-service")
|
||||
assert status == "running"
|
||||
|
||||
|
||||
@pytest.mark.service_runner
|
||||
def test_start_stopped_service_again(service_manager: ServiceManagerProtocol) -> None:
|
||||
"""Test starting a service, stopping it, then starting it again."""
|
||||
# Start a service
|
||||
service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "300"],
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running"
|
||||
|
||||
# Stop the service
|
||||
service_manager.stop_service("simple-service")
|
||||
time.sleep(0.5)
|
||||
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status in ("stopped", "unknown")
|
||||
|
||||
# Start the service again with a different command - this should work
|
||||
service_manager.start_service(
|
||||
name="simple-service",
|
||||
command=["sleep", "600"],
|
||||
description="Restarted service with different command",
|
||||
autostart=False,
|
||||
)
|
||||
|
||||
time.sleep(0.5)
|
||||
status = service_manager.get_status("simple-service")
|
||||
assert status == "running"
|
||||
@@ -2,11 +2,11 @@
|
||||
|
||||
import platform
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from clan_lib.errors import ClanError
|
||||
|
||||
from .systemd_user import ServiceStatus
|
||||
from .systemd_user import GroupedServiceInfo, ServiceInfo, ServiceStatus
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
@@ -18,9 +18,10 @@ class ServiceManagerProtocol(Protocol):
|
||||
name: str,
|
||||
command: list[str],
|
||||
working_dir: Path | None = None,
|
||||
extra_env_vars: dict[str, str] | None = None,
|
||||
env_vars: dict[str, str] | None = None,
|
||||
description: str | None = None,
|
||||
autostart: bool = False,
|
||||
group: str | None = None,
|
||||
) -> str:
|
||||
"""Start a service with the given configuration.
|
||||
|
||||
@@ -28,9 +29,10 @@ class ServiceManagerProtocol(Protocol):
|
||||
name: Service identifier
|
||||
command: Command and arguments to run
|
||||
working_dir: Working directory for the service
|
||||
extra_env_vars: Additional environment variables
|
||||
env_vars: Environment variables for the service
|
||||
description: Human-readable service description
|
||||
autostart: Whether to enable service on boot
|
||||
group: Optional group name for service grouping
|
||||
|
||||
Returns:
|
||||
Service name/identifier
|
||||
@@ -41,17 +43,14 @@ class ServiceManagerProtocol(Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def stop_service(self, name: str) -> bool:
|
||||
def stop_service(self, name: str) -> None:
|
||||
"""Stop and remove a service.
|
||||
|
||||
Args:
|
||||
name: Service identifier
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
|
||||
Raises:
|
||||
ClanError: If name is empty
|
||||
ClanError: If name is empty or operation fails
|
||||
|
||||
"""
|
||||
...
|
||||
@@ -71,17 +70,14 @@ class ServiceManagerProtocol(Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def restart_service(self, name: str) -> bool:
|
||||
def restart_service(self, name: str) -> None:
|
||||
"""Restart a service.
|
||||
|
||||
Args:
|
||||
name: Service identifier
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
|
||||
Raises:
|
||||
ClanError: If name is empty
|
||||
ClanError: If name is empty or operation fails
|
||||
|
||||
"""
|
||||
...
|
||||
@@ -102,7 +98,7 @@ class ServiceManagerProtocol(Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def list_running_services(self) -> list[dict[str, Any]]:
|
||||
def list_running_services(self) -> list[ServiceInfo]:
|
||||
"""List all services managed by this backend.
|
||||
|
||||
Returns:
|
||||
@@ -111,6 +107,33 @@ class ServiceManagerProtocol(Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]:
|
||||
"""List all services in the specified group.
|
||||
|
||||
Args:
|
||||
group: Group name to filter by
|
||||
|
||||
Returns:
|
||||
List of service information dictionaries for the group
|
||||
|
||||
Raises:
|
||||
ClanError: If group name is empty
|
||||
|
||||
"""
|
||||
...
|
||||
|
||||
def stop_services_by_group(self, group: str) -> None:
|
||||
"""Stop all services in the specified group.
|
||||
|
||||
Args:
|
||||
group: Group name to stop services for
|
||||
|
||||
Raises:
|
||||
ClanError: If group name is empty or operation fails
|
||||
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
def create_service_manager() -> ServiceManagerProtocol:
|
||||
"""Create a platform-appropriate service manager.
|
||||
|
||||
@@ -1,12 +1,9 @@
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import textwrap
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager, suppress
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
from typing import TYPE_CHECKING, Literal, TypedDict
|
||||
|
||||
from clan_lib.cmd import RunOpts, run
|
||||
from clan_lib.errors import ClanError
|
||||
@@ -17,9 +14,28 @@ if TYPE_CHECKING:
|
||||
ServiceStatus = Literal["running", "stopped", "failed", "unknown"]
|
||||
|
||||
|
||||
class ServiceInfo(TypedDict):
|
||||
"""Information about a running service."""
|
||||
|
||||
service_name: str
|
||||
status: str
|
||||
command: str
|
||||
unit_file: str
|
||||
|
||||
|
||||
class GroupedServiceInfo(TypedDict):
|
||||
"""Information about a service in a group."""
|
||||
|
||||
service_name: str
|
||||
status: str
|
||||
command: str
|
||||
unit_file: str
|
||||
group: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SystemdUserService:
|
||||
"""Manages systemd user services by name"""
|
||||
"""Manages systemd user services using systemd-run for transient units."""
|
||||
|
||||
user_systemd_dir: Path
|
||||
|
||||
@@ -27,27 +43,125 @@ class SystemdUserService:
|
||||
self.user_systemd_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _service_name(self, name: str) -> str:
|
||||
"""Generate service name from given name"""
|
||||
return f"service-runner-{name}"
|
||||
|
||||
def _target_name(self, group: str) -> str:
|
||||
return f"service-runner-{group}"
|
||||
|
||||
def _target_file_path(self, group: str) -> Path:
|
||||
return self.user_systemd_dir / f"{self._target_name(group)}.target"
|
||||
|
||||
def _unit_file_path(self, name: str) -> Path:
|
||||
"""Get the path to the systemd unit file for this service name"""
|
||||
service_name = self._service_name(name)
|
||||
return self.user_systemd_dir / f"{service_name}.service"
|
||||
return self.user_systemd_dir / f"{self._service_name(name)}.service"
|
||||
|
||||
@contextmanager
|
||||
def _cleanup_on_error(self, unit_file: Path) -> Generator[None]:
|
||||
"""Context manager to clean up created files if an exception occurs"""
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
# Clean up the unit file if it was created
|
||||
if unit_file.exists():
|
||||
with suppress(OSError):
|
||||
unit_file.unlink()
|
||||
raise
|
||||
def _validate_name(self, name: str, type_name: str = "Service") -> None:
|
||||
if not name:
|
||||
msg = f"{type_name} name cannot be empty"
|
||||
raise ClanError(msg)
|
||||
|
||||
def _create_unit_file(
|
||||
def _check_executable(self, command: list[str]) -> str:
|
||||
executable = shutil.which(command[0])
|
||||
if not executable:
|
||||
msg = f"Executable not found: {command[0]}"
|
||||
raise ClanError(msg)
|
||||
return executable
|
||||
|
||||
def _systemctl(self, action: str, service_name: str) -> "CmdOut":
|
||||
"""Run systemctl command with --user flag."""
|
||||
return run(
|
||||
["systemctl", "--user", action, f"{service_name}.service"],
|
||||
RunOpts(check=False),
|
||||
)
|
||||
|
||||
def _get_property(self, service_name: str, prop: str) -> str:
|
||||
"""Get a systemd unit property value."""
|
||||
result = run(
|
||||
[
|
||||
"systemctl",
|
||||
"--user",
|
||||
"show",
|
||||
f"{service_name}.service",
|
||||
f"--property={prop}",
|
||||
"--no-pager",
|
||||
],
|
||||
RunOpts(check=False),
|
||||
)
|
||||
prefix = f"{prop}="
|
||||
for line in result.stdout.split("\n"):
|
||||
if line.startswith(prefix):
|
||||
return line[len(prefix) :].strip()
|
||||
return ""
|
||||
|
||||
def _create_target_file(self, group: str) -> None:
|
||||
"""Create systemd target file for a group if it doesn't exist."""
|
||||
target_file = self._target_file_path(group)
|
||||
if target_file.exists():
|
||||
return
|
||||
|
||||
content = textwrap.dedent(
|
||||
f"""
|
||||
[Unit]
|
||||
Description=Service runner group: {group}
|
||||
After=multi-user.target
|
||||
"""
|
||||
)
|
||||
target_file.touch(exist_ok=True)
|
||||
target_file.chmod(0o600)
|
||||
target_file.write_text(content)
|
||||
run(["systemctl", "--user", "daemon-reload"])
|
||||
|
||||
def _create_autostart_unit(
|
||||
self,
|
||||
name: str,
|
||||
command: list[str],
|
||||
working_dir: Path | None,
|
||||
env_vars: dict[str, str] | None,
|
||||
description: str | None,
|
||||
group: str | None,
|
||||
) -> None:
|
||||
"""Create persistent unit file for autostart services."""
|
||||
executable = self._check_executable(command)
|
||||
exec_start = f"{executable} {' '.join(shlex.quote(arg) for arg in command[1:])}"
|
||||
description = description or f"Service runner for {command[0]}"
|
||||
|
||||
content = textwrap.dedent(
|
||||
f"""
|
||||
[Unit]
|
||||
Description={description}
|
||||
After=multi-user.target
|
||||
"""
|
||||
)
|
||||
|
||||
if group:
|
||||
content += f"PartOf={self._target_name(group)}.target\n"
|
||||
|
||||
content += textwrap.dedent(
|
||||
f"""
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart={exec_start}
|
||||
"""
|
||||
)
|
||||
|
||||
if working_dir:
|
||||
content += f"WorkingDirectory={working_dir}\n"
|
||||
|
||||
for key, value in (env_vars or {}).items():
|
||||
content += f"Environment={key}={shlex.quote(value)}\n"
|
||||
|
||||
content += textwrap.dedent(
|
||||
f"""
|
||||
[Install]
|
||||
WantedBy={self._target_name(group) if group else "default"}.target
|
||||
"""
|
||||
)
|
||||
|
||||
unit_file = self._unit_file_path(name)
|
||||
unit_file.touch(exist_ok=True)
|
||||
unit_file.chmod(0o600)
|
||||
unit_file.write_text(content)
|
||||
|
||||
def start_service(
|
||||
self,
|
||||
name: str,
|
||||
command: list[str],
|
||||
@@ -55,253 +169,238 @@ class SystemdUserService:
|
||||
env_vars: dict[str, str] | None = None,
|
||||
description: str | None = None,
|
||||
autostart: bool = False,
|
||||
) -> Path:
|
||||
"""Create systemd unit file for the given command"""
|
||||
unit_file = self._unit_file_path(name)
|
||||
|
||||
with self._cleanup_on_error(unit_file):
|
||||
executable = shutil.which(command[0])
|
||||
if not executable:
|
||||
msg = f"Executable not found: {command[0]}"
|
||||
raise ClanError(msg)
|
||||
exec_start = f"{executable} {' '.join(command[1:])}"
|
||||
|
||||
if not description:
|
||||
description = f"Service runner for {shlex.quote(command[0])}"
|
||||
|
||||
unit_content = textwrap.dedent(
|
||||
f"""
|
||||
[Unit]
|
||||
Description="{description}"
|
||||
After=multi-user.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart={exec_start}
|
||||
"""
|
||||
)
|
||||
|
||||
if working_dir:
|
||||
unit_content += f"WorkingDirectory={working_dir}\n"
|
||||
|
||||
if env_vars:
|
||||
for key, value in env_vars.items():
|
||||
# Properly quote the value for systemd
|
||||
quoted_value = shlex.quote(value)
|
||||
unit_content += f"Environment={key}={quoted_value}\n"
|
||||
|
||||
if autostart:
|
||||
unit_content += textwrap.dedent(
|
||||
"""
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
"""
|
||||
)
|
||||
|
||||
unit_file.touch(exist_ok=True)
|
||||
unit_file.chmod(0o600)
|
||||
with unit_file.open("w") as f:
|
||||
f.write(unit_content)
|
||||
|
||||
return unit_file
|
||||
|
||||
def _run_systemctl(self, action: str, service_name: str) -> "CmdOut":
|
||||
"""Run systemctl command with --user flag"""
|
||||
cmd = ["systemctl", "--user", action, f"{service_name}.service"]
|
||||
return run(cmd, RunOpts(check=False))
|
||||
|
||||
def start_service(
|
||||
self,
|
||||
name: str,
|
||||
command: list[str],
|
||||
working_dir: Path | None = None,
|
||||
extra_env_vars: dict[str, str] | None = None,
|
||||
description: str | None = None,
|
||||
autostart: bool = False,
|
||||
group: str | None = None,
|
||||
) -> str:
|
||||
"""Start a systemd user service for the given command.
|
||||
Returns the service name.
|
||||
"""Start a systemd user service.
|
||||
|
||||
autostart=False: Uses systemd-run (transient, no files).
|
||||
autostart=True: Creates unit files (persistent across reboots).
|
||||
"""
|
||||
self._validate_name(name)
|
||||
if not command:
|
||||
msg = "Command cannot be empty"
|
||||
raise ClanError(msg)
|
||||
if not name:
|
||||
msg = "Service name cannot be empty"
|
||||
raise ClanError(msg)
|
||||
|
||||
service_name = self._service_name(name)
|
||||
self._check_executable(command)
|
||||
|
||||
# Collect essential environment variables for user services
|
||||
env_vars = {}
|
||||
|
||||
# Essential variables that user services typically need
|
||||
essential_vars = [
|
||||
"PATH",
|
||||
"HOME",
|
||||
"USER",
|
||||
"LOGNAME",
|
||||
"XDG_CONFIG_HOME",
|
||||
"XDG_DATA_HOME",
|
||||
"XDG_CACHE_HOME",
|
||||
"XDG_RUNTIME_DIR",
|
||||
"XDG_SESSION_ID",
|
||||
"XDG_SESSION_TYPE",
|
||||
"DBUS_SESSION_BUS_ADDRESS",
|
||||
"SSH_AUTH_SOCK",
|
||||
"SSH_AGENT_PID",
|
||||
"GPG_AGENT_INFO",
|
||||
"GNUPGHOME",
|
||||
]
|
||||
|
||||
# Add essential vars if they exist in the current environment
|
||||
for var in essential_vars:
|
||||
value = os.environ.get(var)
|
||||
if value is not None:
|
||||
env_vars[var] = value
|
||||
|
||||
# Allow extra_env_vars to override defaults
|
||||
env_vars.update(extra_env_vars or {})
|
||||
|
||||
# Create the unit file
|
||||
self._create_unit_file(
|
||||
name, command, working_dir, env_vars, description, autostart
|
||||
)
|
||||
|
||||
run(["systemctl", "--user", "daemon-reload"])
|
||||
|
||||
# Enable the service only if autostart is True
|
||||
if autostart:
|
||||
result = self._run_systemctl("enable", service_name)
|
||||
if group:
|
||||
self._create_target_file(group)
|
||||
self._create_autostart_unit(
|
||||
name, command, working_dir, env_vars, description, group
|
||||
)
|
||||
run(["systemctl", "--user", "daemon-reload"])
|
||||
|
||||
result = self._systemctl("enable", service_name)
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to enable service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
|
||||
# Start the service
|
||||
result = self._run_systemctl("start", service_name)
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to start service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
result = self._systemctl("start", service_name)
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to start service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
# Use systemd-run for transient services
|
||||
desc = description or f"Service runner for {command[0]}"
|
||||
cmd = [
|
||||
"systemd-run",
|
||||
"--user",
|
||||
f"--unit={service_name}",
|
||||
f"--description={desc}",
|
||||
]
|
||||
|
||||
if working_dir:
|
||||
cmd.append(f"--working-directory={working_dir}")
|
||||
|
||||
for key, value in (env_vars or {}).items():
|
||||
cmd.append(f"--setenv={key}={value}")
|
||||
|
||||
if group:
|
||||
self._create_target_file(group)
|
||||
cmd.append(f"--property=PartOf={self._target_name(group)}.target")
|
||||
|
||||
cmd.extend(command)
|
||||
|
||||
result = run(cmd, RunOpts(check=False))
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to start service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
|
||||
return name
|
||||
|
||||
def stop_service(self, name: str) -> bool:
|
||||
"""Stop the systemd user service for the given name.
|
||||
Returns True if successful, False otherwise.
|
||||
"""
|
||||
if not name:
|
||||
msg = "Service name cannot be empty"
|
||||
raise ClanError(msg)
|
||||
|
||||
def stop_service(self, name: str) -> None:
|
||||
"""Stop a systemd user service."""
|
||||
self._validate_name(name)
|
||||
service_name = self._service_name(name)
|
||||
|
||||
# Stop the service
|
||||
result = self._run_systemctl("stop", service_name)
|
||||
if result.returncode != 0:
|
||||
return False
|
||||
result = self._systemctl("stop", service_name)
|
||||
if result.returncode != 0 and "not loaded" not in result.stderr.lower():
|
||||
msg = f"Failed to stop service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
|
||||
# Disable the service
|
||||
result = self._run_systemctl("disable", service_name)
|
||||
if result.returncode != 0:
|
||||
return False
|
||||
self._systemctl("disable", service_name) # Ignore errors for transient units
|
||||
|
||||
# Remove the unit file
|
||||
unit_file = self._unit_file_path(name)
|
||||
try:
|
||||
if unit_file.exists():
|
||||
unit_file.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False))
|
||||
|
||||
return True
|
||||
run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False))
|
||||
|
||||
def get_status(self, name: str) -> ServiceStatus:
|
||||
"""Get the status of the service for the given name"""
|
||||
if not name:
|
||||
msg = "Service name cannot be empty"
|
||||
"""Get the status of a service."""
|
||||
self._validate_name(name)
|
||||
result = self._systemctl("is-active", self._service_name(name))
|
||||
status_map: dict[str, ServiceStatus] = {
|
||||
"active": "running",
|
||||
"inactive": "stopped",
|
||||
"failed": "failed",
|
||||
}
|
||||
return status_map.get(result.stdout.strip(), "unknown")
|
||||
|
||||
def restart_service(self, name: str) -> None:
|
||||
"""Restart a service."""
|
||||
self._validate_name(name)
|
||||
result = self._systemctl("restart", self._service_name(name))
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to restart service: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
|
||||
service_name = self._service_name(name)
|
||||
|
||||
# Check if unit file exists
|
||||
unit_file = self._unit_file_path(name)
|
||||
if not unit_file.exists():
|
||||
return "unknown"
|
||||
|
||||
result = self._run_systemctl("is-active", service_name)
|
||||
status_output = result.stdout.strip()
|
||||
|
||||
if status_output == "active":
|
||||
return "running"
|
||||
if status_output == "inactive":
|
||||
return "stopped"
|
||||
if status_output == "failed":
|
||||
return "failed"
|
||||
return "unknown"
|
||||
|
||||
def restart_service(self, name: str) -> bool:
|
||||
"""Restart the service for the given name"""
|
||||
if not name:
|
||||
msg = "Service name cannot be empty"
|
||||
raise ClanError(msg)
|
||||
|
||||
service_name = self._service_name(name)
|
||||
|
||||
result = self._run_systemctl("restart", service_name)
|
||||
return result.returncode == 0
|
||||
|
||||
def get_service_logs(self, name: str, lines: int = 50) -> str:
|
||||
"""Get recent logs for the service"""
|
||||
if not name:
|
||||
msg = "Service name cannot be empty"
|
||||
raise ClanError(msg)
|
||||
"""Get recent logs for a service."""
|
||||
self._validate_name(name)
|
||||
result = run(
|
||||
[
|
||||
"journalctl",
|
||||
"--user",
|
||||
"-u",
|
||||
f"{self._service_name(name)}.service",
|
||||
"-n",
|
||||
str(lines),
|
||||
"--no-pager",
|
||||
]
|
||||
)
|
||||
return result.stdout
|
||||
|
||||
service_name = self._service_name(name)
|
||||
def _get_service_info(self, unit_name: str) -> tuple[str, str, str]:
|
||||
"""Get status, command, and unit file for a service."""
|
||||
status = self._get_property(unit_name, "ActiveState")
|
||||
command = self._get_property(unit_name, "ExecStart")
|
||||
fragment_path = self._get_property(unit_name, "FragmentPath")
|
||||
|
||||
cmd = [
|
||||
"journalctl",
|
||||
"--user",
|
||||
"-u",
|
||||
f"{service_name}.service",
|
||||
"-n",
|
||||
str(lines),
|
||||
"--no-pager",
|
||||
]
|
||||
result = run(cmd, RunOpts(check=False))
|
||||
if result.returncode == 0:
|
||||
return result.stdout
|
||||
return f"Failed to get logs: {result.stderr}"
|
||||
# Transient units are stored in /run/user/.../systemd/transient/
|
||||
if not fragment_path or "/transient/" in fragment_path:
|
||||
unit_file = "(transient)"
|
||||
else:
|
||||
unit_file = fragment_path
|
||||
|
||||
def list_running_services(self) -> list[dict[str, Any]]:
|
||||
"""List all running service-runner services"""
|
||||
services = []
|
||||
return status, command, unit_file
|
||||
|
||||
# Get all service files
|
||||
for unit_file in self.user_systemd_dir.glob("service-runner-*.service"):
|
||||
service_name = unit_file.stem
|
||||
def list_running_services(self) -> list[ServiceInfo]:
|
||||
"""List all service-runner services."""
|
||||
result = run(
|
||||
[
|
||||
"systemctl",
|
||||
"--user",
|
||||
"list-units",
|
||||
"service-runner-*.service",
|
||||
"--all",
|
||||
"--no-legend",
|
||||
"--no-pager",
|
||||
"--plain",
|
||||
],
|
||||
RunOpts(check=False),
|
||||
)
|
||||
|
||||
# Get status
|
||||
result = self._run_systemctl("is-active", service_name)
|
||||
status = result.stdout.strip()
|
||||
|
||||
# Try to extract command from unit file
|
||||
try:
|
||||
with unit_file.open() as f:
|
||||
content = f.read()
|
||||
# Simple parsing - look for ExecStart line
|
||||
for line in content.split("\n"):
|
||||
if line.startswith("ExecStart="):
|
||||
exec_start = line[10:] # Remove "ExecStart="
|
||||
services.append(
|
||||
{
|
||||
"service_name": service_name,
|
||||
"status": status,
|
||||
"command": exec_start,
|
||||
"unit_file": str(unit_file),
|
||||
}
|
||||
)
|
||||
break
|
||||
except OSError:
|
||||
services: list[ServiceInfo] = []
|
||||
# systemctl list-units format: UNIT LOAD ACTIVE SUB DESCRIPTION
|
||||
min_required_fields = 4
|
||||
for line in result.stdout.strip().split("\n"):
|
||||
if not line.strip():
|
||||
continue
|
||||
|
||||
parts = line.split(None, 4)
|
||||
if len(parts) < min_required_fields:
|
||||
continue
|
||||
|
||||
unit_name = parts[0].replace(".service", "")
|
||||
if not unit_name.startswith("service-runner-"):
|
||||
continue
|
||||
|
||||
status, command, unit_file = self._get_service_info(unit_name)
|
||||
services.append(
|
||||
{
|
||||
"service_name": unit_name,
|
||||
"status": status,
|
||||
"command": command,
|
||||
"unit_file": unit_file,
|
||||
}
|
||||
)
|
||||
|
||||
return services
|
||||
|
||||
def list_services_by_group(self, group: str) -> list[GroupedServiceInfo]:
|
||||
"""List all services in a group."""
|
||||
self._validate_name(group, "Group")
|
||||
|
||||
if not self._target_file_path(group).exists():
|
||||
return []
|
||||
|
||||
result = run(
|
||||
[
|
||||
"systemctl",
|
||||
"--user",
|
||||
"list-dependencies",
|
||||
f"{self._target_name(group)}.target",
|
||||
"--plain",
|
||||
],
|
||||
RunOpts(check=False),
|
||||
)
|
||||
|
||||
services: list[GroupedServiceInfo] = []
|
||||
for raw_line in result.stdout.split("\n"):
|
||||
line = raw_line.strip()
|
||||
if not (line.endswith(".service") and line.startswith("service-runner-")):
|
||||
continue
|
||||
|
||||
service_name = line.replace(".service", "")
|
||||
status, command, unit_file = self._get_service_info(service_name)
|
||||
services.append(
|
||||
{
|
||||
"service_name": service_name,
|
||||
"status": status,
|
||||
"command": command,
|
||||
"unit_file": unit_file,
|
||||
"group": group,
|
||||
}
|
||||
)
|
||||
|
||||
return services
|
||||
|
||||
def stop_services_by_group(self, group: str) -> None:
|
||||
"""Stop all services in a group."""
|
||||
self._validate_name(group, "Group")
|
||||
|
||||
target_file = self._target_file_path(group)
|
||||
if not target_file.exists():
|
||||
return
|
||||
|
||||
services = self.list_services_by_group(group)
|
||||
|
||||
# Stop the target (stops all PartOf services)
|
||||
result = run(
|
||||
["systemctl", "--user", "stop", f"{self._target_name(group)}.target"],
|
||||
RunOpts(check=False),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
msg = f"Failed to stop target: {result.stderr}"
|
||||
raise ClanError(msg)
|
||||
|
||||
# Disable and remove unit files for non-transient services
|
||||
for service in services:
|
||||
if service["unit_file"] != "(transient)":
|
||||
self._systemctl("disable", service["service_name"])
|
||||
Path(service["unit_file"]).unlink(missing_ok=True)
|
||||
|
||||
target_file.unlink(missing_ok=True)
|
||||
run(["systemctl", "--user", "daemon-reload"], RunOpts(check=False))
|
||||
|
||||
@@ -1,304 +0,0 @@
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from clan_lib.errors import ClanError
|
||||
|
||||
from .systemd_user import SystemdUserService
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def service_runner(temporary_home: Path) -> SystemdUserService:
|
||||
"""Create a ServiceRunner instance with temporary home directory"""
|
||||
systemd_dir = temporary_home / ".config" / "systemd" / "user"
|
||||
return SystemdUserService(user_systemd_dir=systemd_dir)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def systemd_service(temporary_home: Path) -> SystemdUserService:
|
||||
"""Create a SystemdUserService instance with temporary home directory"""
|
||||
systemd_dir = temporary_home / ".config" / "systemd" / "user"
|
||||
return SystemdUserService(user_systemd_dir=systemd_dir)
|
||||
|
||||
|
||||
class TestSystemdUserService:
|
||||
def test_service_name_generation(self, systemd_service: SystemdUserService) -> None:
|
||||
"""Test service name generation from name"""
|
||||
name = "test-service"
|
||||
service_name = systemd_service._service_name(name)
|
||||
|
||||
assert service_name == "service-runner-test-service"
|
||||
|
||||
def test_unit_file_path(
|
||||
self, systemd_service: SystemdUserService, temporary_home: Path
|
||||
) -> None:
|
||||
"""Test unit file path generation"""
|
||||
name = "test-service"
|
||||
unit_file = systemd_service._unit_file_path(name)
|
||||
|
||||
expected_dir = temporary_home / ".config" / "systemd" / "user"
|
||||
assert unit_file.parent == expected_dir
|
||||
assert unit_file.suffix == ".service"
|
||||
assert unit_file.name == "service-runner-test-service.service"
|
||||
|
||||
def test_create_unit_file(
|
||||
self, systemd_service: SystemdUserService, temporary_home: Path
|
||||
) -> None:
|
||||
"""Test systemd unit file creation"""
|
||||
name = "test-service"
|
||||
command = ["python3", "-c", "print('test')"]
|
||||
working_dir = temporary_home
|
||||
env_vars = {"TEST_VAR": "test_value", "ANOTHER": "value"}
|
||||
description = "Test service"
|
||||
|
||||
unit_file = systemd_service._create_unit_file(
|
||||
name, command, working_dir, env_vars, description
|
||||
)
|
||||
|
||||
assert unit_file.exists()
|
||||
content = unit_file.read_text()
|
||||
|
||||
# Check basic structure
|
||||
assert "[Unit]" in content
|
||||
assert "[Service]" in content
|
||||
|
||||
# Check specific values
|
||||
assert f'Description="{description}"' in content
|
||||
assert f"WorkingDirectory={working_dir}" in content
|
||||
assert "Environment=TEST_VAR=test_value" in content
|
||||
assert "Environment=ANOTHER=value" in content
|
||||
|
||||
def test_create_unit_file_with_spaces(
|
||||
self, systemd_service: SystemdUserService
|
||||
) -> None:
|
||||
"""Test unit file creation with commands containing spaces"""
|
||||
name = "test-service"
|
||||
command = ["python3", "-c", "print('hello world')"]
|
||||
|
||||
unit_file = systemd_service._create_unit_file(name, command)
|
||||
content = unit_file.read_text()
|
||||
executable = shutil.which(command[0])
|
||||
expect = f"ExecStart={executable} -c print('hello world')"
|
||||
# Should properly escape arguments with spaces
|
||||
assert expect in content
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_run_systemctl(
|
||||
self, mock_run: MagicMock, systemd_service: SystemdUserService
|
||||
) -> None:
|
||||
"""Test systemctl command execution"""
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = "active"
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
result = systemd_service._run_systemctl("status", "test-service")
|
||||
|
||||
mock_run.assert_called_once()
|
||||
assert result.returncode == 0
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_get_status_running(
|
||||
self, mock_run: MagicMock, systemd_service: SystemdUserService
|
||||
) -> None:
|
||||
"""Test status detection for running service"""
|
||||
# Mock unit file existence
|
||||
name = "test-service"
|
||||
unit_file = systemd_service._unit_file_path(name)
|
||||
unit_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test")
|
||||
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = "active"
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
status = systemd_service.get_status(name)
|
||||
assert status == "running"
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_get_status_stopped(
|
||||
self, mock_run: MagicMock, systemd_service: SystemdUserService
|
||||
) -> None:
|
||||
"""Test status detection for stopped service"""
|
||||
# Mock unit file existence
|
||||
name = "test-service"
|
||||
unit_file = systemd_service._unit_file_path(name)
|
||||
unit_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
unit_file.write_text("[Unit]\nDescription=test\n[Service]\nExecStart=echo test")
|
||||
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = "inactive"
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
status = systemd_service.get_status(name)
|
||||
assert status == "stopped"
|
||||
|
||||
def test_get_status_unknown_no_unit_file(
|
||||
self, systemd_service: SystemdUserService
|
||||
) -> None:
|
||||
"""Test status detection when no unit file exists"""
|
||||
name = "nonexistent"
|
||||
|
||||
status = systemd_service.get_status(name)
|
||||
assert status == "unknown"
|
||||
|
||||
|
||||
class TestServiceRunner:
|
||||
def test_empty_name_raises_error(self, service_runner: SystemdUserService) -> None:
|
||||
"""Test that empty service name raises ClanError"""
|
||||
with pytest.raises(ClanError, match="Service name cannot be empty"):
|
||||
service_runner.start_service("", ["echo", "test"])
|
||||
|
||||
def test_empty_command_raises_error(
|
||||
self, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test that empty command raises ClanError"""
|
||||
with pytest.raises(ClanError, match="Command cannot be empty"):
|
||||
service_runner.start_service("test-service", [])
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_start_service_mocked(
|
||||
self, mock_run: MagicMock, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test service start with mocked systemctl calls"""
|
||||
# Mock successful systemctl calls
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = ""
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
name = "test-service"
|
||||
command = ["echo", "test"]
|
||||
service_name = service_runner.start_service(
|
||||
name, command, description="Test service"
|
||||
)
|
||||
|
||||
assert service_name == name
|
||||
|
||||
# Verify systemctl calls were made
|
||||
assert mock_run.call_count >= 2 # At least daemon-reload, enable, start
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_stop_service_mocked(
|
||||
self, mock_run: MagicMock, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test service stop with mocked systemctl calls"""
|
||||
# First create a unit file
|
||||
name = "test-service"
|
||||
command = ["echo", "test"]
|
||||
unit_file = service_runner._create_unit_file(name, command)
|
||||
|
||||
# Mock successful systemctl calls
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = ""
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
success = service_runner.stop_service(name)
|
||||
assert success is True
|
||||
|
||||
# Check unit file was removed
|
||||
assert not unit_file.exists()
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_restart_service_mocked(
|
||||
self, mock_run: MagicMock, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test service restart with mocked systemctl calls"""
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = ""
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
name = "test-service"
|
||||
success = service_runner.restart_service(name)
|
||||
|
||||
assert success is True
|
||||
|
||||
@patch("clan_lib.service_runner.systemd_user.run")
|
||||
def test_logs_service_mocked(
|
||||
self, mock_run: MagicMock, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test getting service logs with mocked journalctl"""
|
||||
expected_logs = "Test log output\nAnother log line"
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = expected_logs
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
name = "test-service"
|
||||
logs = service_runner.get_service_logs(name, lines=25)
|
||||
|
||||
assert logs == expected_logs
|
||||
mock_run.assert_called_once()
|
||||
# Check journalctl command structure
|
||||
call_args = mock_run.call_args[0][0]
|
||||
assert "journalctl" in call_args
|
||||
assert "--user" in call_args
|
||||
assert "-n" in call_args
|
||||
assert "25" in call_args
|
||||
|
||||
def test_list_services_empty(self, service_runner: SystemdUserService) -> None:
|
||||
"""Test listing services when none exist"""
|
||||
services = service_runner.list_running_services()
|
||||
assert services == []
|
||||
|
||||
def test_list_services_with_unit_files(
|
||||
self, service_runner: SystemdUserService
|
||||
) -> None:
|
||||
"""Test listing services when unit files exist"""
|
||||
# Create some mock unit files
|
||||
systemd_dir = service_runner.user_systemd_dir
|
||||
|
||||
unit1 = systemd_dir / "service-runner-test1.service"
|
||||
unit1.write_text("""[Unit]
|
||||
Description=Test Service 1
|
||||
|
||||
[Service]
|
||||
ExecStart=echo test1
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
""")
|
||||
|
||||
unit2 = systemd_dir / "service-runner-test2.service"
|
||||
unit2.write_text("""[Unit]
|
||||
Description=Test Service 2
|
||||
|
||||
[Service]
|
||||
ExecStart=python3 -c "print('test')"
|
||||
|
||||
[Install]
|
||||
WantedBy=default.target
|
||||
""")
|
||||
|
||||
with patch("clan_lib.service_runner.systemd_user.run") as mock_run:
|
||||
mock_cmd_out = MagicMock()
|
||||
mock_cmd_out.returncode = 0
|
||||
mock_cmd_out.stdout = "inactive"
|
||||
mock_cmd_out.stderr = ""
|
||||
mock_run.return_value = mock_cmd_out
|
||||
|
||||
services = service_runner.list_running_services()
|
||||
|
||||
assert len(services) == 2
|
||||
|
||||
service_names = [s["service_name"] for s in services]
|
||||
assert "service-runner-test1" in service_names
|
||||
assert "service-runner-test2" in service_names
|
||||
|
||||
# Check command extraction
|
||||
commands = [s["command"] for s in services]
|
||||
assert "echo test1" in commands
|
||||
assert "python3 -c \"print('test')\"" in commands
|
||||
@@ -97,12 +97,13 @@ let
|
||||
sourceWithoutTests = cliSource (
|
||||
nixFilter.filter {
|
||||
root = ./.;
|
||||
include = [
|
||||
exclude = [
|
||||
# exclude if
|
||||
(
|
||||
_root: path: _type:
|
||||
(builtins.match ".*/test_[^/]+\.py" path) == null
|
||||
&& (builtins.match ".*/[^/]+_test\.py" path) == null
|
||||
# && (builtins.match ".*/tests/.+" path) == null
|
||||
(builtins.match ".*/test_[^/]+\.py" path) != null # matches test_*.py
|
||||
&& (builtins.match ".*/[^/]+_test\.py" path) != null # matches *_test.py
|
||||
&& (builtins.match ".*/container_test\.py" path) == null # doesn't match container_test.py
|
||||
)
|
||||
];
|
||||
}
|
||||
@@ -194,7 +195,7 @@ pythonRuntime.pkgs.buildPythonApplication {
|
||||
# limit build cores to 16
|
||||
jobs="$((NIX_BUILD_CORES>16 ? 16 : NIX_BUILD_CORES))"
|
||||
|
||||
python -m pytest -m "not impure and not with_core" -n "$jobs" \
|
||||
python -m pytest -m "not service_runner and not impure and not with_core" -n "$jobs" \
|
||||
./clan_cli \
|
||||
./clan_lib \
|
||||
--cov ./clan_cli \
|
||||
@@ -281,7 +282,7 @@ pythonRuntime.pkgs.buildPythonApplication {
|
||||
jobs="$((NIX_BUILD_CORES>16 ? 16 : NIX_BUILD_CORES))"
|
||||
|
||||
# Run all tests with core marker
|
||||
python -m pytest -m "not impure and with_core" -n "$jobs" \
|
||||
python -m pytest -m "not service_runner and not impure and with_core" -n "$jobs" \
|
||||
./clan_cli \
|
||||
./clan_lib \
|
||||
--cov ./clan_cli \
|
||||
|
||||
@@ -48,7 +48,7 @@ norecursedirs = ["clan_cli/tests/helpers", "clan_lib/nixpkgs"]
|
||||
# machines. In the CI pipeline we run these tests in a separate derivation
|
||||
# depending on clan-core. All other tests do not need to depend on clan-core
|
||||
# and can be cached more effectively.
|
||||
markers = ["impure", "with_core"]
|
||||
markers = ["impure", "with_core", "service_runner"]
|
||||
filterwarnings = "default::ResourceWarning"
|
||||
python_files = ["test_*.py", "*_test.py"]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user