Merge pull request 'clan machines generations' (#4848) from Qubasa/clan-core:add_generate_cli into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4848
This commit is contained in:
Luis Hebendanz
2025-09-19 23:30:19 +00:00
20 changed files with 573 additions and 106 deletions

View File

@@ -549,11 +549,11 @@ def main() -> None:
try:
args.func(args)
except ClanError:
except ClanError as e:
if debug:
log.exception("Exited with error")
else:
log.exception("Exited with error")
log.error(e) # noqa: TRY400
sys.exit(1)
except KeyboardInterrupt as ex:
log.warning("Interrupted by user", exc_info=ex)

View File

@@ -3,6 +3,7 @@ import argparse
from .create import register_create_parser
from .delete import register_delete_parser
from .generations import register_generations_parser
from .hardware import register_update_hardware_config
from .install import register_install_parser
from .list import register_list_parser
@@ -145,3 +146,19 @@ For more detailed information, visit: https://docs.clan.lol/guides/getting-start
formatter_class=argparse.RawTextHelpFormatter,
)
register_install_parser(install_parser)
generations_parser = subparser.add_parser(
"generations",
help="list generations of machines",
description="list generations of machines",
epilog=(
"""
List NixOS generations of the machine.
The generations are the different versions of the machine that are installed on the target host.
Examples:
$ clan generations [MACHINE]
"""
),
formatter_class=argparse.RawTextHelpFormatter,
)
register_generations_parser(generations_parser)

View File

@@ -0,0 +1,285 @@
import argparse
import logging
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal, TypeVar, get_args
from clan_lib.async_run import AsyncContext, AsyncFuture, AsyncOpts, AsyncRuntime
from clan_lib.errors import ClanError, text_heading
from clan_lib.flake import require_flake
from clan_lib.machines.generations import MachineGeneration, get_machine_generations
from clan_lib.machines.machines import Machine
from clan_lib.metrics.telegraf import MonitoringNotEnabledError
from clan_lib.metrics.version import check_machine_up_to_date
from clan_lib.network.network import get_best_remote
from clan_lib.ssh.host_key import HostKeyCheck
from clan_lib.ssh.localhost import LocalHost
from clan_lib.ssh.remote import Remote
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
complete_tags,
)
from clan_cli.machines.update import get_machines_for_update
if TYPE_CHECKING:
from clan_lib.ssh.host import Host
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Closed set of status strings shown next to a machine's current generation.
UpToDateType = Literal["up-to-date", "out-of-date", "unknown"]
def print_generations(
generations: list[MachineGeneration],
needs_update: UpToDateType = "unknown",
) -> None:
headers = [
"Generation (Up-To-Date)",
"Date",
"NixOS Version",
"Kernel Version",
]
rows = []
for gen in generations:
gen_marker = f" ← ({needs_update})" if gen.current else ""
gen_str = f"{gen.generation}{gen_marker}"
row = [
gen_str,
gen.date,
gen.nixos_version,
gen.kernel_version,
]
rows.append(row)
elided_rows = rows
col_widths = [
max(len(str(item)) for item in [header] + [row[i] for row in elided_rows])
for i, header in enumerate(headers)
]
# Print header
header_row = " | ".join(
header.ljust(col_widths[i]) for i, header in enumerate(headers)
)
print(header_row)
print("-+-".join("-" * w for w in col_widths))
# Print rows
for row in elided_rows:
print(" | ".join(row[i].ljust(col_widths[i]) for i in range(len(headers))))
print()
def print_summary_table(
    machine_data: dict[Machine, tuple[list[MachineGeneration], UpToDateType]],
) -> None:
    """Print a one-row-per-machine table of each machine's current generation.

    Machines whose generation list contains no entry flagged ``current`` are
    left out. If no machine yields a row, a short notice is printed instead
    of an empty table.

    Args:
        machine_data: Maps each machine to its generations and its
            up-to-date status label.
    """
    print(text_heading("Current Generations Summary"))

    headers = ["Machine", "Current Generation", "Date", "NixOS Version", "Up-To-Date"]

    rows: list[list[str]] = []
    for machine, (generations, needs_update) in machine_data.items():
        current_gen = next((gen for gen in generations if gen.current), None)
        if current_gen is None:
            # Nothing is marked as current, so there is nothing to summarize.
            continue
        # Coerce every cell to str so padding below cannot fail on a missing
        # (None) value and stays consistent with the width calculation.
        rows.append(
            [
                str(machine.name),
                str(current_gen.generation),
                str(current_gen.date),
                str(current_gen.nixos_version),
                str(needs_update),
            ]
        )

    if not rows:
        print("Couldn't retrieve data from any machine.")
        return

    # Each column is as wide as its widest cell, header included.
    col_widths = [
        max(len(cell) for cell in (header, *(row[i] for row in rows)))
        for i, header in enumerate(headers)
    ]

    def format_row(cells: list[str]) -> str:
        return " | ".join(
            cell.ljust(width) for cell, width in zip(cells, col_widths, strict=True)
        )

    print(format_row(headers))
    print("-+-".join("-" * width for width in col_widths))
    for row in rows:
        print(format_row(row))
    print()
