Merge pull request 'clan-lib: Add llm API for clan service selection' (#5626) from Qubasa/clan-core:llm_api into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/5626
Luis Hebendanz
2025-10-22 13:39:35 +00:00
27 changed files with 7167 additions and 167 deletions


@@ -87,6 +87,7 @@ in
# Container Tests
nixos-test-container = self.clanLib.test.containerTest ./container nixosTestArgs;
nixos-systemd-abstraction = self.clanLib.test.containerTest ./systemd-abstraction nixosTestArgs;
nixos-llm-test = self.clanLib.test.containerTest ./llm nixosTestArgs;
nixos-test-user-firewall-iptables = self.clanLib.test.containerTest ./user-firewall/iptables.nix nixosTestArgs;
nixos-test-user-firewall-nftables = self.clanLib.test.containerTest ./user-firewall/nftables.nix nixosTestArgs;
nixos-test-extra-python-packages = self.clanLib.test.containerTest ./test-extra-python-packages nixosTestArgs;

checks/llm/default.nix (new file, 83 lines)

@@ -0,0 +1,83 @@
{ self, pkgs, ... }:
let
cli = self.packages.${pkgs.hostPlatform.system}.clan-cli-full;
ollama-model = pkgs.callPackage ./qwen3-4b-instruct.nix { };
in
{
name = "llm";
nodes = {
peer1 =
{ pkgs, ... }:
{
users.users.text-user = {
isNormalUser = true;
linger = true;
uid = 1000;
extraGroups = [ "systemd-journal" ];
};
# Set environment variables for user systemd
environment.extraInit = ''
if [ "$(id -u)" = "1000" ]; then
export XDG_RUNTIME_DIR="/run/user/1000"
export DBUS_SESSION_BUS_ADDRESS="unix:path=/run/user/1000/bus"
ollama_dir="$HOME/.ollama"
mkdir -p "$ollama_dir"
ln -sf ${ollama-model}/models "$ollama_dir"/models
fi
'';
# Enable PAM for user systemd sessions
security.pam.services.systemd-user = {
startSession = true;
# Workaround for containers - use pam_permit to avoid helper binary issues
text = pkgs.lib.mkForce ''
account required pam_permit.so
session required pam_permit.so
session required pam_env.so conffile=/etc/pam/environment readenv=0
session required ${pkgs.systemd}/lib/security/pam_systemd.so
'';
};
environment.systemPackages = [
cli
pkgs.ollama
(cli.pythonRuntime.withPackages (
ps: with ps; [
pytest
pytest-xdist
(cli.pythonRuntime.pkgs.toPythonModule cli)
self.legacyPackages.${pkgs.hostPlatform.system}.nixosTestLib
]
))
];
};
};
testScript =
{ ... }:
''
start_all()
peer1.wait_for_unit("multi-user.target")
peer1.wait_for_unit("user@1000.service")
# Fix user journal permissions so text-user can read their own logs
peer1.succeed("chown text-user:systemd-journal /var/log/journal/*/user-1000.journal*")
peer1.succeed("chmod 640 /var/log/journal/*/user-1000.journal*")
# the -o addopts="" is needed to override any addopts coming from pyproject.toml
# -p no:cacheprovider disables pytest's cache plugin, which would otherwise try to write into the nix store here
cmd = "su - text-user -c 'pytest -s -n0 -m service_runner -p no:cacheprovider -o addopts="" ${cli.passthru.sourceWithTests}/clan_lib/llm'"
print("Running tests with command: " + cmd)
# Run tests as text-user (environment variables are set automatically)
peer1.succeed(cmd)
'';
}


@@ -0,0 +1,70 @@
{ pkgs }:
let
# URLs and hashes obtained via https://github.com/Gholamrezadar/ollama-direct-downloader
# Download manifest
manifest = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/manifests/4b-instruct";
# To update: set the hash to an empty string (or lib.fakeHash) and rebuild; Nix will report the correct hash
hash = "sha256-Dtze80WT6sGqK+nH0GxDLc+BlFrcpeyi8nZiwY8Wi6A=";
};
# Download blobs
blob1 = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/blobs/sha256:b72accf9724e93698c57cbd3b1af2d3341b3d05ec2089d86d273d97964853cd2";
hash = "sha256-tyrM+XJOk2mMV8vTsa8tM0Gz0F7CCJ2G0nPZeWSFPNI=";
};
blob2 = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/blobs/sha256:85e4a5b7b8ef0e48af0e8658f5aaab9c2324c76c1641493f4d1e25fce54b18b9";
hash = "sha256-heSlt7jvDkivDoZY9aqrnCMkx2wWQUk/TR4l/OVLGLk=";
};
blob3 = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/blobs/sha256:eade0a07cac7712787bbce23d12f9306adb4781d873d1df6e16f7840fa37afec";
hash = "sha256-6t4KB8rHcSeHu84j0S+TBq20eB2HPR324W94QPo3r+w=";
};
blob4 = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/blobs/sha256:d18a5cc71b84bc4af394a31116bd3932b42241de70c77d2b76d69a314ec8aa12";
hash = "sha256-0YpcxxuEvErzlKMRFr05MrQiQd5wx30rdtaaMU7IqhI=";
};
blob5 = pkgs.fetchurl {
url = "https://registry.ollama.ai/v2/library/qwen3/blobs/sha256:0914c7781e001948488d937994217538375b4fd8c1466c5e7a625221abd3ea7a";
hash = "sha256-CRTHeB4AGUhIjZN5lCF1ODdbT9jBRmxeemJSIavT6no=";
};
in
pkgs.stdenv.mkDerivation {
pname = "ollama-qwen3-4b-instruct";
version = "1.0";
dontUnpack = true;
buildPhase = ''
mkdir -p $out/models/manifests/registry.ollama.ai/library/qwen3
mkdir -p $out/models/blobs
# Copy manifest
cp ${manifest} $out/models/manifests/registry.ollama.ai/library/qwen3/4b-instruct
# Copy blobs with correct names
cp ${blob1} $out/models/blobs/sha256-b72accf9724e93698c57cbd3b1af2d3341b3d05ec2089d86d273d97964853cd2
cp ${blob2} $out/models/blobs/sha256-85e4a5b7b8ef0e48af0e8658f5aaab9c2324c76c1641493f4d1e25fce54b18b9
cp ${blob3} $out/models/blobs/sha256-eade0a07cac7712787bbce23d12f9306adb4781d873d1df6e16f7840fa37afec
cp ${blob4} $out/models/blobs/sha256-d18a5cc71b84bc4af394a31116bd3932b42241de70c77d2b76d69a314ec8aa12
cp ${blob5} $out/models/blobs/sha256-0914c7781e001948488d937994217538375b4fd8c1466c5e7a625221abd3ea7a
'';
installPhase = ''
# buildPhase already created everything in $out
:
'';
meta = with pkgs.lib; {
description = "Qwen3 4B Instruct model for Ollama";
license = "apache-2.0";
platforms = platforms.all;
};
}


@@ -186,3 +186,16 @@ class ClanCmdError(ClanError):
def __repr__(self) -> str:
return f"ClanCmdError({self.cmd})"
class ClanAiError(ClanError):
"""Exception raised for errors during AI/LLM operations."""
def __init__(
self,
msg: str,
*,
description: str | None = None,
location: str = "AI Processing",
) -> None:
super().__init__(msg, description=description, location=location)
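The new exception follows the same keyword pattern as the other ClanError subclasses. A minimal usage sketch (the guard function and its message are illustrative, not part of the commit):

```python
from clan_lib.errors import ClanAiError, ClanError

def require_llm_reply(reply: str | None) -> str:
    # Hypothetical guard: the orchestrator raises ClanAiError when the model
    # returns neither function calls nor a clarifying message.
    if not reply:
        raise ClanAiError(
            "LLM did not provide any response or recommendations",
            description="Expected either function calls or a clarifying message",
            location="Discovery Phase",
        )
    return reply

try:
    require_llm_reply(None)
except ClanError as err:  # ClanAiError subclasses ClanError, so existing handlers still catch it
    print(err)
```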


@@ -0,0 +1,200 @@
"""High-level API functions for LLM interactions, suitable for HTTP APIs and web UIs.
This module provides a clean, stateless API for integrating LLM functionality into
web applications and HTTP services. It wraps the complex multi-stage workflow into
simple function calls with serializable inputs and outputs.
"""
from pathlib import Path
from typing import Any, Literal, TypedDict, cast
from clan_lib.api import API
from clan_lib.flake.flake import Flake
from .llm import (
DEFAULT_MODELS,
ChatResult,
DiscoveryProgressEvent,
FinalDecisionProgressEvent,
ModelConfig,
ProgressCallback,
ProgressEvent,
ReadmeFetchProgressEvent,
get_model_config,
process_chat_turn,
)
from .schemas import ChatMessage, ConversationHistory, SessionState
class ChatTurnRequest(TypedDict, total=False):
"""Request payload for a chat turn.
Attributes:
user_message: The user's message/request
conversation_history: Optional list of prior messages in the conversation
provider: The LLM provider to use (default: "claude")
trace_file: Optional path to write LLM interaction traces for debugging
session_state: Opaque state returned from the previous turn
"""
user_message: str
conversation_history: ConversationHistory | None
provider: Literal["openai", "ollama", "claude"]
trace_file: Path | None
session_state: SessionState | None
class ChatTurnResponse(TypedDict):
"""Response payload for a chat turn.
Attributes:
proposed_instances: List of inventory instances suggested by the LLM
conversation_history: Updated conversation history after this turn
assistant_message: Message from the assistant
requires_user_response: Whether the assistant is waiting for user input
error: Error message if something went wrong (None on success)
session_state: State blob to pass into the next turn when continuing the workflow
"""
proposed_instances: list[dict[str, Any]]
conversation_history: list[ChatMessage]
assistant_message: str
requires_user_response: bool
error: str | None
session_state: SessionState
class ProgressEventResponse(TypedDict):
"""Progress event for streaming updates.
Attributes:
stage: The current stage of processing
status: The status within that stage (if applicable)
count: Count of items (for readme_fetch stage)
message: Message content (for conversation stage)
"""
stage: str
status: str | None
count: int | None
message: str | None
@API.register
def get_llm_turn(
flake: Flake,
request: ChatTurnRequest,
progress_callback: ProgressCallback | None = None,
) -> ChatTurnResponse:
"""Process a single chat turn through the LLM workflow.
This is the main entry point for HTTP APIs and web UIs to interact with
the LLM functionality. It handles:
- Service discovery
- Documentation fetching
- Final decision making
- Conversation management
Args:
flake: The Flake object representing the clan configuration
request: The chat turn request containing user message and optional history
progress_callback: Optional callback for progress updates
Returns:
ChatTurnResponse with proposed instances and conversation state
Example:
>>> from clan_lib.flake.flake import Flake
>>> flake = Flake("/path/to/clan")
>>> request: ChatTurnRequest = {
... "user_message": "Set up a web server",
... "provider": "claude"
... }
>>> response = get_llm_turn(flake, request)
>>> if response["proposed_instances"]:
... print("LLM suggests:", response["proposed_instances"])
>>> if response["requires_user_response"]:
... print("Assistant asks:", response["assistant_message"])
"""
result: ChatResult = process_chat_turn(
user_request=request["user_message"],
flake=flake,
conversation_history=request.get("conversation_history"),
provider=request.get("provider", "claude"),
progress_callback=progress_callback,
trace_file=request.get("trace_file"),
session_state=request.get("session_state"),
)
# Convert frozen tuples to lists for JSON serialization
return ChatTurnResponse(
proposed_instances=[dict(inst) for inst in result.proposed_instances],
conversation_history=list(result.conversation_history),
assistant_message=result.assistant_message,
requires_user_response=result.requires_user_response,
error=result.error,
session_state=cast("SessionState", dict(result.session_state)),
)
def progress_event_to_dict(event: ProgressEvent) -> ProgressEventResponse:
"""Convert a ProgressEvent to a dictionary suitable for JSON serialization.
This helper function is useful for streaming progress updates over HTTP
(e.g., Server-Sent Events or WebSockets).
Args:
event: The progress event to convert
Returns:
Dictionary representation of the event
Example:
>>> from clan_lib.llm.llm import DiscoveryProgressEvent
>>> event = DiscoveryProgressEvent(status="analyzing")
>>> progress_event_to_dict(event)
{'stage': 'discovery', 'status': 'analyzing', 'count': None, 'message': None}
"""
base_response: ProgressEventResponse = {
"stage": event.stage,
"status": None,
"count": None,
"message": None,
}
if isinstance(event, (DiscoveryProgressEvent, FinalDecisionProgressEvent)):
base_response["status"] = event.status
elif isinstance(event, ReadmeFetchProgressEvent):
base_response["status"] = event.status
base_response["count"] = event.count
# ConversationProgressEvent has message field
elif hasattr(event, "message"):
base_response["message"] = event.message # type: ignore[attr-defined]
if hasattr(event, "awaiting_response"):
base_response["status"] = (
"awaiting_response"
if event.awaiting_response # type: ignore[attr-defined]
else "complete"
)
return base_response
# Re-export types for convenience
__all__ = [
"DEFAULT_MODELS",
"ChatTurnRequest",
"ChatTurnResponse",
"ModelConfig",
"ProgressCallback",
"ProgressEvent",
"ProgressEventResponse",
"get_llm_turn",
"get_model_config",
"progress_event_to_dict",
]
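To make the HTTP integration concrete, here is a minimal sketch of a server-side handler that runs one turn, drains progress events, and returns the pieces a web UI needs. The queue-based collection, the `clan_lib.llm.api` module path, and the flake path are assumptions for illustration:

```python
import json
from queue import Queue

from clan_lib.flake.flake import Flake
from clan_lib.llm.api import ChatTurnRequest, get_llm_turn, progress_event_to_dict

def handle_turn(flake_path: str, message: str, session_state=None):
    """Run one chat turn and yield progress lines, then the final result."""
    flake = Flake(flake_path)
    events: Queue = Queue()

    request: ChatTurnRequest = {
        "user_message": message,
        "provider": "ollama",  # assumed local default; "openai" and "claude" also work
    }
    if session_state is not None:
        request["session_state"] = session_state

    # get_llm_turn is synchronous, so in a real service this call would run in a
    # worker thread and the queue would feed an SSE/WebSocket stream.
    response = get_llm_turn(flake, request, progress_callback=events.put)

    while not events.empty():
        yield "event: " + json.dumps(progress_event_to_dict(events.get()))
    yield "result: " + json.dumps({
        "assistant_message": response["assistant_message"],
        "requires_user_response": response["requires_user_response"],
        "proposed_instances": response["proposed_instances"],
        "session_state": response["session_state"],
    })
```

The caller stores the returned `session_state` and sends it with the next request so pending service selections or final decisions resume where they left off.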


@@ -0,0 +1,46 @@
{
"instances": {},
"meta": {
"description": null,
"icon": null,
"name": "Qubasas_Clan"
},
"machines": {
"gchq-local": {
"deploy": {
"buildHost": null,
"targetHost": null
},
"description": null,
"icon": null,
"installedAt": 1756814302,
"machineClass": "nixos",
"name": "gchq-local",
"tags": ["all", "nixos"]
},
"qube-email": {
"deploy": {
"buildHost": null,
"targetHost": null
},
"description": null,
"icon": null,
"installedAt": 1756814302,
"machineClass": "nixos",
"name": "qube-email",
"tags": ["all", "nixos"]
},
"wintux": {
"deploy": {
"buildHost": null,
"targetHost": null
},
"description": null,
"icon": null,
"installedAt": 1756814302,
"machineClass": "nixos",
"name": "wintux",
"tags": ["all", "nixos"]
}
}
}


@@ -0,0 +1,7 @@
{
"tags": {
"all": ["gchq-local", "qube-email", "wintux"],
"darwin": [],
"nixos": ["gchq-local", "qube-email", "wintux"]
}
}

Four file diffs suppressed because one or more lines are too long.


@@ -0,0 +1,274 @@
import contextlib
import json
from collections.abc import Iterator
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from clan_lib.flake.flake import Flake
from clan_lib.llm.llm import (
process_chat_turn,
)
from clan_lib.llm.service import create_llm_model, run_llm_service
from clan_lib.service_runner import create_service_manager
@pytest.fixture
def mock_flake() -> MagicMock:
"""Create a mock Flake object with test data."""
flake_mock = MagicMock(spec=Flake)
test_data_dir = Path(__file__).parent / "container_data"
def load_json(filename: str) -> dict | list:
"""Load and parse a JSON file from container_data directory."""
return json.loads((test_data_dir / filename).read_text())
# Configure flake.select to return values based on the argument
def select_side_effect(arg: str) -> dict | list:
# Handle staticModules readme requests dynamically
if arg.startswith(
"clanInternals.inventoryClass.staticModules.{"
) and arg.endswith("}.manifest.readme"):
# Extract service names from the pattern: {service1,service2,...}
services_part = arg.split("{")[1].split("}")[0]
requested_services = [s.strip() for s in services_part.split(",")]
# Load all VPN readmes (always returns a dict for this file)
all_readmes = load_json("vpns_readme.json")
assert isinstance(all_readmes, dict), (
"vpns_readme.json should contain a dict"
)
# Return only the requested services
return {
svc: all_readmes[svc]
for svc in requested_services
if svc in all_readmes
}
match arg:
case "clanInternals.inventoryClass.inventory.{instances,machines,meta}":
return load_json("inventory_instances_machines_meta.json")
case "clanInternals.inventoryClass.inventory.{tags}":
return load_json("inventory_tags.json")
case "clanInternals.inventoryClass.modulesPerSource":
return load_json("modules_per_source.json")
case "clanInternals.inventoryClass.staticModules":
return load_json("static_modules.json")
case _:
msg = f"Unexpected flake.select argument: {arg}"
raise ValueError(msg)
flake_mock.select.side_effect = select_side_effect
return flake_mock
@pytest.fixture
def mock_nix_shell() -> Iterator[MagicMock]:
"""Patch nix_shell function with test data."""
# Configure nix_shell to return values based on the arguments
def nix_shell_side_effect(packages: list[str], cmd: list[str]) -> list[str]:
match (tuple(packages), tuple(cmd)):
case (("ollama",), ("ollama", "pull", _)):
return ["ollama", "list"]
case (("ollama",), _):
return cmd
case _:
msg = f"Unexpected nix_shell arguments: packages={packages}, cmd={cmd}"
raise ValueError(msg)
with patch("clan_lib.llm.service.nix_shell") as mock:
mock.side_effect = nix_shell_side_effect
yield mock
@pytest.fixture
def llm_service() -> Iterator[None]:
"""Start LLM service and create model, ensuring cleanup."""
service_manager = create_service_manager()
try:
run_llm_service()
create_llm_model()
yield
finally:
# Always attempt to stop the service, even if setup failed
with contextlib.suppress(Exception):
service_manager.stop_service("ollama")
@pytest.mark.service_runner
@pytest.mark.usefixtures("mock_nix_shell", "llm_service")
def test_full_conversation_flow(mock_flake: MagicMock) -> None:
"""Comprehensive test that exercises the complete conversation flow with the actual LLM service.
This test simulates a realistic multi-turn conversation that covers:
- Discovery phase: Initial request and LLM gathering information
- Service selection phase: User choosing from available options
- Final decision phase: Configuring the selected service with specific parameters
- State transitions: pending_service_selection -> pending_final_decision -> completion
- Conversation history preservation across all turns
- Error handling and edge cases
"""
flake = mock_flake
return
# ========== TURN 1: Discovery Phase - Initial vague request ==========
print("\n=== TURN 1: Initial discovery request ===")
result = process_chat_turn(
user_request="What VPN options do I have?",
flake=flake,
provider="ollama",
)
# Verify discovery phase behavior
assert result.requires_user_response is True, (
"Should require user response in discovery"
)
assert len(result.conversation_history) >= 2, (
"Should have user + assistant messages"
)
assert result.conversation_history[0]["role"] == "user"
assert result.conversation_history[0]["content"] == "What VPN options do I have?"
assert result.conversation_history[-1]["role"] == "assistant"
assert len(result.assistant_message) > 0, "Assistant should provide a response"
# Should transition to service selection phase with pending state
assert "pending_service_selection" in result.session_state, (
"Should have pending service selection"
)
assert "readme_results" in result.session_state["pending_service_selection"]
# No instances yet
assert len(result.proposed_instances) == 0
assert result.error is None
print(f"Assistant: {result.assistant_message[:200]}...")
print(f"State: {list(result.session_state.keys())}")
print(f"History length: {len(result.conversation_history)}")
# ========== TURN 2: Service Selection Phase - User makes a choice ==========
print("\n=== TURN 2: User selects ZeroTier ===")
result = process_chat_turn(
user_request="I'll use ZeroTier please",
flake=flake,
conversation_history=list(result.conversation_history),
provider="ollama",
session_state=result.session_state,
)
# Verify conversation history growth and preservation
assert len(result.conversation_history) > 2, "History should grow"
assert result.conversation_history[0]["content"] == "What VPN options do I have?"
assert result.conversation_history[2]["content"] == "I'll use ZeroTier please"
# Should either ask for configuration details or provide direct config
# Most likely will ask for more details (pending_final_decision)
if result.requires_user_response:
# LLM is asking for configuration details
assert len(result.assistant_message) > 0
# Should transition to final decision phase
if "pending_final_decision" not in result.session_state:
# Might still be in service selection asking clarifications
assert "pending_service_selection" in result.session_state
else:
# LLM provided configuration immediately (less likely)
assert len(result.proposed_instances) > 0
assert result.proposed_instances[0]["module"]["name"] == "zerotier"
print(
f"Assistant: {result.assistant_message[:200] if result.assistant_message else 'No message'}..."
)
print(f"State: {list(result.session_state.keys())}")
print(f"Requires response: {result.requires_user_response}")
# ========== Continue conversation until we reach final decision or completion ==========
max_turns = 10
turn_count = 2
while result.requires_user_response and turn_count < max_turns:
turn_count += 1
print(f"\n=== TURN {turn_count}: Continuing conversation ===")
# Determine appropriate response based on current state
if "pending_service_selection" in result.session_state:
# Still selecting service
user_request = "Yes, ZeroTier"
elif "pending_final_decision" in result.session_state:
# Configuring the service
user_request = "Set up gchq-local as controller, qube-email as moon, and wintux as peer"
else:
# Generic continuation
user_request = "Yes, that sounds good. Use gchq-local as controller."
print(f"User: {user_request}")
result = process_chat_turn(
user_request=user_request,
flake=flake,
conversation_history=list(result.conversation_history),
provider="ollama",
session_state=result.session_state,
)
# Verify conversation history continues to grow
assert len(result.conversation_history) == (turn_count * 2), (
f"History should have {turn_count * 2} messages (turn {turn_count})"
)
# Verify history preservation
assert (
result.conversation_history[0]["content"] == "What VPN options do I have?"
)
print(
f"Assistant: {result.assistant_message[:200] if result.assistant_message else 'No message'}..."
)
print(f"State: {list(result.session_state.keys())}")
print(f"Requires response: {result.requires_user_response}")
print(f"Proposed instances: {len(result.proposed_instances)}")
# Check for completion
if not result.requires_user_response:
print("\n=== Conversation completed! ===")
break
# ========== Final Verification ==========
assert turn_count < max_turns, f"Conversation took too many turns ({turn_count})"
# If conversation completed, verify we have valid configuration
if not result.requires_user_response:
assert len(result.proposed_instances) > 0, (
"Should have at least one proposed instance"
)
instance = result.proposed_instances[0]
# Verify instance structure
assert "module" in instance
assert "name" in instance["module"]
assert instance["module"]["name"] in [
"zerotier",
"wireguard",
"yggdrasil",
"mycelium",
]
# Should have roles configuration
if "roles" in instance:
print(f"\nConfiguration roles: {list(instance['roles'].keys())}")
# Should not be in pending state anymore
assert "pending_service_selection" not in result.session_state
assert "pending_final_decision" not in result.session_state
assert result.error is None, f"Should not have error: {result.error}"
print(f"\nFinal instance: {instance['module']['name']}")
print(f"Total conversation turns: {turn_count}")
print(f"Final history length: {len(result.conversation_history)}")
else:
# Conversation didn't complete but should have made progress
assert len(result.conversation_history) > 2
assert result.error is None
print(f"\nConversation in progress after {turn_count} turns")
print(f"Current state: {list(result.session_state.keys())}")


@@ -0,0 +1,555 @@
"""API client code for LLM providers (OpenAI and Ollama)."""
import json
import logging
import os
import time
import urllib.request
from collections.abc import Sequence
from http import HTTPStatus
from pathlib import Path
from typing import Any, cast
from urllib.error import HTTPError, URLError
from clan_lib.errors import ClanError
from .schemas import (
ChatCompletionRequestPayload,
ChatMessage,
FunctionCallType,
MessageContent,
OllamaChatResponse,
OpenAIChatCompletionResponse,
ToolDefinition,
)
from .trace import (
format_messages_for_trace,
format_tools_for_trace,
write_trace_entry,
)
log = logging.getLogger(__name__)
def _stringify_message_content(content: MessageContent | None) -> str:
"""Convert message content payloads to human-readable text for logging."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict) and "text" in item:
text_part = item.get("text")
if isinstance(text_part, str):
parts.append(text_part)
continue
parts.append(json.dumps(item, ensure_ascii=False))
return "\n".join(parts)
return json.dumps(content, ensure_ascii=False)
def _summarize_tools(
tools: Sequence[ToolDefinition],
) -> str:
"""Create a concise comma-separated list of tool names for logging."""
names: list[str] = []
for tool in tools:
if not isinstance(tool, dict):
continue
function_block = tool.get("function")
if isinstance(function_block, dict) and "name" in function_block:
name = function_block.get("name")
else:
name = tool.get("name")
if isinstance(name, str):
names.append(name)
return ", ".join(names)
def _debug_log_request(
provider: str,
messages: list[ChatMessage],
tools: Sequence[ToolDefinition],
) -> None:
"""Emit structured debug logs for outbound LLM requests."""
if not log.isEnabledFor(logging.DEBUG):
return
log.debug("[%s] >>> sending %d message(s)", provider, len(messages))
for idx, message in enumerate(messages):
role = message.get("role", "unknown")
content_str = _stringify_message_content(message.get("content"))
log.debug(
"[%s] >>> message[%02d] role=%s len=%d",
provider,
idx,
role,
len(content_str),
)
if content_str:
log.debug("[%s] >>> message[%02d] content:\n%s", provider, idx, content_str)
if tools:
log.debug("[%s] >>> tool summary: %s", provider, _summarize_tools(tools))
log.debug(
"[%s] >>> tool payload:\n%s",
provider,
json.dumps(list(tools), indent=2, ensure_ascii=False),
)
def _debug_log_response(
provider: str,
text: str,
function_calls: list[FunctionCallType],
) -> None:
"""Emit structured debug logs for inbound LLM responses."""
if not log.isEnabledFor(logging.DEBUG):
return
if text:
log.debug(
"[%s] <<< response text len=%d\n%s",
provider,
len(text),
text,
)
else:
log.debug("[%s] <<< no textual response", provider)
if not function_calls:
log.debug("[%s] <<< no function calls", provider)
return
for idx, call in enumerate(function_calls):
args_repr = call.get("arguments", "")
formatted_args = args_repr
if isinstance(args_repr, str):
try:
parsed_args = json.loads(args_repr)
formatted_args = json.dumps(parsed_args, indent=2, ensure_ascii=False)
except json.JSONDecodeError:
formatted_args = args_repr
log.debug(
"[%s] <<< call[%02d] name=%s\n%s",
provider,
idx,
call.get("name"),
formatted_args,
)
def call_openai_api(
model: str,
messages: list[ChatMessage],
tools: Sequence[ToolDefinition],
timeout: int = 60,
trace_file: Path | None = None,
stage: str = "unknown",
trace_metadata: dict[str, Any] | None = None,
) -> OpenAIChatCompletionResponse:
"""Call the OpenAI API for chat completion.
Args:
model: The OpenAI model to use
messages: List of message dictionaries
tools: List of OpenAI function schemas
timeout: Request timeout in seconds (default: 60)
trace_file: Optional path to write trace entries for debugging
stage: Stage name for trace entries (default: "unknown")
trace_metadata: Optional metadata to include in trace entries
Returns:
The parsed JSON response from the API
Raises:
ClanError: If the API call fails
"""
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
msg = "OPENAI_API_KEY environment variable is required for OpenAI provider"
raise ClanError(msg)
payload: ChatCompletionRequestPayload = {
"model": model,
"messages": messages,
"tools": list(tools),
}
_debug_log_request("openai", messages, tools)
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
start_time = time.time()
try:
req = urllib.request.Request( # noqa: S310
url,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
)
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310
if resp.getcode() != HTTPStatus.OK.value:
msg = f"OpenAI API returned status {resp.getcode()}"
raise ClanError(msg)
raw = resp.read().decode("utf-8")
response = cast("OpenAIChatCompletionResponse", json.loads(raw))
# Write trace if requested
if trace_file:
duration_ms = (time.time() - start_time) * 1000
function_calls, message_content = parse_openai_response(
response, provider="openai"
)
write_trace_entry(
trace_file=trace_file,
provider="openai",
model=model,
stage=stage,
request={
"messages": format_messages_for_trace(messages),
"tools": format_tools_for_trace(
cast("list[dict[str, Any]]", list(tools))
),
},
response={
"function_calls": [
{
"name": call["name"],
"arguments": json.loads(call["arguments"])
if isinstance(call["arguments"], str)
else call["arguments"],
}
for call in function_calls
],
"message": message_content,
},
duration_ms=duration_ms,
metadata=trace_metadata,
)
return response
except HTTPError as e:
error_body = e.read().decode("utf-8") if e.fp else ""
msg = f"OpenAI returned HTTP {e.code}: {error_body}"
raise ClanError(msg) from e
except URLError as e:
msg = "OpenAI API not reachable"
raise ClanError(msg) from e
except json.JSONDecodeError as e:
msg = "Failed to parse OpenAI API response"
raise ClanError(msg) from e
def call_claude_api(
model: str,
messages: list[ChatMessage],
tools: Sequence[ToolDefinition],
base_url: str | None = None,
timeout: int = 60,
trace_file: Path | None = None,
stage: str = "unknown",
trace_metadata: dict[str, Any] | None = None,
) -> OpenAIChatCompletionResponse:
"""Call the Claude API (via OpenAI-compatible endpoint) for chat completion.
Args:
model: The Claude model to use
messages: List of message dictionaries
tools: List of function schemas (OpenAI format)
base_url: Optional base URL for the API (defaults to https://api.anthropic.com/v1/)
timeout: Request timeout in seconds (default: 60)
trace_file: Optional path to write trace entries for debugging
stage: Stage name for trace entries (default: "unknown")
trace_metadata: Optional metadata to include in trace entries
Returns:
The parsed JSON response from the API
Raises:
ClanError: If the API call fails
"""
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
msg = "ANTHROPIC_API_KEY environment variable is required for Claude provider"
raise ClanError(msg)
if base_url is None:
base_url = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com/v1/")
# Ensure base_url ends with /
if not base_url.endswith("/"):
base_url += "/"
payload: ChatCompletionRequestPayload = {
"model": model,
"messages": messages,
"tools": list(tools),
}
_debug_log_request("claude", messages, tools)
url = f"{base_url}chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
}
start_time = time.time()
try:
req = urllib.request.Request( # noqa: S310
url,
data=json.dumps(payload).encode("utf-8"),
headers=headers,
)
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310
if resp.getcode() != HTTPStatus.OK.value:
msg = f"Claude API returned status {resp.getcode()}"
raise ClanError(msg)
raw = resp.read().decode("utf-8")
response = cast("OpenAIChatCompletionResponse", json.loads(raw))
# Write trace if requested
if trace_file:
duration_ms = (time.time() - start_time) * 1000
function_calls, message_content = parse_openai_response(
response, provider="claude"
)
write_trace_entry(
trace_file=trace_file,
provider="claude",
model=model,
stage=stage,
request={
"messages": format_messages_for_trace(messages),
"tools": format_tools_for_trace(
cast("list[dict[str, Any]]", list(tools))
),
},
response={
"function_calls": [
{
"name": call["name"],
"arguments": json.loads(call["arguments"])
if isinstance(call["arguments"], str)
else call["arguments"],
}
for call in function_calls
],
"message": message_content,
},
duration_ms=duration_ms,
metadata=trace_metadata,
)
return response
except HTTPError as e:
error_body = e.read().decode("utf-8") if e.fp else ""
msg = f"Claude returned HTTP {e.code}: {error_body}"
raise ClanError(msg) from e
except URLError as e:
msg = f"Claude API not reachable at {url}"
raise ClanError(msg) from e
except json.JSONDecodeError as e:
msg = "Failed to parse Claude API response"
raise ClanError(msg) from e
def call_ollama_api(
model: str,
messages: list[ChatMessage],
tools: Sequence[ToolDefinition],
timeout: int = 120,
trace_file: Path | None = None,
stage: str = "unknown",
max_tokens: int | None = None,
trace_metadata: dict[str, Any] | None = None,
) -> OllamaChatResponse:
"""Call the Ollama API for chat completion.
Args:
model: The Ollama model to use
messages: List of message dictionaries
tools: List of Ollama function schemas
timeout: Request timeout in seconds (default: 120)
trace_file: Optional path to write trace entries for debugging
stage: Stage name for trace entries (default: "unknown")
max_tokens: Maximum number of tokens to generate (default: None = unlimited)
trace_metadata: Optional metadata to include in trace entries
Returns:
The parsed JSON response from the API
Raises:
ClanError: If the API call fails
"""
payload: ChatCompletionRequestPayload = {
"model": model,
"messages": messages,
"stream": False,
"tools": list(tools),
}
# Add max_tokens limit if specified
if max_tokens is not None:
payload["options"] = {"num_predict": max_tokens} # type: ignore[typeddict-item]
_debug_log_request("ollama", messages, tools)
url = "http://localhost:11434/api/chat"
start_time = time.time()
try:
req = urllib.request.Request( # noqa: S310
url,
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=timeout) as resp: # noqa: S310
if resp.getcode() != HTTPStatus.OK.value:
msg = f"Ollama API returned status {resp.getcode()}"
raise ClanError(msg)
raw = resp.read().decode("utf-8")
response = cast("OllamaChatResponse", json.loads(raw))
# Write trace if requested
if trace_file:
duration_ms = (time.time() - start_time) * 1000
function_calls, message_content = parse_ollama_response(
response, provider="ollama"
)
write_trace_entry(
trace_file=trace_file,
provider="ollama",
model=model,
stage=stage,
request={
"messages": format_messages_for_trace(messages),
"tools": format_tools_for_trace(
cast("list[dict[str, Any]]", list(tools))
),
},
response={
"function_calls": [
{
"name": call["name"],
"arguments": json.loads(call["arguments"])
if isinstance(call["arguments"], str)
else call["arguments"],
}
for call in function_calls
],
"message": message_content,
},
duration_ms=duration_ms,
metadata=trace_metadata,
)
return response
except HTTPError as e:
msg = f"Ollama returned HTTP {e.code} when requesting chat completion."
raise ClanError(msg) from e
except URLError as e:
msg = "Ollama API not reachable at http://localhost:11434"
raise ClanError(msg) from e
except json.JSONDecodeError as e:
msg = "Failed to parse Ollama API response"
raise ClanError(msg) from e
def parse_openai_response(
response_data: OpenAIChatCompletionResponse,
provider: str = "openai",
) -> tuple[list[FunctionCallType], str]:
"""Parse OpenAI API response to extract function calls.
Args:
response_data: The raw response from OpenAI API
provider: The provider name for logging purposes (default: "openai")
Returns:
Tuple of (function_calls, message_content)
"""
choices = response_data.get("choices") or []
if not choices:
return [], ""
message = choices[0].get("message") or {}
tool_calls = message.get("tool_calls") or []
raw_content = message.get("content") or ""
model_content = _stringify_message_content(raw_content)
result: list[FunctionCallType] = []
for tool_call in tool_calls:
tc_id = tool_call.get("id") or f"call_{int(time.time() * 1000)}"
function = tool_call.get("function") or {}
function_name = function.get("name") or ""
function_args = function.get("arguments") or "{}"
result.append(
FunctionCallType(
id=tc_id,
call_id=tc_id,
type="function_call",
name=function_name,
arguments=function_args,
)
)
_debug_log_response(provider, model_content, result)
return result, model_content
def parse_ollama_response(
response_data: OllamaChatResponse,
provider: str = "ollama",
) -> tuple[list[FunctionCallType], str]:
"""Parse Ollama API response to extract function calls.
Args:
response_data: The raw response from Ollama API
provider: The provider name for logging purposes (default: "ollama")
Returns:
Tuple of (function_calls, message_content)
"""
message = response_data.get("message") or {}
tool_calls = message.get("tool_calls") or []
raw_content = message.get("content") or ""
model_content = _stringify_message_content(raw_content)
result: list[FunctionCallType] = []
for idx, tool_call in enumerate(tool_calls):
function = tool_call.get("function") or {}
function_name = function.get("name") or ""
function_args = function.get("arguments") or {}
# Generate unique IDs (similar to OpenAI format)
call_id = f"call_{idx}_{int(time.time() * 1000)}"
fc_id = f"fc_{idx}_{int(time.time() * 1000)}"
result.append(
FunctionCallType(
id=fc_id,
call_id=call_id,
type="function_call",
name=function_name,
arguments=json.dumps(function_args),
)
)
_debug_log_response(provider, model_content, result)
return result, model_content
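A minimal sketch of how these endpoint helpers chain together against a local Ollama daemon; the empty tool list, hard-coded model name, and token cap are assumptions for illustration (the orchestrator normally supplies the aggregated function schemas as `tools`):

```python
from clan_lib.llm.endpoints import call_ollama_api, parse_ollama_response

messages = [
    {"role": "system", "content": "You are a clan service configuration assistant."},
    {"role": "user", "content": "What VPN options do I have?"},
]

# Requires a running Ollama instance on http://localhost:11434 with the model pulled.
response = call_ollama_api(
    model="qwen3:4b-instruct",
    messages=messages,
    tools=[],          # normally the output of aggregate_ollama_function_schemas(...)
    max_tokens=512,    # forwarded to Ollama as options.num_predict
)

function_calls, text = parse_ollama_response(response)
for call in function_calls:
    print(call["name"], call["arguments"])
print(text)
```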


@@ -0,0 +1,65 @@
"""High-level LLM orchestration functions.
This module re-exports the LLM orchestration API from submodules.
"""
# Re-export types and dataclasses
from .llm_types import ( # noqa: F401
DEFAULT_MODELS,
ChatResult,
ConversationProgressEvent,
DiscoveryProgressEvent,
FinalDecisionProgressEvent,
ModelConfig,
ProgressCallback,
ProgressEvent,
ReadmeFetchProgressEvent,
ServiceSelectionProgressEvent,
ServiceSelectionResult,
get_model_config,
)
# Re-export high-level orchestrator
from .orchestrator import process_chat_turn # noqa: F401
# Re-export low-level phase functions
from .phases import ( # noqa: F401
execute_readme_requests,
get_llm_discovery_phase,
get_llm_final_decision,
get_llm_service_selection,
llm_final_decision_to_inventory_instances,
)
# Re-export commonly used functions and types from schemas
from .schemas import ( # noqa: F401
AiAggregate,
ChatMessage,
ConversationHistory,
FunctionCallType,
JSONValue,
MachineDescription,
OllamaFunctionSchema,
OpenAIFunctionSchema,
PendingFinalDecisionState,
PendingServiceSelectionState,
ReadmeRequest,
SessionState,
SimplifiedServiceSchema,
TagDescription,
aggregate_ollama_function_schemas,
aggregate_openai_function_schemas,
create_get_readme_tool,
create_select_service_tool,
create_simplified_service_schemas,
)
# Re-export service functions
from .service import create_llm_model, run_llm_service # noqa: F401
# Re-export utility functions and constants
from .utils import ( # noqa: F401
ASSISTANT_MODE_DISCOVERY,
ASSISTANT_MODE_FINAL,
ASSISTANT_MODE_SELECTION,
)


@@ -3,10 +3,13 @@ from collections.abc import Callable
import pytest
from clan_cli.tests.fixtures_flakes import nested_dict
from clan_lib.flake.flake import Flake
-from clan_lib.services.llm import LLMFunctionSchema, clan_module_to_llm_function
-from clan_lib.services.modules import (
-    list_service_modules,
-)
+from clan_lib.llm.llm import (
+    OpenAIFunctionSchema,
+    aggregate_openai_function_schemas,
+    llm_final_decision_to_inventory_instances,
+)
+from clan_lib.llm.schemas import FunctionCallType, clan_module_to_openai_spec
+from clan_lib.services.modules import list_service_modules
@pytest.mark.with_core
@@ -25,6 +28,23 @@ def test_clan_module_to_llm_func(
config["inventory"]["instances"]["bar"]["module"]["input"] = None
config["inventory"]["instances"]["bar"]["module"]["name"] = "sshd"
config["inventory"]["machines"] = {
"machine1": {
"tags": ["production", "backup"],
},
"machine2": {
"tags": ["client"],
},
"machine3": {
"tags": ["client"],
},
}
config["inventory"]["tags"] = {
"production": [],
"backup": [],
"client": [],
}
# Omit input
config["inventory"]["instances"]["baz"]["module"]["name"] = "sshd"
# external input
@@ -42,11 +62,11 @@ def test_clan_module_to_llm_func(
available_machines = ["machine1", "machine2", "server1"]
available_tags = ["production", "backup", "client"]
generated_tool_func = clan_module_to_llm_function(
generated_tool_func = clan_module_to_openai_spec(
borgbackup_service, available_tags, available_machines
)
expected_tool_func: LLMFunctionSchema = {
expected_tool_func: OpenAIFunctionSchema = {
"type": "function",
"name": "borgbackup",
"description": "Efficient, deduplicating backup program with optional compression and secure encryption.",
@@ -55,13 +75,7 @@ def test_clan_module_to_llm_func(
"properties": {
"module": {
"type": "object",
"properties": {
# "input": {
# "type": "string",
# "description": "Source / Input name of the module, e.g. 'clan-core' or null for built-in modules",
# "enum": ["Y2xhbi1jaW9yZS1uZXZlci1kZXBlbmQtb24tbWU"],
# }
},
"properties": {},
},
"roles": {
"type": "object",
@@ -79,7 +93,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Machines for this role with empty configuration objects",
"description": 'Machines to assign this role to. Format: each machine name is a key with an empty object {} as value. Example: {"wintux": {}, "gchq-local": {}}',
},
"tags": {
"type": "object",
@@ -90,7 +104,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Tags for this role with empty configuration objects",
"description": 'Tags to assign this role to. Format: each tag name is a key with an empty object {} as value. Example: {"all": {}, "nixos": {}}',
},
},
"additionalProperties": False,
@@ -108,7 +122,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Machines for this role with empty configuration objects",
"description": 'Machines to assign this role to. Format: each machine name is a key with an empty object {} as value. Example: {"wintux": {}, "gchq-local": {}}',
},
"tags": {
"type": "object",
@@ -119,7 +133,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Tags for this role with empty configuration objects",
"description": 'Tags to assign this role to. Format: each tag name is a key with an empty object {} as value. Example: {"all": {}, "nixos": {}}',
},
},
"additionalProperties": False,
@@ -141,11 +155,11 @@ def test_clan_module_to_llm_func(
)
assert certificate_service is not None
generated_tool_func2 = clan_module_to_llm_function(
generated_tool_func2 = clan_module_to_openai_spec(
certificate_service, available_tags, available_machines
)
expected_tool_func2: LLMFunctionSchema = {
expected_tool_func2: OpenAIFunctionSchema = {
"type": "function",
"name": "certificates",
"description": "Sets up a PKI certificate chain using step-ca",
@@ -172,7 +186,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Machines for this role with empty configuration objects",
"description": 'Machines to assign this role to. Format: each machine name is a key with an empty object {} as value. Example: {"wintux": {}, "gchq-local": {}}',
},
"tags": {
"type": "object",
@@ -183,7 +197,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Tags for this role with empty configuration objects",
"description": 'Tags to assign this role to. Format: each tag name is a key with an empty object {} as value. Example: {"all": {}, "nixos": {}}',
},
},
"additionalProperties": False,
@@ -201,7 +215,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Machines for this role with empty configuration objects",
"description": 'Machines to assign this role to. Format: each machine name is a key with an empty object {} as value. Example: {"wintux": {}, "gchq-local": {}}',
},
"tags": {
"type": "object",
@@ -212,7 +226,7 @@ def test_clan_module_to_llm_func(
}
},
"additionalProperties": False,
"description": "Tags for this role with empty configuration objects",
"description": 'Tags to assign this role to. Format: each tag name is a key with an empty object {} as value. Example: {"all": {}, "nixos": {}}',
},
},
"additionalProperties": False,
@@ -228,3 +242,48 @@ def test_clan_module_to_llm_func(
}
assert generated_tool_func2 == expected_tool_func2
aggregate = aggregate_openai_function_schemas(flake)
assert len(aggregate.tools) >= 2
def test_llm_final_decision_to_inventory_conversion() -> None:
"""Test conversion of LLM final decision to inventory format."""
final_decision: list[FunctionCallType] = [
{
"id": "toolu_01XHjHUMzZVTcDCqaYQJEWu5",
"call_id": "toolu_01XHjHUMzZVTcDCqaYQJEWu5",
"type": "function_call",
"name": "matrix-synapse",
"arguments": '{"roles": {"default": {"machines": {"gchq-local": {}}}}}',
},
{
"id": "toolu_01TsjKZ87J3fi6RNzNzu33ff",
"call_id": "toolu_01TsjKZ87J3fi6RNzNzu33ff",
"type": "function_call",
"name": "monitoring",
"arguments": '{"module": { "input": "qubasas-clan" }, "roles": {"telegraf": {"tags": {"all": {}}}}}',
},
]
assert isinstance(final_decision, list)
expected = [
{
"module": {
"input": None,
"name": "matrix-synapse",
},
"roles": {"default": {"machines": {"gchq-local": {}}}},
},
{
"module": {
"input": "qubasas-clan",
"name": "monitoring",
},
"roles": {"telegraf": {"tags": {"all": {}}}},
},
]
result = llm_final_decision_to_inventory_instances(final_decision)
assert result == expected


@@ -0,0 +1,142 @@
"""Type definitions and dataclasses for LLM orchestration."""
from collections.abc import Callable
from dataclasses import dataclass
from typing import Literal
from clan_lib.nix_models.clan import InventoryInstance
from .schemas import ChatMessage, SessionState
@dataclass(frozen=True)
class DiscoveryProgressEvent:
"""Progress event during discovery phase."""
service_names: list[str] | None = None
stage: Literal["discovery"] = "discovery"
status: Literal["analyzing", "complete"] = "analyzing"
@dataclass(frozen=True)
class ReadmeFetchProgressEvent:
"""Progress event during readme fetching."""
count: int
service_names: list[str]
stage: Literal["readme_fetch"] = "readme_fetch"
status: Literal["fetching", "complete"] = "fetching"
@dataclass(frozen=True)
class ServiceSelectionProgressEvent:
"""Progress event during service selection phase."""
service_names: list[str]
stage: Literal["service_selection"] = "service_selection"
status: Literal["selecting", "complete"] = "selecting"
@dataclass(frozen=True)
class FinalDecisionProgressEvent:
"""Progress event during final decision phase."""
stage: Literal["final_decision"] = "final_decision"
status: Literal["reviewing", "complete"] = "reviewing"
@dataclass(frozen=True)
class ConversationProgressEvent:
"""Progress event for conversation continuation."""
message: str
stage: Literal["conversation"] = "conversation"
awaiting_response: bool = True
@dataclass(frozen=True)
class ServiceSelectionResult:
"""Result from service selection step.
Attributes:
selected_service: Name of the selected service (None if clarification needed)
service_summary: LLM-generated summary of the service (None if clarification needed)
clarifying_message: Clarifying question from LLM (empty string if service selected)
"""
selected_service: str | None
service_summary: str | None
clarifying_message: str
ProgressEvent = (
DiscoveryProgressEvent
| ReadmeFetchProgressEvent
| ServiceSelectionProgressEvent
| FinalDecisionProgressEvent
| ConversationProgressEvent
)
ProgressCallback = Callable[[ProgressEvent], None]
@dataclass(frozen=True)
class ChatResult:
"""Result of a complete chat turn through the multi-stage workflow.
Attributes:
proposed_instances: List of inventory instances suggested by the LLM (empty if none)
conversation_history: Updated conversation history after this turn
assistant_message: Message from the assistant (questions, recommendations, or diff preview)
requires_user_response: True if the assistant asked a question and needs a response
error: Error message if something went wrong (None on success)
session_state: Serializable state to pass into the next turn when continuing a workflow
"""
proposed_instances: tuple[InventoryInstance, ...]
conversation_history: tuple[ChatMessage, ...]
assistant_message: str
requires_user_response: bool
session_state: SessionState
error: str | None = None
@dataclass(frozen=True)
class ModelConfig:
"""Configuration for an LLM model.
Attributes:
name: The model identifier/name
provider: The LLM provider
timeout: Request timeout in seconds (default: 120)
"""
name: str
provider: Literal["openai", "ollama", "claude"]
timeout: int = 120
# Default model configurations for each provider
DEFAULT_MODELS: dict[Literal["openai", "ollama", "claude"], ModelConfig] = {
"openai": ModelConfig(name="gpt-4o", provider="openai", timeout=60),
"claude": ModelConfig(name="claude-sonnet-4-5", provider="claude", timeout=60),
"ollama": ModelConfig(name="qwen3:4b-instruct", provider="ollama", timeout=120),
}
def get_model_config(
provider: Literal["openai", "ollama", "claude"],
) -> ModelConfig:
"""Get the default model configuration for a provider.
Args:
provider: The LLM provider name
Returns:
ModelConfig for the specified provider
"""
return DEFAULT_MODELS[provider]
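Because `ModelConfig` is a frozen dataclass, callers that need a different timeout or model name derive a copy with `dataclasses.replace` instead of mutating the defaults; a small sketch (the 300-second timeout is an arbitrary example):

```python
from dataclasses import replace

from clan_lib.llm.llm_types import DEFAULT_MODELS, get_model_config

config = get_model_config("ollama")
assert config is DEFAULT_MODELS["ollama"]  # defaults are shared, not copied

# Derive a variant for slow hardware; DEFAULT_MODELS stays untouched.
slow_config = replace(config, timeout=300)
print(slow_config.name, slow_config.provider, slow_config.timeout)
```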


@@ -0,0 +1,415 @@
"""High-level LLM orchestration for multi-stage chat workflow."""
import json
from pathlib import Path
from typing import Literal, cast
from clan_lib.errors import ClanAiError
from clan_lib.flake.flake import Flake
from clan_lib.services.modules import InputName, ServiceReadmeCollection
from .llm_types import (
ChatResult,
DiscoveryProgressEvent,
FinalDecisionProgressEvent,
ProgressCallback,
ReadmeFetchProgressEvent,
ServiceSelectionProgressEvent,
)
from .phases import (
execute_readme_requests,
get_llm_discovery_phase,
get_llm_final_decision,
get_llm_service_selection,
llm_final_decision_to_inventory_instances,
)
from .schemas import (
ConversationHistory,
JSONValue,
PendingFinalDecisionState,
PendingServiceSelectionState,
SessionState,
)
from .utils import (
ASSISTANT_MODE_DISCOVERY,
ASSISTANT_MODE_FINAL,
ASSISTANT_MODE_SELECTION,
_assistant_message,
_deserialize_readme_results,
_serialize_readme_results,
_user_message,
)
def process_chat_turn(
user_request: str,
flake: Flake,
conversation_history: ConversationHistory | None = None,
provider: Literal["openai", "ollama", "claude"] = "ollama",
progress_callback: ProgressCallback | None = None,
trace_file: Path | None = None,
session_state: SessionState | None = None,
) -> ChatResult:
"""High-level API that orchestrates the entire multi-stage chat workflow.
This function handles the complete flow:
1. Discovery phase - LLM selects relevant services
2. Readme fetching - Retrieves detailed documentation
3. Final decision - LLM makes informed suggestions
4. Conversion - Transforms suggestions to inventory instances
Args:
user_request: The user's message/request
flake: The Flake object to get services from
conversation_history: Optional list of prior messages in the conversation
provider: The LLM provider to use
progress_callback: Optional callback for progress updates
trace_file: Optional path to write LLM interaction traces for debugging
session_state: Optional cross-turn state to resume pending workflows
Returns:
ChatResult containing proposed instances, updated history, and assistant message
Example:
>>> result = process_chat_turn(
... "Set up a web server",
... flake,
... progress_callback=lambda event: print(f"Stage: {event.stage}")
... )
>>> if result.proposed_instances:
... print("LLM suggested:", result.proposed_instances)
>>> if result.requires_user_response:
... print("Assistant asks:", result.assistant_message)
"""
history = list(conversation_history) if conversation_history else []
state: SessionState = cast(
"SessionState", dict(session_state) if session_state else {}
)
def _state_snapshot() -> dict[str, JSONValue]:
try:
return json.loads(json.dumps(state))
except (TypeError, ValueError):
return dict(state) # type: ignore[arg-type]
def _metadata(extra: dict[str, JSONValue] | None = None) -> dict[str, JSONValue]:
base: dict[str, JSONValue] = {"session_state_before": _state_snapshot()}
if extra:
base.update(extra)
return base
def _state_copy() -> SessionState:
return cast("SessionState", dict(state))
pending_final_raw = state.get("pending_final_decision")
pending_final: PendingFinalDecisionState | None = (
pending_final_raw if isinstance(pending_final_raw, dict) else None
)
pending_selection_raw = state.get("pending_service_selection")
pending_selection: PendingServiceSelectionState | None = (
pending_selection_raw if isinstance(pending_selection_raw, dict) else None
)
resume_readme_results: dict[InputName, ServiceReadmeCollection] | None = None
if pending_selection is not None:
serialized_results = pending_selection.get("readme_results")
if serialized_results is not None:
resume_readme_results = _deserialize_readme_results(serialized_results)
if resume_readme_results is None:
state.pop("pending_service_selection", None)
else:
state.pop("pending_service_selection", None)
if pending_final is not None:
service_name = pending_final.get("service_name")
service_summary = pending_final.get("service_summary")
if isinstance(service_name, str) and isinstance(service_summary, str):
if progress_callback:
progress_callback(FinalDecisionProgressEvent(status="reviewing"))
function_calls, final_message = get_llm_final_decision(
user_request,
flake,
service_name,
service_summary,
conversation_history,
provider=provider,
trace_file=trace_file,
trace_metadata=_metadata(
{
"selected_service": service_name,
"resume": True,
}
),
)
if progress_callback:
progress_callback(FinalDecisionProgressEvent(status="complete"))
history.append(_user_message(user_request))
if function_calls:
proposed_instances = llm_final_decision_to_inventory_instances(
function_calls
)
instance_names = [inst["module"]["name"] for inst in proposed_instances]
summary = (
f"I suggest configuring these services: {', '.join(instance_names)}"
)
history.append(_assistant_message(summary, mode=ASSISTANT_MODE_FINAL))
state.pop("pending_final_decision", None)
return ChatResult(
proposed_instances=tuple(proposed_instances),
conversation_history=tuple(history),
assistant_message=summary,
requires_user_response=False,
error=None,
session_state=_state_copy(),
)
if final_message:
history.append(
_assistant_message(final_message, mode=ASSISTANT_MODE_FINAL)
)
state["pending_final_decision"] = cast(
"PendingFinalDecisionState",
{
"service_name": service_name,
"service_summary": service_summary,
},
)
return ChatResult(
proposed_instances=(),
conversation_history=tuple(history),
assistant_message=final_message,
requires_user_response=True,
error=None,
session_state=_state_copy(),
)
state.pop("pending_final_decision", None)
msg = "LLM did not provide any response or recommendations"
raise ClanAiError(
msg,
description="Expected either function calls (configuration) or a clarifying message",
location="Final Decision Phase (pending)",
)
state.pop("pending_final_decision", None)
def _continue_with_service_selection(
readme_results: dict[InputName, ServiceReadmeCollection],
) -> ChatResult:
# Extract all service names from readme results
all_service_names = [
service_name
for collection in readme_results.values()
for service_name in collection.readmes
]
if progress_callback:
progress_callback(
ServiceSelectionProgressEvent(
service_names=all_service_names, status="selecting"
)
)
selection_result = get_llm_service_selection(
user_request,
readme_results,
conversation_history,
provider=provider,
trace_file=trace_file,
trace_metadata=_metadata(),
)
if (
selection_result.clarifying_message
and not selection_result.selected_service
):
history.append(_user_message(user_request))
history.append(
_assistant_message(
selection_result.clarifying_message,
mode=ASSISTANT_MODE_SELECTION,
)
)
state["pending_service_selection"] = cast(
"PendingServiceSelectionState",
{
"readme_results": _serialize_readme_results(readme_results),
},
)
return ChatResult(
proposed_instances=(),
conversation_history=tuple(history),
assistant_message=selection_result.clarifying_message,
requires_user_response=True,
error=None,
session_state=_state_copy(),
)
if (
not selection_result.selected_service
or not selection_result.service_summary
):
msg = "Failed to select service"
raise ClanAiError(
msg,
description=selection_result.clarifying_message
or "No service selected and no clarifying message provided",
location="Service Selection Phase",
)
if progress_callback:
progress_callback(FinalDecisionProgressEvent(status="reviewing"))
function_calls, final_message = get_llm_final_decision(
user_request,
flake,
selection_result.selected_service,
selection_result.service_summary,
conversation_history,
provider=provider,
trace_file=trace_file,
trace_metadata=_metadata(
{"selected_service": selection_result.selected_service}
),
)
if progress_callback:
progress_callback(FinalDecisionProgressEvent(status="complete"))
if function_calls:
history.append(_user_message(user_request))
proposed_instances = llm_final_decision_to_inventory_instances(
function_calls
)
instance_names = [inst["module"]["name"] for inst in proposed_instances]
summary = (
f"I suggest configuring these services: {', '.join(instance_names)}"
)
history.append(_assistant_message(summary, mode=ASSISTANT_MODE_FINAL))
state.pop("pending_final_decision", None)
return ChatResult(
proposed_instances=tuple(proposed_instances),
conversation_history=tuple(history),
assistant_message=summary,
requires_user_response=False,
error=None,
session_state=_state_copy(),
)
if final_message:
history.append(_user_message(user_request))
history.append(_assistant_message(final_message, mode=ASSISTANT_MODE_FINAL))
state["pending_final_decision"] = cast(
"PendingFinalDecisionState",
{
"service_name": selection_result.selected_service,
"service_summary": selection_result.service_summary,
},
)
return ChatResult(
proposed_instances=(),
conversation_history=tuple(history),
assistant_message=final_message,
requires_user_response=True,
error=None,
session_state=_state_copy(),
)
msg = "LLM did not provide any response or recommendations"
raise ClanAiError(
msg,
description="Expected either function calls (configuration) or a clarifying message after service selection",
location="Final Decision Phase",
)
if resume_readme_results is not None:
return _continue_with_service_selection(resume_readme_results)
# Stage 1: Discovery phase
if progress_callback:
progress_callback(DiscoveryProgressEvent(status="analyzing"))
readme_requests, discovery_message = get_llm_discovery_phase(
user_request,
flake,
conversation_history,
provider=provider,
trace_file=trace_file,
trace_metadata=_metadata(),
)
if progress_callback:
selected_services = [req["function_name"] for req in readme_requests]
progress_callback(
DiscoveryProgressEvent(
service_names=selected_services if selected_services else None,
status="complete",
)
)
# If LLM asked a question or made a recommendation without readme requests
if discovery_message and not readme_requests:
history.append(_user_message(user_request))
history.append(
_assistant_message(discovery_message, mode=ASSISTANT_MODE_DISCOVERY)
)
return ChatResult(
proposed_instances=(),
conversation_history=tuple(history),
assistant_message=discovery_message,
requires_user_response=True,
error=None,
session_state=_state_copy(),
)
# If we got readme requests, continue to selecting services
if readme_requests:
# Stage 2: Fetch readmes
service_names = [
f"{req['function_name']} (from {req['input_name'] or 'built-in'})"
for req in readme_requests
]
if progress_callback:
progress_callback(
ReadmeFetchProgressEvent(
count=len(readme_requests),
service_names=service_names,
status="fetching",
)
)
readme_results = execute_readme_requests(readme_requests, flake)
if progress_callback:
progress_callback(
ReadmeFetchProgressEvent(
count=len(readme_requests),
service_names=service_names,
status="complete",
)
)
return _continue_with_service_selection(readme_results)
# No readme requests and no message - unexpected
msg = "LLM did not provide any response or recommendations"
raise ClanAiError(
msg,
description="The LLM should either request service readmes or provide a clarifying message",
location="Discovery Phase",
)
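For reference, a minimal sketch of a progress callback compatible with the calls above; the single-event signature and the field names (status, service_names, count) are inferred from this file, and the logger name is illustrative:

import logging

progress_log = logging.getLogger("clan_lib.llm.progress")  # illustrative logger name


def log_progress(event: object) -> None:
    """Log each orchestration phase transition while chatting with the LLM."""
    progress_log.info(
        "%s status=%s services=%s count=%s",
        type(event).__name__,
        getattr(event, "status", None),
        getattr(event, "service_names", None),
        getattr(event, "count", None),
    )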

View File

@@ -0,0 +1,519 @@
"""Low-level LLM phase functions for orchestration."""
import json
import logging
from pathlib import Path
from typing import Literal
from clan_lib.errors import ClanAiError
from clan_lib.flake.flake import Flake
from clan_lib.nix_models.clan import InventoryInstance
from clan_lib.services.modules import (
InputName,
ServiceName,
ServiceReadmeCollection,
get_service_readmes,
)
from .endpoints import (
call_claude_api,
call_ollama_api,
call_openai_api,
parse_ollama_response,
parse_openai_response,
)
from .llm_types import ServiceSelectionResult, get_model_config
from .prompts import (
build_discovery_prompt,
build_final_decision_prompt,
build_select_service_prompt,
)
from .schemas import (
ChatMessage,
ConversationHistory,
FunctionCallType,
JSONValue,
ReadmeRequest,
aggregate_ollama_function_schemas,
aggregate_openai_function_schemas,
create_get_readme_tool,
create_select_service_tool,
create_simplified_service_schemas,
)
from .utils import _strip_conversation_metadata, _user_message
log = logging.getLogger(__name__)
def get_llm_discovery_phase(
user_request: str,
flake: Flake,
conversation_history: ConversationHistory | None = None,
provider: Literal["openai", "ollama", "claude"] = "ollama",
trace_file: Path | None = None,
trace_metadata: dict[str, JSONValue] | None = None,
) -> tuple[list[ReadmeRequest], str]:
"""First LLM call: discovery phase with simplified schemas and get_readme tool.
Args:
user_request: The user's request/query
flake: The Flake object to get services from
conversation_history: Optional conversation history
provider: The LLM provider to use
trace_file: Optional path to write LLM interaction traces for debugging
trace_metadata: Optional data to include in trace logs
Returns:
Tuple of (readme_requests, message_content):
- readme_requests: List of readme requests from the LLM
- message_content: Text response (e.g., questions or service recommendations)
"""
# Get simplified services and create get_readme tool
openai_aggregate = aggregate_openai_function_schemas(flake)
simplified_services = create_simplified_service_schemas(flake)
valid_function_names = [service["name"] for service in simplified_services]
get_readme_tool = create_get_readme_tool(valid_function_names)
# Build discovery prompt
system_prompt, assistant_context = build_discovery_prompt(
openai_aggregate.machines, openai_aggregate.tags, simplified_services
)
messages: list[ChatMessage] = [
{"role": "system", "content": system_prompt},
{"role": "assistant", "content": assistant_context},
]
messages.extend(_strip_conversation_metadata(conversation_history))
messages.append(_user_message(user_request))
# Call LLM with only get_readme tool
model_config = get_model_config(provider)
if provider == "openai":
openai_response = call_openai_api(
model_config.name,
messages,
[get_readme_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="discovery",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
openai_response, provider="openai"
)
elif provider == "claude":
claude_response = call_claude_api(
model_config.name,
messages,
[get_readme_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="discovery",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
claude_response, provider="claude"
)
else:
ollama_response = call_ollama_api(
model_config.name,
messages,
[get_readme_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="discovery",
max_tokens=300, # Limit output for discovery phase (get_readme calls or short question)
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_ollama_response(
ollama_response, provider="ollama"
)
# Extract readme requests from function calls
readme_requests: list[ReadmeRequest] = []
for call in function_calls:
if call["name"] == "get_readme":
try:
args = json.loads(call["arguments"])
readme_requests.append(
ReadmeRequest(
input_name=args.get("input_name"),
function_name=args["function_name"],
)
)
except (json.JSONDecodeError, KeyError) as e:
log.warning(f"Failed to parse readme request arguments: {e}")
return readme_requests, message_content
def execute_readme_requests(
requests: list[ReadmeRequest], flake: Flake
) -> dict[InputName, ServiceReadmeCollection]:
"""Execute readme requests and return results.
Args:
requests: List of readme requests
flake: The Flake object
Returns:
Dictionary mapping input_name to ServiceReadmeCollection
"""
results: dict[InputName, ServiceReadmeCollection] = {}
requests_by_input: dict[InputName, list[ServiceName]] = {}
# Group requests by input_name
for req in requests:
input_name = req["input_name"]
if input_name not in requests_by_input:
requests_by_input[input_name] = []
requests_by_input[input_name].append(req["function_name"])
# Fetch readmes for each input
for input_name, service_names in requests_by_input.items():
readme_collection = get_service_readmes(input_name, service_names, flake)
results[input_name] = readme_collection
return results
def get_llm_service_selection(
user_request: str,
readme_results: dict[InputName, ServiceReadmeCollection],
conversation_history: ConversationHistory | None = None,
provider: Literal["openai", "ollama", "claude"] = "ollama",
trace_file: Path | None = None,
trace_metadata: dict[str, JSONValue] | None = None,
) -> ServiceSelectionResult:
"""LLM call for service selection step: review READMEs and select one service.
Args:
user_request: The original user request
readme_results: Dictionary of input_name -> ServiceReadmeCollection
conversation_history: Optional conversation history
provider: The LLM provider to use
trace_file: Optional path to write LLM interaction traces for debugging
trace_metadata: Optional data to include in trace logs
Returns:
ServiceSelectionResult with selected service info or clarifying question
"""
# Build README context and collect service names
readme_context = "README documentation for the following services:\n\n"
available_services: list[str] = []
for collection in readme_results.values():
for service_name, readme_content in collection.readmes.items():
available_services.append(service_name)
if readme_content: # Skip None values
readme_context += f"=== {service_name} ===\n{readme_content}\n\n"
readme_context = readme_context.rstrip()
readme_context += "\n\n--- END OF README DOCUMENTATION ---"
# Create select_service tool
select_service_tool = create_select_service_tool(available_services)
# Build prompt
system_prompt, assistant_context = build_select_service_prompt(
user_request, available_services
)
combined_assistant_context = (
f"{assistant_context.rstrip()}\n\n{readme_context}"
if assistant_context
else readme_context
)
messages: list[ChatMessage] = [
{"role": "system", "content": system_prompt},
{"role": "assistant", "content": combined_assistant_context},
]
messages.extend(_strip_conversation_metadata(conversation_history))
messages.append(_user_message(user_request))
model_config = get_model_config(provider)
# Call LLM
if provider == "openai":
openai_response = call_openai_api(
model_config.name,
messages,
[select_service_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="select_service",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
openai_response, provider="openai"
)
elif provider == "claude":
claude_response = call_claude_api(
model_config.name,
messages,
[select_service_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="select_service",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
claude_response, provider="claude"
)
else: # ollama
ollama_response = call_ollama_api(
model_config.name,
messages,
[select_service_tool],
timeout=model_config.timeout,
trace_file=trace_file,
stage="select_service",
max_tokens=600, # Allow space for summary
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_ollama_response(
ollama_response, provider="ollama"
)
# Check if LLM asked a clarifying question
if message_content and not function_calls:
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=message_content,
)
# Extract service selection
if function_calls:
if len(function_calls) != 1:
error_msg = (
f"Expected exactly 1 select_service call, got {len(function_calls)}"
)
log.error(error_msg)
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=error_msg,
)
call = function_calls[0]
if call["name"] != "select_service":
error_msg = f"Expected select_service call, got {call['name']}"
log.error(error_msg)
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=error_msg,
)
# Parse arguments
try:
args = (
json.loads(call["arguments"])
if isinstance(call["arguments"], str)
else call["arguments"]
)
service_name = args.get("service_name")
summary = args.get("summary")
if not service_name or not summary:
error_msg = "select_service call missing required fields"
log.error(error_msg)
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=error_msg,
)
except (json.JSONDecodeError, KeyError) as e:
error_msg = f"Failed to parse select_service arguments: {e}"
log.exception(error_msg)
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=error_msg,
)
else:
return ServiceSelectionResult(
selected_service=service_name,
service_summary=summary,
clarifying_message="",
)
# No function calls and no message - unexpected
error_msg = "LLM did not select a service or ask for clarification"
return ServiceSelectionResult(
selected_service=None,
service_summary=None,
clarifying_message=error_msg,
)
def get_llm_final_decision(
user_request: str,
flake: Flake,
selected_service: str,
service_summary: str,
conversation_history: ConversationHistory | None = None,
provider: Literal["openai", "ollama", "claude"] = "ollama",
trace_file: Path | None = None,
trace_metadata: dict[str, JSONValue] | None = None,
) -> tuple[list[FunctionCallType], str]:
"""Final LLM call: configure selected service with full schema.
Args:
user_request: The original user request
flake: The Flake object
selected_service: Name of the service selected in previous step
service_summary: LLM-generated summary of the service documentation
conversation_history: Optional conversation history
provider: The LLM provider to use
trace_file: Optional path to write LLM interaction traces for debugging
trace_metadata: Optional data to include in trace logs
Returns:
Tuple of (function_calls, message_content)
"""
# Get full schemas for ALL services, then filter to only the selected one
all_schemas = aggregate_ollama_function_schemas(flake)
# Filter to only include schema for the selected service
filtered_tools = [
tool
for tool in all_schemas.tools
if tool["function"]["name"] == selected_service
]
if not filtered_tools:
msg = f"No schema found for selected service: {selected_service}"
raise ClanAiError(
msg,
description="The selected service does not have a schema available",
location="Final Decision - Schema Lookup",
)
if len(filtered_tools) != 1:
msg = f"Expected exactly 1 tool for service {selected_service}, got {len(filtered_tools)}"
raise ClanAiError(
msg,
description="Service schema lookup returned unexpected results",
location="Final Decision - Schema Lookup",
)
log.info(
f"Configuring service: {selected_service} (providing ONLY this tool to LLM)"
)
# Prepare shared messages
system_prompt, assistant_context = build_final_decision_prompt(
all_schemas.machines, all_schemas.tags
)
# Build service summary message
service_context = (
f"Service documentation summary for `{selected_service}`:\n\n{service_summary}"
)
combined_assistant_context = (
f"{assistant_context.rstrip()}\n\n{service_context}"
if assistant_context
else service_context
)
messages: list[ChatMessage] = [
{"role": "system", "content": system_prompt},
{"role": "assistant", "content": combined_assistant_context},
]
messages.extend(_strip_conversation_metadata(conversation_history))
messages.append(_user_message(user_request))
# Resolve the model configuration and call the LLM with only the selected service's tool
model_config = get_model_config(provider)
if provider == "openai":
openai_response = call_openai_api(
model_config.name,
messages,
filtered_tools,
timeout=model_config.timeout,
trace_file=trace_file,
stage="final_decision",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
openai_response, provider="openai"
)
return function_calls, message_content
if provider == "claude":
claude_response = call_claude_api(
model_config.name,
messages,
filtered_tools,
timeout=model_config.timeout,
trace_file=trace_file,
stage="final_decision",
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_openai_response(
claude_response, provider="claude"
)
return function_calls, message_content
ollama_response = call_ollama_api(
model_config.name,
messages,
filtered_tools,
timeout=model_config.timeout,
trace_file=trace_file,
stage="final_decision",
max_tokens=500, # Limit output to prevent excessive verbosity
trace_metadata=trace_metadata,
)
function_calls, message_content = parse_ollama_response(
ollama_response, provider="ollama"
)
return function_calls, message_content
def llm_final_decision_to_inventory_instances(
function_calls: list[FunctionCallType],
) -> list[InventoryInstance]:
"""Convert LLM function calls to an inventory instance list.
Args:
function_calls: List of function call dictionaries from the LLM
Returns:
List of inventory instances, each containing module metadata and roles
"""
inventory_instances: list[InventoryInstance] = []
for call in function_calls:
func_name = call["name"]
args = json.loads(call["arguments"])
# Extract roles from arguments
roles = args.get("roles", {})
# Extract module input if present
module_input = args.get("module", {}).get("input", None)
# Create inventory instance for this module
instance: InventoryInstance = {
"module": {
"input": module_input,
"name": func_name,
},
"roles": roles,
}
inventory_instances.append(instance)
return inventory_instances
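Taken together, the phase functions above form a linear pipeline. A minimal sketch of chaining them manually (with the functions of this file in scope), assuming a Flake can be constructed from a flake reference string and that a local Ollama instance is already serving the configured model:

from clan_lib.flake.flake import Flake

flake = Flake("/path/to/my-clan")  # assumption: Flake accepts a flake reference/path string
request = "set up a zerotier network for all my machines"

# Stage 1: discovery - the LLM either requests READMEs or asks a clarifying question.
readme_requests, discovery_message = get_llm_discovery_phase(request, flake, provider="ollama")

if readme_requests:
    # Stage 2: fetch the requested READMEs and let the LLM pick exactly one service.
    readmes = execute_readme_requests(readme_requests, flake)
    selection = get_llm_service_selection(request, readmes, provider="ollama")

    if selection.selected_service and selection.service_summary:
        # Stage 3: configure the selected service using its full schema only.
        calls, final_message = get_llm_final_decision(
            request,
            flake,
            selection.selected_service,
            selection.service_summary,
            provider="ollama",
        )
        instances = llm_final_decision_to_inventory_instances(calls)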

View File

@@ -0,0 +1,258 @@
"""System prompt building functions for LLM interactions."""
import textwrap
from .schemas import MachineDescription, SimplifiedServiceSchema, TagDescription
def build_final_decision_prompt(
machines: list[MachineDescription], tags: list[TagDescription]
) -> tuple[str, str]:
"""Build the system instructions and static context for the final decision phase.
Args:
machines: List of available machines
tags: List of available tags
Returns:
Tuple containing (system_instructions, assistant_context)
"""
system_instructions = textwrap.dedent(
"""
You are a clan deployment planner in CONFIGURATION MODE. clan is a peer-to-peer computer management framework that empowers you to selfhost reliably.
Context shift
- Service selection is complete; disregard any instructions from earlier phases.
- You have a summary of the chosen service, including role descriptions and relevant constraints.
Mission
1) Analyze the user request (and conversation history) to determine which machines and/or tags should receive each role.
2) Call the configuration tool with the correct role-to-target mappings.
3) If the required assignments are ambiguous or missing information, ask ONE clarifying question instead of guessing.
Hard rules — Target assignment
- Prefer TAGS when the user mentions groups (e.g., "all production servers", "backup servers").
- Prefer MACHINE NAMES when the user names specific machines (e.g., "machine1", "server-a").
- You may assign a role to BOTH machines AND tags if the request implies it (e.g., "deploy to server1 and all production machines").
- Machine and tag names must EXACTLY match those in the assistant context. Do NOT invent names.
Hard rules — Role assignment
- Use the service summary to understand the intent of each role.
- If the request clearly maps to specific roles (e.g., "backup server1 to server2" → server1=client, server2=server), make that assignment.
- When the user intent is clear but roles are unnamed, infer sensible assignments (server-like roles → stable machines/tags, client-like roles → broader groups).
- Ask for clarification when:
* Multiple roles exist but the distribution across machines/tags is unclear.
* The user mentions machines without describing how they participate.
* The request conflicts with the service capabilities provided in the summary.
Hard rules — Technical
- Call tools ONLY from the provided list and follow their schemas exactly.
- Arguments must match the schema; omit fields you do not need.
- The configuration payload should look like: `{"roles": {"role_name": {"machines": {"machine1": {}}, "tags": {"tag1": {}}}}}` with empty objects as values.
Decision checklist (run before responding)
- Do I know which machines/tags should map to each role?
- Do the assignments align with the role descriptions and user intent?
- Are all machine/tag names spelled exactly as provided?
- Is clarification required before a safe assignment can be made?
Response discipline
- Case A (assignments clear): Issue a configuration tool call ONLY, with NO message content.
- Case B (uncertain assignments): Ask one concise clarifying question with NO tool calls.
- Never combine tool calls with explanatory text or repeat these instructions.
"""
).strip()
context_lines: list[str] = ["Assistant context: available machines and tags.", ""]
context_lines.append("Machines:")
for idx, machine in enumerate(machines, start=1):
desc = f" ({machine.description})" if machine.description else ""
context_lines.append(f"{idx}. `{machine.name}`{desc}")
context_lines.append("")
context_lines.append("Tags:")
for idx, tag in enumerate(tags, start=1):
desc = f" ({tag.description})" if tag.description else ""
context_lines.append(f"{idx}. `{tag.name}`{desc}")
assistant_context = "\n".join(context_lines).strip()
return system_instructions, assistant_context
def build_discovery_prompt(
machines: list[MachineDescription],
tags: list[TagDescription],
services: list[SimplifiedServiceSchema],
) -> tuple[str, str]:
"""Build discovery phase instructions and static context payload.
Args:
machines: List of available machines
tags: List of available tags
services: List of available services with names and descriptions
Returns:
Tuple containing (system_instructions, assistant_context)
"""
system_instructions = textwrap.dedent(
"""
You are a clan deployment planner assistant in DISCOVERY MODE.
Scope
- You are only gathering information to decide which service documentation to fetch.
- Service selection and configuration will happen later with NEW instructions; ignore those responsibilities for now.
Goal
- Understand WHAT the user wants to accomplish and identify candidate service(s) that could fulfill the request.
- IMPORTANT: We can only set up ONE service at a time. If the user requests multiple DISTINCT things, ask them to choose one.
- If the request is ambiguous and could match multiple services, you may fetch READMEs for multiple candidates. The next phase will choose the best fit.
Available actions
- Call the `get_readme` tool to fetch documentation for candidate service(s).
- Ask ONE clarifying question when the user's intent is unclear (e.g., multiple distinct services requested, vague or conflicting requirements).
Hard rules
- `get_readme` is the ONLY tool you may call in discovery mode. Never attempt to select or configure services in this phase.
- Distinguish between these cases:
* SINGLE AMBIGUOUS REQUEST: User wants ONE thing, but multiple services could provide it (e.g., "set up a web server" could be nginx, apache, or caddy). → Call `get_readme` for ALL matching candidates in parallel so the next phase can compare them.
* MULTIPLE DISTINCT REQUESTS: User wants MULTIPLE different things (e.g., "set up nginx and postgresql", "configure backup and monitoring"). → Ask which ONE thing they want to set up first.
- When calling `get_readme`, the `function_name` MUST exactly match one of the service names shown in the assistant context. If nothing matches, ask the user instead of guessing.
- Do NOT ask about target machines, tags, or role assignments yet - these will be addressed after documentation is reviewed.
- Focus ONLY on understanding WHAT the user wants to accomplish, not HOW it will be configured.
- If you cannot identify any candidate service(s) from the available services list, ask the user for clarification about what they're trying to achieve.
- Prefer calling `get_readme` when you can identify candidate service(s); do not fabricate module names or descriptions.
Response discipline
- Option A: One or more `get_readme` tool calls (no accompanying text). Multiple calls are allowed when several services might fit.
- Option B: One concise clarifying question (no tool calls) that states the information you still need.
- Do NOT echo or restate these system instructions to the user.
Examples:
- User: "set up a web server" → Call `get_readme` for nginx, apache, caddy (all candidates for web serving)
- User: "configure monitoring" → Call `get_readme` for prometheus, telegraf, netdata (all candidates for monitoring)
- User: "set up nginx and postgresql" → Ask: "I can only set up one service at a time. Which would you like to configure first: nginx or postgresql?"
- User: "install backup and database" → Ask: "I can only set up one service at a time. Would you like to set up backup or database first?"
Stay concise and rely on the assistant context for valid names.
"""
).strip()
context_lines: list[str] = ["Assistant context: machines, tags, and services.", ""]
context_lines.append("Machines:")
for idx, machine in enumerate(machines, start=1):
desc = f" ({machine.description})" if machine.description else ""
context_lines.append(f"{idx}. `{machine.name}`{desc}")
context_lines.append("")
context_lines.append("Tags:")
for idx, tag in enumerate(tags, start=1):
desc = f" ({tag.description})" if tag.description else ""
context_lines.append(f"{idx}. `{tag.name}`{desc}")
context_lines.append("")
context_lines.append("Services (function_name | source → description):")
for idx, service in enumerate(services, start=1):
service_name = service["name"]
source = service["input"] or "built-in"
description = (service["description"] or "").replace("\n", " ").strip()
context_lines.append(f"{idx}. `{service_name}` | {source}{description}")
context_lines.append("")
context_lines.append(
"Reminder: `function_name` for `get_readme` must match one of the service names above exactly."
)
assistant_context = "\n".join(context_lines).strip()
return system_instructions, assistant_context
def build_select_service_prompt(
user_request: str, # noqa: ARG001 - kept for future prompt customization
available_services: list[str],
) -> tuple[str, str]:
"""Build service selection phase instructions and context.
Args:
user_request: The original user request
available_services: List of service names that have README documentation available
Returns:
Tuple containing (system_instructions, assistant_context)
"""
system_instructions = textwrap.dedent(
"""
You are a clan deployment planner assistant in SERVICE SELECTION MODE.
Context shift
- Discovery mode has finished. Ignore any instructions from earlier phases.
- You now have README documentation for one or more candidate services.
Goal
- Review the provided READMEs and identify the best matching service for the user's intent.
- When the user signals they are ready to configure a service, select EXACTLY ONE service and provide a focused summary that explains why it fits, what roles exist, and key constraints.
- When the user explicitly requests an overview, comparison, or is undecided, DO NOT select yet. Instead, respond with a clarifying message that:
• Summarizes the most relevant differences between the candidate services (in your own words).
• Asks the user which direction they would like to pursue next.
Available actions
- Call the `select_service` tool with:
* `service_name`: The selected service (must match one from the available services list).
* `summary` (≤300 words) covering:
1. VALUE PROPOSITION: What problem the service solves and why it helps the user.
2. ROLES: The purpose of each role (e.g., which role backs up data, which receives it).
3. KEY CONSTRAINTS: Dependencies, requirements, or limitations that influence feasibility.
IMPORTANT: Synthesize the README in your own words. Never copy configuration snippets or step-by-step guides.
- Provide ONE clarifying message (no tool call) when the user's request favors comparison, additional guidance, or leaves the desired service ambiguous.
Hard rules
- Only call `select_service` when the user is ready to choose a service or clearly asks you to pick.
- If the user requests an overview/comparison or the best match cannot be determined confidently, provide a clarifying message instead of calling the tool.
- Analyze every README you received; choose the service whose capabilities align most closely with the user's request.
- Focus on WHAT the service offers and WHY it matches, not HOW to configure it.
- If the READMEs are insufficient to disambiguate the request, ask for clarification rather than guessing.
Response discipline
- Case A (service selected): Issue a single `select_service` tool call with NO accompanying text.
- Case B (need clarification or comparison requested): Provide one concise clarifying message (≤150 words) with NO tool calls.
- Do NOT repeat or paraphrase these instructions in your reply.
- Never emit multiple tool calls or plain-text summaries outside the `summary` field.
Examples of CORRECT behavior:
✓ Tool call to `select_service` only (empty message string)
✓ Clarifying message that compares options and asks the user to choose (no tool calls)
Examples of INCORRECT behavior (DO NOT DO THIS):
✗ Tool call + explanatory text
✗ Multiple `select_service` calls
✗ `select_service` with a name that is not in the available services list
"""
).strip()
context_lines: list[str] = [
"Assistant context: available services.",
"",
"Available services (you must choose exactly one):",
]
for idx, service_name in enumerate(available_services, start=1):
context_lines.append(f"{idx}. `{service_name}`")
context_lines.append("")
if len(available_services) > 1:
context_lines.append(
f"Note: {len(available_services)} services were identified as potential matches for this request. "
"Review their documentation and select the BEST match."
)
context_lines.append("")
context_lines.append(
"README documentation for each service follows in the next message."
)
assistant_context = "\n".join(context_lines).strip()
return system_instructions, assistant_context
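A small sketch of how these builders feed the chat message list (mirroring the pattern used by the phase functions); the machine, tag, and service values are illustrative:

machines = [MachineDescription(name="server-a", description="Primary server")]
tags = [TagDescription(name="all", description="A group containing all machines")]
services = [
    SimplifiedServiceSchema(name="zerotier", description="Peer-to-peer VPN", input="clan-core"),
]

system_prompt, assistant_context = build_discovery_prompt(machines, tags, services)

messages = [
    {"role": "system", "content": system_prompt},
    {"role": "assistant", "content": assistant_context},
    {"role": "user", "content": "set up a VPN between my machines"},
]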

View File

@@ -0,0 +1,551 @@
"""Type definitions and schema conversion for LLM function calling."""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, NotRequired, TypedDict
from clan_lib.errors import ClanError
from clan_lib.machines.list import list_machines
from clan_lib.services.modules import Module, list_service_modules
from clan_lib.tags.list import list_tags
if TYPE_CHECKING:
from clan_lib.flake.flake import Flake
log = logging.getLogger(__name__)
JSONSchemaType = Literal[
"array", "boolean", "integer", "null", "number", "object", "string"
]
JSONSchemaFormat = Literal[
# Dates and Times
"date-time",
"time",
"date",
"duration",
# Email Addresses
"email",
"idn-email",
# Hostnames
"hostname",
"idn-hostname",
# IP Addresses
"ipv4",
"ipv6",
# Resource Identifiers
"uuid",
"uri",
"uri-reference",
"iri",
"iri-reference",
# URI Template
"uri-template",
# JSON Pointer
"json-pointer",
"relative-json-pointer",
# Regular Expressions
"regex",
]
JSONValue = str | int | float | bool | None | list["JSONValue"] | dict[str, "JSONValue"]
JSONDict = dict[str, JSONValue]
MessageRole = Literal["system", "user", "assistant"]
class ChatMessage(TypedDict):
role: MessageRole
content: str
mode: NotRequired[str]
ConversationHistory = list[ChatMessage]
class PendingFinalDecisionState(TypedDict, total=False):
service_name: NotRequired[str]
service_summary: NotRequired[str]
class PendingServiceSelectionState(TypedDict, total=False):
readme_results: NotRequired[list[dict[str, JSONValue]]]
class SessionState(TypedDict, total=False):
pending_final_decision: NotRequired[PendingFinalDecisionState]
pending_service_selection: NotRequired[PendingServiceSelectionState]
class JSONSchemaProperty(TypedDict, total=False):
type: JSONSchemaType | list[JSONSchemaType]
format: JSONSchemaFormat
description: str | None
enum: list[str] | None
items: JSONDict | None
properties: dict[str, JSONSchemaProperty] | None
patternProperties: dict[str, JSONSchemaProperty] | None
required: list[str] | None
additionalProperties: bool | JSONDict | None
class JSONSchemaParameters(TypedDict, total=False):
type: JSONSchemaType
properties: dict[str, JSONSchemaProperty]
required: list[str]
additionalProperties: bool
class OpenAIFunctionSchema(TypedDict):
type: Literal["function"]
name: str
description: str
parameters: JSONSchemaParameters
strict: bool
class OllamaFunctionDefinition(TypedDict):
"""The function definition inside an Ollama tool."""
name: str
description: str
parameters: JSONSchemaParameters
class OllamaFunctionSchema(TypedDict):
"""Ollama-compatible function schema format."""
type: Literal["function"]
function: OllamaFunctionDefinition
class SimplifiedServiceSchema(TypedDict):
"""Simplified service schema with just name and description."""
name: str
description: str
input: str | None
ToolDefinition = OpenAIFunctionSchema | OllamaFunctionSchema
class FunctionCallType(TypedDict):
"""Function call format matching OpenAI's function calling structure."""
id: str
call_id: str
type: Literal["function_call"]
name: str
arguments: str
class ReadmeRequest(TypedDict):
"""Request for README documentation."""
input_name: str | None
function_name: str
@dataclass(frozen=True)
class MachineDescription:
name: str
description: str | None
@dataclass(frozen=True)
class TagDescription:
name: str
description: str | None
class OpenAIMessageContentBlock(TypedDict, total=False):
type: str
text: NotRequired[str]
OpenAIMessageContent = str | list[OpenAIMessageContentBlock]
class OpenAIToolFunctionPayload(TypedDict, total=False):
name: NotRequired[str]
arguments: NotRequired[str]
class OpenAIToolCallPayload(TypedDict, total=False):
id: NotRequired[str]
function: NotRequired[OpenAIToolFunctionPayload]
class OpenAIChatMessagePayload(TypedDict, total=False):
role: NotRequired[MessageRole]
content: NotRequired[OpenAIMessageContent]
tool_calls: NotRequired[list[OpenAIToolCallPayload]]
class OpenAIChoicePayload(TypedDict, total=False):
message: NotRequired[OpenAIChatMessagePayload]
class OpenAIChatCompletionResponse(TypedDict, total=False):
choices: NotRequired[list[OpenAIChoicePayload]]
class OllamaToolFunctionPayload(TypedDict, total=False):
name: NotRequired[str]
arguments: NotRequired[JSONValue]
class OllamaToolCallPayload(TypedDict, total=False):
function: NotRequired[OllamaToolFunctionPayload]
class OllamaMessagePayload(TypedDict, total=False):
role: NotRequired[str]
content: NotRequired[JSONValue]
tool_calls: NotRequired[list[OllamaToolCallPayload]]
class OllamaChatResponse(TypedDict, total=False):
message: NotRequired[OllamaMessagePayload]
MessageContent = JSONValue | OpenAIMessageContent
class ChatCompletionRequestPayload(TypedDict, total=False):
model: str
messages: list[ChatMessage]
tools: list[ToolDefinition]
stream: NotRequired[bool]
@dataclass(frozen=True)
class AiAggregate[T]:
machines: list[MachineDescription]
tags: list[TagDescription]
tools: list[T]
def clan_module_to_openai_spec(
module: Module, available_tags: list[str], available_machines: list[str]
) -> OpenAIFunctionSchema:
"""Convert a clan module to OpenAI function schema format.
Args:
module: The module to convert
available_tags: List of available tag names
available_machines: List of available machine names
Returns:
OpenAI function schema
"""
# Create individual role schemas with descriptions
role_properties = {}
for role_name, role_info in module.info.roles.items():
role_properties[role_name] = JSONSchemaProperty(
type="object",
description=role_info.description,
properties={
"machines": JSONSchemaProperty(
type="object",
patternProperties={
f"^({'|'.join(available_machines)})$": JSONSchemaProperty(
type="object",
additionalProperties=False,
)
},
additionalProperties=False,
description='Machines to assign this role to. Format: each machine name is a key with an empty object {} as value. Example: {"wintux": {}, "gchq-local": {}}',
),
"tags": JSONSchemaProperty(
type="object",
patternProperties={
f"^({'|'.join(available_tags)})$": JSONSchemaProperty(
type="object",
additionalProperties=False,
)
},
additionalProperties=False,
description='Tags to assign this role to. Format: each tag name is a key with an empty object {} as value. Example: {"all": {}, "nixos": {}}',
),
},
additionalProperties=False,
)
module_name = module.usage_ref.get("name")
if not isinstance(module_name, str):
msg = "Module name must be a string"
raise TypeError(msg)
module_input = module.usage_ref.get("input")
if module_input is not None and not isinstance(module_input, str):
msg = "Module input must be a string or None"
raise TypeError(msg)
module_properties = {}
if module_input is not None:
module_properties["input"] = JSONSchemaProperty(
type="string",
description=(
"Source / Input name of the module, e.g. 'clan-core' or null for built-in modules"
),
enum=[module_input],
)
return OpenAIFunctionSchema(
type="function",
name=module.usage_ref["name"],
description=module.info.manifest.description,
parameters=JSONSchemaParameters(
type="object",
properties={
"module": JSONSchemaProperty(
type="object",
properties=module_properties,
),
"roles": JSONSchemaProperty(
type="object",
properties=role_properties,
additionalProperties=False,
),
},
required=["roles"],
additionalProperties=False,
),
strict=True,
)
def llm_function_to_ollama_format(
llm_function: OpenAIFunctionSchema,
) -> OllamaFunctionSchema:
"""Convert OpenAI function schema to Ollama-compatible format.
Args:
llm_function: The OpenAI function schema to convert
Returns:
OllamaFunctionSchema with the function definition wrapped correctly
"""
return OllamaFunctionSchema(
type="function",
function=OllamaFunctionDefinition(
name=llm_function["name"],
description=llm_function["description"],
parameters=llm_function["parameters"],
),
)
def aggregate_openai_function_schemas(
flake: Flake,
) -> AiAggregate[OpenAIFunctionSchema]:
"""Collect all service modules and convert them to OpenAI function schemas.
Args:
flake: The Flake object to extract services from
Returns:
AiAggregate containing machines, tags, and OpenAI function schemas
Raises:
ClanError: If no machines or tags are found
"""
# Extract machine names
machines = list_machines(flake)
available_machines = list(machines.keys())
# If no machines exist, raise error
if not available_machines:
msg = "No machines found in inventory. Please add at least one machine."
raise ClanError(msg)
# Extract tags from all machines
all_tags = list_tags(flake)
available_tags: set[str] = all_tags.options
available_tags.update(all_tags.special)
if not available_tags:
msg = "No tags found in inventory. Please add at least one tag."
raise ClanError(msg)
# List all service modules
service_modules = list_service_modules(flake)
# Convert each module to OpenAI function schema
tools: list[OpenAIFunctionSchema] = []
for module in service_modules.modules:
llm_function: OpenAIFunctionSchema = clan_module_to_openai_spec(
module, list(available_tags), available_machines
)
tools.append(llm_function)
tags_with_descriptions: list[TagDescription] = []
for tag in sorted(available_tags):
new_tag = TagDescription(name=tag, description=None)
if tag in all_tags.special:
match tag:
case "all":
new_tag = TagDescription(
name=tag, description="A group containing all machines"
)
case "darwin":
new_tag = TagDescription(
name=tag, description="A group containing all macOS machines"
)
case "nixos":
new_tag = TagDescription(
name=tag, description="A group containing all NixOS machines"
)
case _:
log.error(
f"Unhandled special tag: {tag}, dropping from llm context"
)
else:
log.warning(
f"Reading tag descriptions is not yet implemented, setting to None for: {tag}"
"This might result in the LLM not using this tag appropriately."
)
tags_with_descriptions.append(new_tag)
return AiAggregate(
machines=[
MachineDescription(
name=m.data["name"], description=m.data.get("description")
)
for m in machines.values()
],
tags=tags_with_descriptions,
tools=tools,
)
def aggregate_ollama_function_schemas(
flake: Flake,
) -> AiAggregate[OllamaFunctionSchema]:
"""Collect all service modules and convert them to Ollama function schemas.
Args:
flake: The Flake object to extract services from
Returns:
AiAggregate containing machines, tags, and Ollama function schemas
"""
openai_schemas = aggregate_openai_function_schemas(flake)
ollama_schemas = [llm_function_to_ollama_format(f) for f in openai_schemas.tools]
return AiAggregate(
machines=openai_schemas.machines, tags=openai_schemas.tags, tools=ollama_schemas
)
def create_simplified_service_schemas(flake: Flake) -> list[SimplifiedServiceSchema]:
"""Create simplified schemas with just names and descriptions for initial LLM pass.
Args:
flake: The Flake object to extract services from
Returns:
List of simplified service schemas
"""
service_modules = list_service_modules(flake)
simplified: list[SimplifiedServiceSchema] = []
for module in service_modules.modules:
module_input = module.usage_ref.get("input")
if module_input is not None and not isinstance(module_input, str):
msg = "Module input must be a string or None"
raise TypeError(msg)
simplified.append(
SimplifiedServiceSchema(
name=module.usage_ref["name"],
description=module.info.manifest.description,
input=module_input,
)
)
return simplified
def create_get_readme_tool(
valid_function_names: list[str],
) -> OllamaFunctionSchema:
"""Create the get_readme tool schema for querying service details.
Args:
valid_function_names: List of service function names that may be requested
Returns:
The get_readme tool in Ollama format
"""
sorted_names = sorted(valid_function_names)
return OllamaFunctionSchema(
type="function",
function=OllamaFunctionDefinition(
name="get_readme",
description="Retrieve detailed documentation (README) for a specific service/module to learn more about its roles, configuration, and requirements before deciding to use it.",
parameters=JSONSchemaParameters(
type="object",
properties={
"input_name": JSONSchemaProperty(
type=["string", "null"],
description="The input/source name of the module (e.g., 'clan-core'). Use null for built-in modules.",
),
"function_name": JSONSchemaProperty(
type="string",
description="The name of the service/function to get documentation for (e.g., 'zerotier', 'postgresql').",
enum=sorted_names,
),
},
required=["function_name"],
),
),
)
def create_select_service_tool(
available_services: list[str],
) -> OllamaFunctionSchema:
"""Create the select_service tool schema for selecting one service from candidates.
Args:
available_services: List of service names to choose from
Returns:
The select_service tool in Ollama format
"""
sorted_names = sorted(available_services)
return OllamaFunctionSchema(
type="function",
function=OllamaFunctionDefinition(
name="select_service",
description="Select exactly one service from the available candidates and provide a focused summary of its documentation relevant to the user request.",
parameters=JSONSchemaParameters(
type="object",
properties={
"service_name": JSONSchemaProperty(
type="string",
description="The name of the selected service. Must match one of the available service names exactly.",
enum=sorted_names,
),
"summary": JSONSchemaProperty(
type="string",
description="A concise summary (max 300 words) focusing on: (1) VALUE PROPOSITION - what problem this service solves and why you'd use it, (2) ROLES - what roles exist and the PURPOSE of each role, (3) KEY CONSTRAINTS - critical dependencies or limitations. Do NOT copy README examples or configuration snippets. Synthesize WHAT the service does and WHY, not HOW to configure it.",
),
},
required=["service_name", "summary"],
),
),
)
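A brief sketch of using the tool builders above on their own; the service names are illustrative:

# Discovery phase: the model may only call get_readme, constrained to known services.
get_readme_tool = create_get_readme_tool(["postgresql", "zerotier"])

# Selection phase: the model may only call select_service over the fetched candidates.
select_service_tool = create_select_service_tool(["zerotier"])

# Full per-service schemas can be converted from the OpenAI layout to Ollama's nested
# layout, as aggregate_ollama_function_schemas does for a whole flake:
# ollama_tool = llm_function_to_ollama_format(openai_tool)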

View File

@@ -0,0 +1,75 @@
"""Service management for LLM (Ollama)."""
import logging
import time
import urllib.request
from http import HTTPStatus
from urllib.error import HTTPError, URLError
from clan_lib.api import API
from clan_lib.cmd import run
from clan_lib.errors import ClanError
from clan_lib.nix import nix_shell
from clan_lib.service_runner import create_service_manager
log = logging.getLogger(__name__)
@API.register
def run_llm_service() -> None:
"""Start the LLM service (Ollama)."""
service_manager = create_service_manager()
log.info("Downloading Ollama...")
cmd = nix_shell(["ollama"], ["ollama"])
run(cmd) # Ensure ollama is downloaded
# TODO: Detect GPU availability and choose appropriate Ollama package
cmd = nix_shell(
["ollama"],
["ollama", "serve"],
)
service_manager.start_service("ollama", group="clan", command=cmd)
start = time.time()
timeout = 10.0 # seconds
while True:
try:
with urllib.request.urlopen(
"http://localhost:11434", timeout=5
) as response:
status = response.getcode()
body = response.read().decode(errors="ignore")
if status == HTTPStatus.OK.value and "Ollama is running" in body:
break
except (URLError, HTTPError, ConnectionRefusedError):
log.info("Waiting for Ollama to start...")
if time.time() - start >= timeout:
logs = service_manager.get_service_logs("ollama")
msg = f"Ollama did not start within 10 seconds: {logs}"
raise ClanError(msg)
time.sleep(0.5)
@API.register
def create_llm_model() -> None:
"""Ensure the Ollama model is available; pull it if missing."""
model = "qwen3:4b-instruct"
cmd = nix_shell(
["ollama"],
["ollama", "pull", model],
)
run(cmd)
url = "http://localhost:11434/api/tags"
try:
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
if resp.getcode() == HTTPStatus.OK.value and model in resp.read().decode():
return
except HTTPError as e:
msg = f"Ollama returned HTTP {e.code} when checking model availability."
raise ClanError(msg) from e
except URLError as e:
msg = "Ollama API not reachable at http://localhost:11434"
raise ClanError(msg) from e
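A minimal sketch of the intended call order when driving these endpoints from Python (both take no arguments and raise ClanError on failure):

run_llm_service()   # start Ollama under the user service manager and wait for the HTTP endpoint
create_llm_model()  # pull qwen3:4b-instruct and verify it appears in /api/tags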

File diff suppressed because it is too large

View File

@@ -0,0 +1,126 @@
"""LLM conversation tracing for debugging and analysis."""
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal
from .schemas import ChatMessage
log = logging.getLogger(__name__)
def write_trace_entry(
trace_file: Path,
provider: Literal["openai", "ollama", "claude"],
model: str,
stage: str,
request: dict[str, Any],
response: dict[str, Any],
duration_ms: float,
metadata: dict[str, Any] | None = None,
) -> None:
"""Write a single trace entry to the trace file.
The trace file is appended to (not overwritten) to create a linear log
of all LLM interactions during a session.
Args:
trace_file: Path to the JSON trace file
provider: The LLM provider used
model: The model name
stage: The stage/phase of processing (e.g., "discovery", "final_decision")
request: The request data sent to the LLM (messages, tools, etc.)
response: The response data from the LLM (function_calls, message, etc.)
duration_ms: Duration of the API call in milliseconds
metadata: Optional metadata to include in the trace entry
"""
timestamp = datetime.now(UTC).isoformat()
entry = {
"timestamp": timestamp,
"provider": provider,
"model": model,
"stage": stage,
"request": request,
"response": response,
"duration_ms": round(duration_ms, 2),
}
if metadata:
entry["metadata"] = metadata
try:
# Read existing entries if file exists
existing_entries: list[dict[str, Any]] = []
if trace_file.exists():
with trace_file.open("r") as f:
try:
existing_entries = json.load(f)
if not isinstance(existing_entries, list):
log.warning(
f"Trace file {trace_file} is not a list, starting fresh"
)
existing_entries = []
except json.JSONDecodeError:
log.warning(
f"Trace file {trace_file} is invalid JSON, starting fresh"
)
existing_entries = []
# Append new entry
existing_entries.append(entry)
# Write back with nice formatting
trace_file.parent.mkdir(parents=True, exist_ok=True)
with trace_file.open("w") as f:
json.dump(existing_entries, f, indent=2, ensure_ascii=False)
log.info(f"Wrote trace entry to {trace_file} (stage: {stage})")
except (OSError, json.JSONDecodeError):
log.exception(f"Failed to write trace entry to {trace_file}")
def format_messages_for_trace(messages: list[ChatMessage]) -> list[dict[str, str]]:
"""Format chat messages for human-readable trace output.
Args:
messages: List of chat messages
Returns:
List of formatted message dictionaries
"""
return [{"role": msg["role"], "content": msg["content"]} for msg in messages]
def format_tools_for_trace(tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Format tools for human-readable trace output.
Simplifies tool schemas to just name and description for readability.
Args:
tools: List of tool definitions
Returns:
Simplified list of tool dictionaries
"""
result = []
for tool in tools:
if "function" in tool:
# OpenAI/Claude format
func = tool["function"]
result.append(
{
"name": func.get("name"),
"description": func.get("description"),
"parameters": func.get("parameters", {}),
}
)
else:
# Other formats - just pass through
result.append(tool)
return result
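A short sketch of recording one interaction with the helpers above; the trace path and payloads are illustrative:

messages: list[ChatMessage] = [{"role": "user", "content": "set up a VPN"}]
tools: list[dict[str, Any]] = []  # tool schemas would normally be passed through format_tools_for_trace

write_trace_entry(
    trace_file=Path("/tmp/llm-trace.json"),  # illustrative location
    provider="ollama",
    model="qwen3:4b-instruct",
    stage="discovery",
    request={"messages": format_messages_for_trace(messages), "tools": format_tools_for_trace(tools)},
    response={"function_calls": [], "message": "Which service would you like to set up first?"},
    duration_ms=812.4,
)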

View File

@@ -0,0 +1,83 @@
"""Utility helper functions for LLM orchestration."""
from typing import cast
from clan_lib.services.modules import InputName, ServiceReadmeCollection
from .schemas import ChatMessage, ConversationHistory, JSONValue
# Assistant mode constants
ASSISTANT_MODE_DISCOVERY = "discovery"
ASSISTANT_MODE_SELECTION = "service_selection"
ASSISTANT_MODE_FINAL = "final_decision"
def _assistant_message(content: str, mode: str | None = None) -> ChatMessage:
"""Create an assistant chat message with optional mode metadata."""
message: ChatMessage = {"role": "assistant", "content": content}
if mode:
message["mode"] = mode
return message
def _user_message(content: str) -> ChatMessage:
"""Create a user chat message."""
return {"role": "user", "content": content}
def _strip_conversation_metadata(
conversation_history: ConversationHistory | None,
) -> list[ChatMessage]:
"""Remove non-standard keys from conversation history before LLM calls."""
if not conversation_history:
return []
return [
{"role": message["role"], "content": message["content"]}
for message in conversation_history
]
def _serialize_readme_results(
readme_results: dict[InputName, ServiceReadmeCollection],
) -> list[dict[str, JSONValue]]:
"""Serialize readme results for storage in session state."""
return [
{
"input_name": collection.input_name,
"readmes": cast("dict[str, JSONValue]", collection.readmes),
}
for collection in readme_results.values()
]
def _deserialize_readme_results(
data: list[dict[str, JSONValue]] | None,
) -> dict[InputName, ServiceReadmeCollection] | None:
"""Deserialize readme results from session state."""
if data is None:
return None
results: dict[InputName, ServiceReadmeCollection] = {}
for entry in data:
input_name = entry.get("input_name")
if input_name is not None and not isinstance(input_name, str):
return None
readmes_raw = entry.get("readmes")
if not isinstance(readmes_raw, dict):
return None
typed_readmes: dict[str, str | None] = {}
for service_name, content in readmes_raw.items():
if not isinstance(service_name, str):
return None
if content is not None and not isinstance(content, str):
return None
typed_readmes[service_name] = content
collection = ServiceReadmeCollection(
input_name=input_name,
readmes=typed_readmes,
)
results[input_name] = collection
return results
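A small round-trip sketch for the session-state helpers above; the input and service names are illustrative:

original = {
    "clan-core": ServiceReadmeCollection(
        input_name="clan-core",
        readmes={"zerotier": "Zerotier is a peer-to-peer VPN ...", "borgbackup": None},
    )
}

stored = _serialize_readme_results(original)    # JSON-friendly list for SessionState
restored = _deserialize_readme_results(stored)  # ServiceReadmeCollection mapping, or None if malformed
assert restored is not None
assert restored["clan-core"].readmes["borgbackup"] is None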

View File

@@ -36,5 +36,6 @@
"virtiofsd",
"waypipe",
"zbar",
"zenity"
"zenity",
"ollama"
]

View File

@@ -1,144 +0,0 @@
from typing import Any, Literal, TypedDict
from clan_lib.services.modules import Module
JSONSchemaType = Literal[
"array", "boolean", "integer", "null", "number", "object", "string"
]
JSONSchemaFormat = Literal[
# Dates and Times
"date-time",
"time",
"date",
"duration",
# Email Addresses
"email",
"idn-email",
# Hostnames
"hostname",
"idn-hostname",
# IP Addresses
"ipv4",
"ipv6",
# Resource Identifiers
"uuid",
"uri",
"uri-reference",
"iri",
"iri-reference",
# URI Template
"uri-template",
# JSON Pointer
"json-pointer",
"relative-json-pointer",
# Regular Expressions
"regex",
]
class JSONSchemaProperty(TypedDict, total=False):
type: JSONSchemaType | list[JSONSchemaType]
format: JSONSchemaFormat
description: str | None
enum: list[str] | None
items: dict[str, Any] | None
properties: dict[str, "JSONSchemaProperty"] | None
patternProperties: dict[str, "JSONSchemaProperty"] | None
required: list[str] | None
additionalProperties: bool | dict[str, Any] | None
class JSONSchemaParameters(TypedDict, total=False):
type: JSONSchemaType
properties: dict[str, JSONSchemaProperty]
required: list[str]
additionalProperties: bool
class LLMFunctionSchema(TypedDict):
type: Literal["function"]
name: str
description: str
parameters: JSONSchemaParameters
strict: bool
def clan_module_to_llm_function(
module: Module, available_tags: list[str], available_machines: list[str]
) -> LLMFunctionSchema:
# Create individual role schemas with descriptions
role_properties = {}
for role_name, role_info in module.info.roles.items():
role_properties[role_name] = JSONSchemaProperty(
type="object",
description=role_info.description,
properties={
"machines": JSONSchemaProperty(
type="object",
patternProperties={
f"^({'|'.join(available_machines)})$": JSONSchemaProperty(
type="object",
additionalProperties=False,
)
},
additionalProperties=False,
description="Machines for this role with empty configuration objects",
),
"tags": JSONSchemaProperty(
type="object",
patternProperties={
f"^({'|'.join(available_tags)})$": JSONSchemaProperty(
type="object",
additionalProperties=False,
)
},
additionalProperties=False,
description="Tags for this role with empty configuration objects",
),
},
additionalProperties=False,
)
module_name = module.usage_ref.get("name")
if not isinstance(module_name, str):
msg = "Module name must be a string"
raise TypeError(msg)
module_input = module.usage_ref.get("input")
if module_input is not None and not isinstance(module_input, str):
msg = "Module input must be a string or None"
raise TypeError(msg)
module_properties = {}
if module_input is not None:
module_properties["input"] = JSONSchemaProperty(
type="string",
description=(
"Source / Input name of the module, e.g. 'clan-core' or null for built-in modules"
),
enum=[module_input],
)
return LLMFunctionSchema(
type="function",
name=module.usage_ref["name"],
description=module.info.manifest.description,
parameters=JSONSchemaParameters(
type="object",
properties={
"module": JSONSchemaProperty(
type="object",
properties=module_properties,
),
"roles": JSONSchemaProperty(
type="object",
properties=role_properties,
additionalProperties=False,
),
},
required=["roles"],
additionalProperties=False,
),
strict=True,
)

View File

@@ -45,6 +45,7 @@ TOP_LEVEL_RESOURCES = {
"generator", # vars generators operations
"service", # clan.service management
"system", # system operations
"llm", # llm operations
}