From c9ec772f1def0afb497918e8a498d22c9e0304b3 Mon Sep 17 00:00:00 2001 From: a-kenji Date: Mon, 11 Nov 2024 13:45:13 +0100 Subject: [PATCH] pkgs/cli: Add dynamic completer for `tags` Add a dynamic completer for `tags`. Queries tag information from: - the keys of the computed/static tags (inventory tags) - the services tags - the machines tags --- pkgs/clan-cli/clan_cli/completions.py | 105 ++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/pkgs/clan-cli/clan_cli/completions.py b/pkgs/clan-cli/clan_cli/completions.py index 8d9a7d8da..3bb145d21 100644 --- a/pkgs/clan-cli/clan_cli/completions.py +++ b/pkgs/clan-cli/clan_cli/completions.py @@ -300,6 +300,111 @@ def complete_target_host( return providers_dict +def complete_tags( + prefix: str, parsed_args: argparse.Namespace, **kwargs: Any +) -> Iterable[str]: + """ + Provides completion functionality for tags inside the inventory + """ + tags: list[str] = [] + threads = [] + + def run_computed_tags_cmd() -> None: + try: + if (clan_dir_result := clan_dir(None)) is not None: + flake = clan_dir_result + else: + flake = "." + computed_tags_result = json.loads( + run( + nix_eval( + flags=[ + f"{flake}#clanInternals.inventory.tags", + "--apply", + "builtins.attrNames", + ], + ), + ).stdout.strip() + ) + + tags.extend(computed_tags_result) + except subprocess.CalledProcessError: + pass + + def run_services_tags_cmd() -> None: + services_tags: list[str] = [] + try: + if (clan_dir_result := clan_dir(None)) is not None: + flake = clan_dir_result + else: + flake = "." + services_tags_result = json.loads( + run( + nix_eval( + flags=[ + f"{flake}#clanInternals.inventory.services", + ], + ), + ).stdout.strip() + ) + for service in services_tags_result.values(): + for environment in service.values(): + roles = environment.get("roles", {}) + for role_details in roles.values(): + services_tags += role_details.get("tags", []) + + tags.extend(services_tags) + + except subprocess.CalledProcessError: + pass + + def run_machines_tags_cmd() -> None: + machine_tags: list[str] = [] + try: + if (clan_dir_result := clan_dir(None)) is not None: + flake = clan_dir_result + else: + flake = "." + machine_tags_result = json.loads( + run( + nix_eval( + flags=[ + f"{flake}#clanInternals.inventory.machines", + ], + ), + ).stdout.strip() + ) + + for machine in machine_tags_result.values(): + machine_tags.extend(machine.get("tags", [])) + + tags.extend(machine_tags) + except subprocess.CalledProcessError: + pass + + def start_thread(target_function: Callable) -> threading.Thread: + thread = threading.Thread(target=target_function) + thread.start() + return thread + + functions_to_run = [ + run_computed_tags_cmd, + run_services_tags_cmd, + run_machines_tags_cmd, + ] + + threads = [start_thread(func) for func in functions_to_run] + + for thread in threads: + thread.join(timeout=COMPLETION_TIMEOUT) + + if any(thread.is_alive() for thread in threads): + return iter([]) + + providers_dict = {name: "tag" for name in tags} + return providers_dict + + def add_dynamic_completer( action: argparse.Action, completer: Callable[..., Iterable[str]],