@dataclass(frozen=True)
class MachineVersionData:
    """Pending async results that generations_command collects for one machine."""

    # Future of the get_machine_generations call scheduled for the machine.
    generations: AsyncFuture[list[MachineGeneration]]
    # Future of the check_machine_up_to_date call; None when the check was
    # skipped via --skip-outdated-check.
    machine_update: AsyncFuture[bool] | None
def generations_command(args: argparse.Namespace) -> None:
    """List NixOS generations for the selected machines.

    For every selected machine a target host is resolved, then the generation
    listing and (unless ``--skip-outdated-check`` is given) the up-to-date
    check are scheduled concurrently. When machines or tags were named on the
    command line, a detailed table is printed per machine; otherwise a single
    summary table of current generations is printed.

    Args:
        args: Parsed CLI arguments as set up by register_generations_parser.

    Raises:
        ClanError: If ``--target-host`` is combined with more than one
            machine, or if querying any machine failed. In the latter case
            the tables for the successful machines are printed first and the
            error message lists every failed machine.
    """
    flake = require_flake(args.flake)
    machines_to_update = get_machines_for_update(flake, args.machines, args.tags)

    if args.target_host is not None and len(machines_to_update) > 1:
        msg = "Target Host can only be set for one machines"
        raise ClanError(msg)

    host_key_check = args.host_key_check
    machine_generations: dict[Machine, MachineVersionData] = {}

    R = TypeVar("R")

    # Defined once (not per loop iteration): unwrap a finished future into
    # either its result or the exception it failed with.
    def get_result(async_future: AsyncFuture[R]) -> R | Exception:
        aresult = async_future.get_result()
        if aresult is None:
            msg = "Generations result should never be None"
            raise ClanError(msg)
        if aresult.error is not None:
            return aresult.error
        return aresult.result

    errors: dict[Machine, Exception] = {}
    successful_machines: dict[
        Machine, tuple[list[MachineGeneration], UpToDateType]
    ] = {}

    with AsyncRuntime() as runtime:
        for machine in machines_to_update:
            if args.target_host:
                target_host: Host | None = None
                if args.target_host == "localhost":
                    target_host = LocalHost()
                else:
                    target_host = Remote.from_ssh_uri(
                        machine_name=machine.name,
                        address=args.target_host,
                    ).override(host_key_check=host_key_check)
            else:
                try:
                    with get_best_remote(machine) as _remote:
                        target_host = machine.target_host().override(
                            host_key_check=host_key_check
                        )
                except ClanError:
                    log.warning(
                        f"Skipping {machine.name} as it has no target host configured."
                    )
                    continue

            generations = runtime.async_run(
                AsyncOpts(
                    tid=machine.name,
                    async_ctx=AsyncContext(prefix=machine.name),
                ),
                get_machine_generations,
                target_host=target_host,
            )

            if args.skip_outdated_check:
                machine_update = None
            else:
                machine_update = runtime.async_run(
                    AsyncOpts(
                        tid=machine.name + "-needs-update",
                        async_ctx=AsyncContext(prefix=machine.name),
                    ),
                    check_machine_up_to_date,
                    machine=machine,
                    target_host=target_host,
                )

            machine_generations[machine] = MachineVersionData(
                generations, machine_update
            )

        runtime.join_all()

        # Harvest results while the runtime is still open.
        for machine, async_version_data in machine_generations.items():
            mgenerations = get_result(async_version_data.generations)
            if isinstance(mgenerations, Exception):
                errors[machine] = mgenerations
                continue

            needs_update: UpToDateType
            if async_version_data.machine_update is None:
                needs_update = "unknown"
            else:
                eneeds_update = get_result(async_version_data.machine_update)
                if isinstance(eneeds_update, MonitoringNotEnabledError):
                    # Missing monitoring is not a failure; the generations are
                    # still shown, just without a known status.
                    log.warning(
                        f"Skipping up-to-date check for {machine.name} as monitoring is not enabled."
                    )
                    needs_update = "unknown"
                elif isinstance(eneeds_update, Exception):
                    errors[machine] = eneeds_update
                    continue
                else:
                    needs_update = "out-of-date" if eneeds_update else "up-to-date"

            successful_machines[machine] = (mgenerations, needs_update)

    # Detailed output only when the user narrowed the selection explicitly.
    specific_machines_requested = bool(args.machines or args.tags)
    if specific_machines_requested:
        # NOTE(review): the per-machine tables carry no machine name, so with
        # several machines the output does not say which table is whose.
        for mgenerations, needs_update in successful_machines.values():
            print_generations(
                generations=mgenerations,
                needs_update=needs_update,
            )
    else:
        print_summary_table(successful_machines)

    if errors:
        # Report every failed machine, not just the first one encountered.
        msg = "\n".join(
            f"Failed for machine {machine.name}: {error}"
            for machine, error in errors.items()
        )
        first_error = next(iter(errors.values()))
        raise ClanError(msg) from first_error
