When a machine is deleted with `clan machines delete`, remove its vars and legacy secrets, and update any secrets that reference the machine's key. This command is a superset of `clan secrets machine delete`, and I am wondering if we could remove the `clan secrets machine` subcommand, unless there is an use case for having a machine defined without its key, and any secrets/vars? Note: - This deletes the `ListSecretsOptions` dataclass, as it did not seem to bring any value, especially since `list_secrets` was receiving its individual members instead of the whole dataclass. We can always bring it back if complexity grows to demand it.
414 lines
12 KiB
Python
414 lines
12 KiB
Python
import argparse
|
|
import contextlib
|
|
import json
|
|
import subprocess
|
|
import threading
|
|
from collections.abc import Callable, Iterable
|
|
from types import ModuleType
|
|
from typing import Any
|
|
|
|
from .cmd import run
|
|
from .nix import nix_eval
|
|
|
|
"""
|
|
This module provides dynamic completions.
|
|
The completions should feel fast.
|
|
We target a maximum of 1second on our average machine.
|
|
"""
|
|
|
|
|
|
argcomplete: ModuleType | None = None
|
|
with contextlib.suppress(ImportError):
|
|
import argcomplete # type: ignore[no-redef]
|
|
|
|
|
|
# The default completion timeout for commands
|
|
COMPLETION_TIMEOUT: int = 3
|
|
|
|
|
|
def clan_dir(flake: str | None) -> str | None:
|
|
from .dirs import get_clan_flake_toplevel_or_env
|
|
|
|
path_result = get_clan_flake_toplevel_or_env()
|
|
return str(path_result) if path_result is not None else None
|
|
|
|
|
|
def complete_machines(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for machine names configured in the clan.
|
|
"""
|
|
machines: list[str] = []
|
|
|
|
def run_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
services_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#clanInternals.machines.x86_64-linux",
|
|
"--apply",
|
|
"builtins.attrNames",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
machines.extend(services_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
thread = threading.Thread(target=run_cmd)
|
|
thread.start()
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if thread.is_alive():
|
|
return iter([])
|
|
|
|
machines_dict = dict.fromkeys(machines, "machine")
|
|
return machines_dict
|
|
|
|
|
|
def complete_services_for_machine(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for machine facts generation services.
|
|
"""
|
|
services: list[str] = []
|
|
# TODO: consolidate, if multiple machines are used
|
|
machines: list[str] = parsed_args.machines
|
|
|
|
def run_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
services_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#nixosConfigurations.{machines[0]}.config.clan.core.facts.services",
|
|
"--apply",
|
|
"builtins.attrNames",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
services.extend(services_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
thread = threading.Thread(target=run_cmd)
|
|
thread.start()
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if thread.is_alive():
|
|
return iter([])
|
|
|
|
services_dict = dict.fromkeys(services, "service")
|
|
return services_dict
|
|
|
|
|
|
def complete_backup_providers_for_machine(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for machine backup providers.
|
|
"""
|
|
providers: list[str] = []
|
|
machine: str = parsed_args.machine
|
|
|
|
def run_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
providers_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#nixosConfigurations.{machine}.config.clan.core.backups.providers",
|
|
"--apply",
|
|
"builtins.attrNames",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
providers.extend(providers_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
thread = threading.Thread(target=run_cmd)
|
|
thread.start()
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if thread.is_alive():
|
|
return iter([])
|
|
|
|
providers_dict = dict.fromkeys(providers, "provider")
|
|
return providers_dict
|
|
|
|
|
|
def complete_state_services_for_machine(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for machine state providers.
|
|
"""
|
|
providers: list[str] = []
|
|
machine: str = parsed_args.machine
|
|
|
|
def run_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
providers_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#nixosConfigurations.{machine}.config.clan.core.state",
|
|
"--apply",
|
|
"builtins.attrNames",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
providers.extend(providers_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
thread = threading.Thread(target=run_cmd)
|
|
thread.start()
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if thread.is_alive():
|
|
return iter([])
|
|
|
|
providers_dict = dict.fromkeys(providers, "service")
|
|
return providers_dict
|
|
|
|
|
|
def complete_secrets(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for clan secrets
|
|
"""
|
|
from .clan_uri import Flake
|
|
from .secrets.secrets import list_secrets
|
|
|
|
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
|
|
|
secrets = list_secrets(Flake(flake).path)
|
|
|
|
secrets_dict = dict.fromkeys(secrets, "secret")
|
|
return secrets_dict
|
|
|
|
|
|
def complete_users(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for clan users
|
|
"""
|
|
from pathlib import Path
|
|
|
|
from .secrets.users import list_users
|
|
|
|
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
|
|
|
users = list_users(Path(flake))
|
|
|
|
users_dict = dict.fromkeys(users, "user")
|
|
return users_dict
|
|
|
|
|
|
def complete_groups(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for clan groups
|
|
"""
|
|
from pathlib import Path
|
|
|
|
from .secrets.groups import list_groups
|
|
|
|
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
|
|
|
groups_list = list_groups(Path(flake))
|
|
groups = [group.name for group in groups_list]
|
|
|
|
groups_dict = dict.fromkeys(groups, "group")
|
|
return groups_dict
|
|
|
|
|
|
def complete_target_host(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for target_host for a specific machine
|
|
"""
|
|
target_hosts: list[str] = []
|
|
machine: str = parsed_args.machine
|
|
|
|
def run_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
target_host_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#nixosConfigurations.{machine}.config.clan.core.networking.targetHost",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
target_hosts.append(target_host_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
thread = threading.Thread(target=run_cmd)
|
|
thread.start()
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if thread.is_alive():
|
|
return iter([])
|
|
|
|
providers_dict = dict.fromkeys(target_hosts, "target_host")
|
|
return providers_dict
|
|
|
|
|
|
def complete_tags(
|
|
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
|
|
) -> Iterable[str]:
|
|
"""
|
|
Provides completion functionality for tags inside the inventory
|
|
"""
|
|
tags: list[str] = []
|
|
threads = []
|
|
|
|
def run_computed_tags_cmd() -> None:
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
computed_tags_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#clanInternals.inventory.tags",
|
|
"--apply",
|
|
"builtins.attrNames",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
tags.extend(computed_tags_result)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
def run_services_tags_cmd() -> None:
|
|
services_tags: list[str] = []
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
services_tags_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#clanInternals.inventory.services",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
for service in services_tags_result.values():
|
|
for environment in service.values():
|
|
roles = environment.get("roles", {})
|
|
for role_details in roles.values():
|
|
services_tags += role_details.get("tags", [])
|
|
|
|
tags.extend(services_tags)
|
|
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
def run_machines_tags_cmd() -> None:
|
|
machine_tags: list[str] = []
|
|
try:
|
|
if (clan_dir_result := clan_dir(None)) is not None:
|
|
flake = clan_dir_result
|
|
else:
|
|
flake = "."
|
|
machine_tags_result = json.loads(
|
|
run(
|
|
nix_eval(
|
|
flags=[
|
|
f"{flake}#clanInternals.inventory.machines",
|
|
],
|
|
),
|
|
).stdout.strip()
|
|
)
|
|
|
|
for machine in machine_tags_result.values():
|
|
machine_tags.extend(machine.get("tags", []))
|
|
|
|
tags.extend(machine_tags)
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
|
|
def start_thread(target_function: Callable) -> threading.Thread:
|
|
thread = threading.Thread(target=target_function)
|
|
thread.start()
|
|
return thread
|
|
|
|
functions_to_run = [
|
|
run_computed_tags_cmd,
|
|
run_services_tags_cmd,
|
|
run_machines_tags_cmd,
|
|
]
|
|
|
|
threads = [start_thread(func) for func in functions_to_run]
|
|
|
|
for thread in threads:
|
|
thread.join(timeout=COMPLETION_TIMEOUT)
|
|
|
|
if any(thread.is_alive() for thread in threads):
|
|
return iter([])
|
|
|
|
providers_dict = dict.fromkeys(tags, "tag")
|
|
return providers_dict
|
|
|
|
|
|
def add_dynamic_completer(
|
|
action: argparse.Action,
|
|
completer: Callable[..., Iterable[str]],
|
|
) -> None:
|
|
"""
|
|
Add a completion function to an argparse action, this will only be added,
|
|
if the argcomplete module is loaded.
|
|
"""
|
|
if argcomplete:
|
|
# mypy doesn't check this correctly, so we ignore it
|
|
action.completer = completer # type: ignore[attr-defined]
|