Files
clan-core/pkgs/clan-cli/clan_cli/completions.py
2025-06-10 19:15:07 +02:00

369 lines
10 KiB
Python

import argparse
import contextlib
import json
import subprocess
import threading
from collections.abc import Callable, Iterable
from types import ModuleType
from typing import Any
from clan_lib.cmd import run
from clan_lib.flake.flake import Flake
from clan_lib.nix import nix_eval
from clan_lib.persist.inventory_store import InventoryStore
"""
This module provides dynamic completions.
The completions should feel fast.
We target a maximum of 1second on our average machine.
"""
argcomplete: ModuleType | None = None
with contextlib.suppress(ImportError):
import argcomplete # type: ignore[no-redef]
# The default completion timeout for commands
COMPLETION_TIMEOUT: int = 3
def clan_dir(flake: str | None) -> str | None:
from clan_lib.dirs import get_clan_flake_toplevel_or_env
path_result = get_clan_flake_toplevel_or_env()
return str(path_result) if path_result is not None else None
def complete_machines(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for machine names configured in the clan.
"""
machines: list[str] = []
def run_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
inventory = InventoryStore(Flake(str(flake))).read()
machines.extend(inventory.get("machines", {}).keys())
except subprocess.CalledProcessError:
pass
thread = threading.Thread(target=run_cmd)
thread.start()
thread.join(timeout=COMPLETION_TIMEOUT)
if thread.is_alive():
return iter([])
machines_dict = dict.fromkeys(machines, "machine")
return machines_dict
def complete_services_for_machine(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for machine facts generation services.
"""
services: list[str] = []
# TODO: consolidate, if multiple machines are used
machines: list[str] = parsed_args.machines
def run_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
services_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#nixosConfigurations.{machines[0]}.config.clan.core.facts.services",
"--apply",
"builtins.attrNames",
],
),
).stdout.strip()
)
services.extend(services_result)
except subprocess.CalledProcessError:
pass
thread = threading.Thread(target=run_cmd)
thread.start()
thread.join(timeout=COMPLETION_TIMEOUT)
if thread.is_alive():
return iter([])
services_dict = dict.fromkeys(services, "service")
return services_dict
def complete_backup_providers_for_machine(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for machine backup providers.
"""
providers: list[str] = []
machine: str = parsed_args.machine
def run_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
providers_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#nixosConfigurations.{machine}.config.clan.core.backups.providers",
"--apply",
"builtins.attrNames",
],
),
).stdout.strip()
)
providers.extend(providers_result)
except subprocess.CalledProcessError:
pass
thread = threading.Thread(target=run_cmd)
thread.start()
thread.join(timeout=COMPLETION_TIMEOUT)
if thread.is_alive():
return iter([])
providers_dict = dict.fromkeys(providers, "provider")
return providers_dict
def complete_state_services_for_machine(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for machine state providers.
"""
providers: list[str] = []
machine: str = parsed_args.machine
def run_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
providers_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#nixosConfigurations.{machine}.config.clan.core.state",
"--apply",
"builtins.attrNames",
],
),
).stdout.strip()
)
providers.extend(providers_result)
except subprocess.CalledProcessError:
pass
thread = threading.Thread(target=run_cmd)
thread.start()
thread.join(timeout=COMPLETION_TIMEOUT)
if thread.is_alive():
return iter([])
providers_dict = dict.fromkeys(providers, "service")
return providers_dict
def complete_secrets(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for clan secrets
"""
from clan_lib.flake.flake import Flake
from .secrets.secrets import list_secrets
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
secrets = list_secrets(Flake(flake).path)
secrets_dict = dict.fromkeys(secrets, "secret")
return secrets_dict
def complete_users(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for clan users
"""
from pathlib import Path
from .secrets.users import list_users
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
users = list_users(Path(flake))
users_dict = dict.fromkeys(users, "user")
return users_dict
def complete_groups(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for clan groups
"""
from pathlib import Path
from .secrets.groups import list_groups
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
groups_list = list_groups(Path(flake))
groups = [group.name for group in groups_list]
groups_dict = dict.fromkeys(groups, "group")
return groups_dict
def complete_target_host(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for target_host for a specific machine
"""
target_hosts: list[str] = []
machine: str = parsed_args.machine
def run_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
target_host_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#nixosConfigurations.{machine}.config.clan.core.networking.targetHost",
],
),
).stdout.strip()
)
target_hosts.append(target_host_result)
except subprocess.CalledProcessError:
pass
thread = threading.Thread(target=run_cmd)
thread.start()
thread.join(timeout=COMPLETION_TIMEOUT)
if thread.is_alive():
return iter([])
providers_dict = dict.fromkeys(target_hosts, "target_host")
return providers_dict
def complete_tags(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for tags inside the inventory
"""
tags: list[str] = []
threads = []
def run_computed_tags_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
inventory_store = InventoryStore(Flake(str(flake)))
inventory = inventory_store.get_readonly_raw()
if "tags" in inventory:
tags.extend(inventory["tags"].keys())
except subprocess.CalledProcessError:
pass
def run_machines_tags_cmd() -> None:
machine_tags: list[str] = []
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
inventory_store = InventoryStore(Flake(str(flake)))
inventory = inventory_store.get_readonly_raw()
machine_tags_result = inventory.get("machines")
if machine_tags_result is None:
return
for machine in machine_tags_result.values():
machine_tags.extend(machine.get("tags", []))
tags.extend(machine_tags)
except subprocess.CalledProcessError:
pass
def start_thread(target_function: Callable) -> threading.Thread:
thread = threading.Thread(target=target_function)
thread.start()
return thread
functions_to_run = [
run_computed_tags_cmd,
run_machines_tags_cmd,
]
threads = [start_thread(func) for func in functions_to_run]
for thread in threads:
thread.join(timeout=COMPLETION_TIMEOUT)
if any(thread.is_alive() for thread in threads):
return iter([])
providers_dict = dict.fromkeys(tags, "tag")
return providers_dict
def add_dynamic_completer(
action: argparse.Action,
completer: Callable[..., Iterable[str]],
) -> None:
"""
Add a completion function to an argparse action, this will only be added,
if the argcomplete module is loaded.
"""
if argcomplete:
# mypy doesn't check this correctly, so we ignore it
action.completer = completer # type: ignore[attr-defined]