pkgs/cli: Add dynamic completer for tags

Add a dynamic completer for `tags`.

Queries tag information from:
- the keys of the computed/static tags (inventory tags)
- the services tags
- the machines tags
This commit is contained in:
a-kenji
2024-11-11 13:45:13 +01:00
parent e08f2bf3e1
commit c9ec772f1d

View File

@@ -300,6 +300,111 @@ def complete_target_host(
return providers_dict
def complete_tags(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for tags inside the inventory
"""
tags: list[str] = []
threads = []
def run_computed_tags_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
computed_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.tags",
"--apply",
"builtins.attrNames",
],
),
).stdout.strip()
)
tags.extend(computed_tags_result)
except subprocess.CalledProcessError:
pass
def run_services_tags_cmd() -> None:
services_tags: list[str] = []
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
services_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.services",
],
),
).stdout.strip()
)
for service in services_tags_result.values():
for environment in service.values():
roles = environment.get("roles", {})
for role_details in roles.values():
services_tags += role_details.get("tags", [])
tags.extend(services_tags)
except subprocess.CalledProcessError:
pass
def run_machines_tags_cmd() -> None:
machine_tags: list[str] = []
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
machine_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.machines",
],
),
).stdout.strip()
)
for machine in machine_tags_result.values():
machine_tags.extend(machine.get("tags", []))
tags.extend(machine_tags)
except subprocess.CalledProcessError:
pass
def start_thread(target_function: Callable) -> threading.Thread:
thread = threading.Thread(target=target_function)
thread.start()
return thread
functions_to_run = [
run_computed_tags_cmd,
run_services_tags_cmd,
run_machines_tags_cmd,
]
threads = [start_thread(func) for func in functions_to_run]
for thread in threads:
thread.join(timeout=COMPLETION_TIMEOUT)
if any(thread.is_alive() for thread in threads):
return iter([])
providers_dict = {name: "tag" for name in tags}
return providers_dict
def add_dynamic_completer(
action: argparse.Action,
completer: Callable[..., Iterable[str]],