def register_generations_parser(parser: argparse.ArgumentParser) -> None:
    """Register the arguments of the ``generations`` subcommand on *parser*.

    Adds the positional ``MACHINE`` list, ``--tags``, ``--host-key-check``,
    ``--target-host`` and ``--skip-outdated-check``, wires shell completion
    for machines and tags, and sets generations_command as the handler.

    Args:
        parser: The subparser created for the ``generations`` command.
    """
    machines_parser = parser.add_argument(
        "machines",
        type=str,
        nargs="*",
        default=[],
        metavar="MACHINE",
        # This command only reads state; the previous text was copied from
        # the update command and claimed machines "will be updated".
        help=(
            "Machine to list generations for. If no machines are specified, "
            "a summary of the current generation is shown for all machines "
            "that don't require explicit updates."
        ),
    )
    add_dynamic_completer(machines_parser, complete_machines)

    tag_parser = parser.add_argument(
        "--tags",
        nargs="+",
        default=[],
        help="Tags that machines should be queried for. Multiple tags will intersect.",
    )
    add_dynamic_completer(tag_parser, complete_tags)

    parser.add_argument(
        "--host-key-check",
        choices=list(get_args(HostKeyCheck)),
        default="ask",
        help="Host key (.ssh/known_hosts) check mode.",
    )
    parser.add_argument(
        "--target-host",
        type=str,
        help="Address of the machine to query, in the format of user@host:1234.",
    )
    parser.add_argument(
        "--skip-outdated-check",
        action="store_true",
        help="Skip checking if the current generation is outdated (faster).",
    )
    parser.set_defaults(func=generations_command)

View File

@@ -17,6 +17,10 @@ from .list import get_machine_vars
log = logging.getLogger(__name__)
class VarNotFoundError(ClanError):
    """Raised by get_machine_var when no var matches the requested id."""

    pass
