Merge pull request 'pkgs/cli: Add dynamic completer for tags' (#2372) from kenji/clan-core:kenji-cli/2360/tag-completions into main

This commit is contained in:
clan-bot
2024-11-11 12:54:45 +00:00
2 changed files with 108 additions and 1 deletions

View File

@@ -300,6 +300,111 @@ def complete_target_host(
return providers_dict
def complete_tags(
prefix: str, parsed_args: argparse.Namespace, **kwargs: Any
) -> Iterable[str]:
"""
Provides completion functionality for tags inside the inventory
"""
tags: list[str] = []
threads = []
def run_computed_tags_cmd() -> None:
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
computed_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.tags",
"--apply",
"builtins.attrNames",
],
),
).stdout.strip()
)
tags.extend(computed_tags_result)
except subprocess.CalledProcessError:
pass
def run_services_tags_cmd() -> None:
services_tags: list[str] = []
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
services_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.services",
],
),
).stdout.strip()
)
for service in services_tags_result.values():
for environment in service.values():
roles = environment.get("roles", {})
for role_details in roles.values():
services_tags += role_details.get("tags", [])
tags.extend(services_tags)
except subprocess.CalledProcessError:
pass
def run_machines_tags_cmd() -> None:
machine_tags: list[str] = []
try:
if (clan_dir_result := clan_dir(None)) is not None:
flake = clan_dir_result
else:
flake = "."
machine_tags_result = json.loads(
run(
nix_eval(
flags=[
f"{flake}#clanInternals.inventory.machines",
],
),
).stdout.strip()
)
for machine in machine_tags_result.values():
machine_tags.extend(machine.get("tags", []))
tags.extend(machine_tags)
except subprocess.CalledProcessError:
pass
def start_thread(target_function: Callable) -> threading.Thread:
thread = threading.Thread(target=target_function)
thread.start()
return thread
functions_to_run = [
run_computed_tags_cmd,
run_services_tags_cmd,
run_machines_tags_cmd,
]
threads = [start_thread(func) for func in functions_to_run]
for thread in threads:
thread.join(timeout=COMPLETION_TIMEOUT)
if any(thread.is_alive() for thread in threads):
return iter([])
providers_dict = {name: "tag" for name in tags}
return providers_dict
def add_dynamic_completer(
action: argparse.Action,
completer: Callable[..., Iterable[str]],

View File

@@ -10,6 +10,7 @@ from clan_cli.api import API
from clan_cli.clan.create import git_command
from clan_cli.clan_uri import FlakeId
from clan_cli.cmd import Log, run
from clan_cli.completions import add_dynamic_completer, complete_tags
from clan_cli.dirs import clan_templates, get_clan_flake_toplevel_or_env
from clan_cli.errors import ClanError
from clan_cli.inventory import Machine as InventoryMachine
@@ -176,12 +177,13 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
type=str,
help="The name of the machine to create",
)
parser.add_argument(
tag_parser = parser.add_argument(
"--tags",
nargs="+",
default=[],
help="Tags to associate with the machine. Can be used to assign multiple machines to services.",
)
add_dynamic_completer(tag_parser, complete_tags)
parser.add_argument(
"--template-name",
type=str,