clan_lib: add 'get_metrics' API endpoint

This commit is contained in:
Qubasa
2025-08-20 20:17:14 +02:00
parent 56d9256c02
commit ea1e470502
3 changed files with 145 additions and 0 deletions

View File

@@ -0,0 +1,71 @@
import json
import logging
import urllib.request
from base64 import b64encode
from collections.abc import Iterator
from typing import Any, TypedDict, cast
from clan_cli.vars.get import get_machine_var
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_lib.ssh.host import Host
log = logging.getLogger(__name__)
class MetricSample(TypedDict):
    """Shape of one metric sample parsed from a JSON line of the telegraf response."""

    # Values carried by the sample; keys are strings, value types are not constrained.
    fields: dict[str, Any]
    # Name of the metric, e.g. "nixos_systems".
    name: str
    # String-valued labels attached to the sample.
    tags: dict[str, str]
    # Integer timestamp of the sample (unit not visible here -- TODO confirm).
    timestamp: int
def get_metrics(
    machine: Machine,
    target_host: Host,
) -> Iterator[MetricSample]:
    """Stream metric samples from the telegraf endpoint of a machine.

    Sends an HTTP request with basic auth to port 9990 of ``target_host`` and
    parses the response line by line, expecting one JSON document per
    non-empty line. Lines that are not valid JSON are logged and skipped.

    This is a generator: nothing is executed (and no error is raised) until
    the returned iterator is consumed.

    Args:
        machine: The machine whose ``telegraf/password`` var provides the
            basic-auth password.
        target_host: Host whose ``address`` is queried.

    Yields:
        MetricSample: Each successfully parsed line, in response order. The
        parsed value is only cast; it is not validated against the
        ``MetricSample`` shape.

    Raises:
        ClanError: If the ``telegraf/password`` var does not exist, or if
            requesting, reading or decoding the response fails.

    """
    url = f"http://{target_host.address}:9990"
    username = "prometheus"
    var_name = "telegraf/password"
    password_var = get_machine_var(machine, var_name)
    if not password_var.exists:
        msg = (
            f"Missing required var '{var_name}' for machine '{machine.name}'.\n"
            f"Ensure the 'monitoring' clanService is enabled and run `clan machines update {machine.name}`.\n"
            "For more information, see: https://docs.clan.lol/reference/clanServices/monitoring/"
        )
        raise ClanError(msg)

    password = password_var.value.decode("utf-8")
    credentials = f"{username}:{password}"
    encoded_credentials = b64encode(credentials.encode("utf-8")).decode("utf-8")
    headers = {"Authorization": f"Basic {encoded_credentials}"}
    req = urllib.request.Request(url, headers=headers)

    try:
        # The context manager closes the connection even when the consumer
        # stops iterating early and this generator is closed or collected.
        with urllib.request.urlopen(req) as response:
            for line in response:
                line_str = line.decode("utf-8").strip()
                if not line_str:
                    continue
                try:
                    sample = json.loads(line_str)
                except json.JSONDecodeError:
                    log.warning("Skipping invalid JSON line: %s", line_str)
                    continue
                yield cast(MetricSample, sample)
    except Exception as e:
        # Boundary to the domain error type: any failure while talking to
        # telegraf is reported as a ClanError with the original cause chained.
        msg = (
            f"Failed to fetch Prometheus metrics from {url} for machine '{machine.name}': {e}\n"
            "Ensure the telegraf.service is running and accessible."
        )
        raise ClanError(msg) from e

View File

@@ -0,0 +1,74 @@
import logging
from dataclasses import dataclass
from clan_lib.api import API
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
from clan_lib.metrics.telegraf import get_metrics
from clan_lib.nix import nix_eval
from clan_lib.ssh.localhost import LocalHost
from clan_lib.ssh.remote import Remote
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class NixOSSystems:
    """Immutable record of the system and kernel values reported by a host.

    Instances are filled from the tags of the ``nixos_systems`` metric; the
    values are stored as reported, without interpretation.
    """

    # "current_system" tag; compared against the flake's toplevel outPath
    # when checking whether a machine needs an update.
    current_system: str
    # "booted_system" tag.
    booted_system: str
    # "current_kernel" tag.
    current_kernel: str
    # "booted_kernel" tag.
    booted_kernel: str
def get_nixos_systems(
    machine: Machine, target_host: Remote | LocalHost
) -> NixOSSystems | None:
    """Look up the 'nixos_systems' metric of the target host.

    Consumes the metric stream only up to the first sample named
    ``nixos_systems`` and builds the result from that sample's tags.

    Args:
        machine: The machine the metrics belong to.
        target_host: Host the metrics are fetched from.

    Returns:
        The systems described by the first matching sample, or None when the
        stream contains no such sample.

    """
    first_match = next(
        (
            sample
            for sample in get_metrics(machine, target_host)
            if sample["name"] == "nixos_systems"
        ),
        None,
    )
    if first_match is None:
        return None

    tags = first_match["tags"]
    return NixOSSystems(
        current_system=tags["current_system"],
        booted_system=tags["booted_system"],
        current_kernel=tags["current_kernel"],
        booted_kernel=tags["booted_kernel"],
    )
@API.register
def check_machine_up_to_date(
    machine: Machine,
    target_host: Remote | LocalHost,
) -> bool:
    """Check if a machine needs an update.

    Compares the ``current_system`` reported by the ``nixos_systems`` telegraf
    metric of ``target_host`` with the result of evaluating the machine's
    ``config.system.build.toplevel.outPath`` from its flake.

    Note that, despite the function name, the result is True when the two
    values differ, i.e. when the machine is *not* up to date.

    Args:
        machine: The Machine instance to check.
        target_host: Remote or LocalHost instance representing the target host.

    Returns:
        bool: True if the machine needs an update, False otherwise.

    Raises:
        ClanError: If the metrics contain no ``nixos_systems`` sample.

    """
    nixos_systems = get_nixos_systems(machine, target_host)
    if nixos_systems is None:
        # Name the metric that was actually searched for.
        msg = "Failed to find 'nixos_systems' metric in telegraf logs."
        raise ClanError(msg)

    machine.info(f"Getting system outPath from {machine.name}...")

    # NOTE(review): confirm that nix_eval returns the evaluated outPath as a
    # plain string comparable to current_system (rather than e.g. a command
    # line or a JSON-encoded value); otherwise the comparison below is
    # always True.
    git_out_path = nix_eval(
        [
            f"{machine.flake}#nixosConfigurations.'{machine.name}'.config.system.build.toplevel.outPath"
        ]
    )

    log.debug(
        f"Checking if {machine.name} needs an update:\n"
        f"Machine outPath: {nixos_systems.current_system}\n"
        f"Git outPath    : {git_out_path}\n"
    )

    return git_out_path != nixos_systems.current_system