def get_machine_var(machine: Machine, var_id: str) -> Var:
log.debug(f"getting var: {var_id} from machine: {machine.name}")
vars_ = get_machine_vars(machine)
@@ -29,11 +33,14 @@ def get_machine_var(machine: Machine, var_id: str) -> Var:
if var.id.startswith(var_id):
results.append(var)
if len(results) == 0:
msg = f"Couldn't find var: {var_id} for machine: {machine}"
raise ClanError(msg)
msg = f"Couldn't find var: {var_id} for machine: {machine.name}"
raise VarNotFoundError(msg)
if len(results) > 1:
error = f"Found multiple vars in {machine} for {var_id}:\n - " + "\n - ".join(
[str(var) for var in results],
error = (
f"Found multiple vars in {machine.name} for {var_id}:\n - "
+ "\n - ".join(
[str(var) for var in results],
)
)
raise ClanError(error)
# we have exactly one result at this point

View File

@@ -0,0 +1,44 @@
import json
from dataclasses import dataclass, field
from clan_lib.api import API
from clan_lib.ssh.localhost import LocalHost
from clan_lib.ssh.remote import Remote
@dataclass(order=True, frozen=True)
class MachineGeneration:
    """One generation entry as built by get_machine_generations from the JSON listing."""

    # Generation number ("generation" in the JSON entry).
    generation: int
    # Date string of the generation ("date" in the JSON entry).
    date: str
    # Taken from "nixosVersion".
    nixos_version: str
    # Taken from "kernelVersion".
    kernel_version: str
    # Taken from "configurationRevision".
    configuration_revision: str
    # Taken from "specialisations"; empty when the entry has none.
    specialisations: list[str] = field(default_factory=list)
    # True for the entry the listing flags as "current".
    current: bool = False
@API.register
def get_machine_generations(target_host: Remote | LocalHost) -> list[MachineGeneration]:
    """List the NixOS generations installed on the target host.

    Runs ``nixos-rebuild list-generations --json`` through a connection to
    *target_host* and converts every JSON entry into a MachineGeneration.

    Args:
        target_host: Host to run the command on.

    Returns:
        The generations sorted by ascending generation number. Keys missing
        from an entry fall back to ``0`` (generation), ``""`` (strings),
        ``[]`` (specialisations) and ``False`` (current).
    """
    cmd = [
        "nixos-rebuild",
        "list-generations",
        "--json",
    ]
    with target_host.host_connection() as target_host_conn:
        res = target_host_conn.run(cmd)
        data = json.loads(res.stdout.strip())

    sorted_data = sorted(data, key=lambda gen: gen.get("generation", 0))
    return [
        MachineGeneration(
            # Same fallback as the sort key above, so the int/str fields
            # never silently receive None.
            generation=gen.get("generation", 0),
            date=gen.get("date", ""),
            nixos_version=gen.get("nixosVersion", ""),
            kernel_version=gen.get("kernelVersion", ""),
            configuration_revision=gen.get("configurationRevision", ""),
            specialisations=gen.get("specialisations", []),
            current=gen.get("current", False),
        )
        for gen in sorted_data
    ]

View File

@@ -1,11 +1,12 @@
import json
import logging
import ssl
import urllib.request
from base64 import b64encode
from collections.abc import Iterator
from typing import Any, TypedDict, cast
from clan_cli.vars.get import get_machine_var
from clan_cli.vars.get import VarNotFoundError, get_machine_var
from clan_lib.errors import ClanError
from clan_lib.machines.machines import Machine
@@ -21,6 +22,11 @@ class MetricSample(TypedDict):
timestamp: int
class MonitoringNotEnabledError(ClanError):
    """Raised by get_metrics when the telegraf vars it needs cannot be found."""

    pass
# Tests for this function are in the 'monitoring' clanService tests
def get_metrics(
machine: Machine,
target_host: Host,
@@ -36,14 +42,20 @@ def get_metrics(
"""
# Example: fetch Prometheus metrics with basic auth
url = f"http://{target_host.address}:9990/telegraf.json"
url = f"https://{target_host.address}:9990/telegraf.json"
username = "prometheus"
var_name = "telegraf/password"
password_var = get_machine_var(machine, var_name)
if not password_var.exists:
try:
password_var = get_machine_var(machine, "telegraf/password")
cert_var = get_machine_var(machine, "telegraf-certs/crt")
except VarNotFoundError as e:
msg = "Module 'monitoring' is required to fetch metrics from machine."
raise MonitoringNotEnabledError(msg) from e
if not password_var.exists or not cert_var.exists:
msg = (
f"Missing required var '{var_name}' for machine '{machine.name}'.\n"
"Ensure the 'monitoring' clanService is enabled and run `clan machines update {machine.name}`."
f"Missing required var.\n"
f"Ensure the 'monitoring' clanService is enabled and run `clan machines update {machine.name}`."
"For more information, see: https://docs.clan.lol/reference/clanServices/monitoring/"
)
raise ClanError(msg)
@@ -53,21 +65,30 @@ def get_metrics(
encoded_credentials = b64encode(credentials.encode("utf-8")).decode("utf-8")
headers = {"Authorization": f"Basic {encoded_credentials}"}
cert_path = machine.select(
"config.clan.core.vars.generators.telegraf-certs.files.crt.path"
)
context = ssl.create_default_context(cafile=cert_path)
context.check_hostname = False
context.verify_mode = ssl.CERT_REQUIRED
req = urllib.request.Request(url, headers=headers) # noqa: S310
try:
response = urllib.request.urlopen(req) # noqa: S310
for line in response:
line_str = line.decode("utf-8").strip()
if line_str:
try:
yield cast("MetricSample", json.loads(line_str))
except json.JSONDecodeError:
log.warning(f"Skipping invalid JSON line: {line_str}")
continue
machine.info(f"Fetching Prometheus metrics from {url}")
with urllib.request.urlopen(req, context=context, timeout=10) as response: # noqa: S310
for line in response:
line_str = line.decode("utf-8").strip()
if line_str:
try:
yield cast("MetricSample", json.loads(line_str))
except json.JSONDecodeError:
machine.warn(f"Skipping invalid JSON line: {line_str}")
continue
except Exception as e:
msg = (
f"Failed to fetch Prometheus metrics from {url} for machine '{machine.name}': {e}\n"
f"Failed to fetch Prometheus metrics from {url}: {e}\n"
"Ensure the telegraf.service is running and accessible."
)
raise ClanError(msg) from e

View File

@@ -67,8 +67,8 @@ def check_machine_up_to_date(
],
)
log.debug(
f"Checking if {machine.name} needs an update:\n"
machine.debug(
f"Checking up-to-date:\n"
f"Machine outPath: {nixos_systems.current_system}\n"
f"Git outPath : {git_out_path}\n",
)