api/list_modules: return a simpler list of modules
This commit is contained in:
@@ -8,6 +8,7 @@ from clan_lib.errors import ClanError
|
|||||||
from clan_lib.flake.flake import Flake
|
from clan_lib.flake.flake import Flake
|
||||||
from clan_lib.nix_models.clan import (
|
from clan_lib.nix_models.clan import (
|
||||||
InventoryInstance,
|
InventoryInstance,
|
||||||
|
InventoryInstanceModule,
|
||||||
InventoryInstanceModuleType,
|
InventoryInstanceModuleType,
|
||||||
InventoryInstanceRolesType,
|
InventoryInstanceRolesType,
|
||||||
)
|
)
|
||||||
@@ -151,27 +152,32 @@ class ModuleInfo(TypedDict):
|
|||||||
roles: dict[str, None]
|
roles: dict[str, None]
|
||||||
|
|
||||||
|
|
||||||
class ModuleList(TypedDict):
|
class Module(TypedDict):
|
||||||
modules: dict[str, dict[str, ModuleInfo]]
|
module: InventoryInstanceModule
|
||||||
|
info: ModuleInfo
|
||||||
|
|
||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
def list_service_modules(flake: Flake) -> ModuleList:
|
def list_service_modules(flake: Flake) -> list[Module]:
|
||||||
"""Show information about a module"""
|
"""Show information about a module"""
|
||||||
modules = flake.select("clanInternals.inventoryClass.modulesPerSource")
|
modules = flake.select("clanInternals.inventoryClass.modulesPerSource")
|
||||||
|
|
||||||
res: dict[str, dict[str, ModuleInfo]] = {}
|
res: list[Module] = []
|
||||||
for input_name, module_set in modules.items():
|
for input_name, module_set in modules.items():
|
||||||
res[input_name] = {}
|
|
||||||
|
|
||||||
for module_name, module_info in module_set.items():
|
for module_name, module_info in module_set.items():
|
||||||
# breakpoint()
|
res.append(
|
||||||
res[input_name][module_name] = ModuleInfo(
|
Module(
|
||||||
manifest=ModuleManifest.from_dict(module_info.get("manifest")),
|
module={"name": module_name, "input": input_name},
|
||||||
|
info=ModuleInfo(
|
||||||
|
manifest=ModuleManifest.from_dict(
|
||||||
|
module_info.get("manifest"),
|
||||||
|
),
|
||||||
roles=module_info.get("roles", {}),
|
roles=module_info.get("roles", {}),
|
||||||
|
),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
return ModuleList(modules=res)
|
return res
|
||||||
|
|
||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
@@ -188,19 +194,21 @@ def get_service_module(
|
|||||||
input_name, module_name = check_service_module_ref(flake, module_ref)
|
input_name, module_name = check_service_module_ref(flake, module_ref)
|
||||||
|
|
||||||
avilable_modules = list_service_modules(flake)
|
avilable_modules = list_service_modules(flake)
|
||||||
module_set = avilable_modules.get("modules", {}).get(input_name)
|
module_set: list[Module] = [
|
||||||
|
m for m in avilable_modules if m["module"].get("input", None) == input_name
|
||||||
|
]
|
||||||
|
|
||||||
if module_set is None:
|
if not module_set:
|
||||||
msg = f"Module set for input '{input_name}' not found"
|
msg = f"Module set for input '{input_name}' not found"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
module = module_set.get(module_name)
|
module = next((m for m in module_set if m["module"]["name"] == module_name), None)
|
||||||
|
|
||||||
if module is None:
|
if module is None:
|
||||||
msg = f"Module '{module_name}' not found in input '{input_name}'"
|
msg = f"Module '{module_name}' not found in input '{input_name}'"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
return module
|
return module["info"]
|
||||||
|
|
||||||
|
|
||||||
def check_service_module_ref(
|
def check_service_module_ref(
|
||||||
@@ -219,18 +227,21 @@ def check_service_module_ref(
|
|||||||
msg = "Setting module_ref.input is currently required"
|
msg = "Setting module_ref.input is currently required"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
module_set = avilable_modules.get("modules", {}).get(input_ref)
|
module_set = [
|
||||||
|
m for m in avilable_modules if m["module"].get("input", None) == input_ref
|
||||||
|
]
|
||||||
|
|
||||||
if module_set is None:
|
if module_set is None:
|
||||||
|
inputs = {m["module"].get("input") for m in avilable_modules}
|
||||||
msg = f"module set for input '{input_ref}' not found"
|
msg = f"module set for input '{input_ref}' not found"
|
||||||
msg += f"\nAvilable input_refs: {avilable_modules.get('modules', {}).keys()}"
|
msg += f"\nAvilable input_refs: {inputs}"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
module_name = module_ref.get("name")
|
module_name = module_ref.get("name")
|
||||||
if not module_name:
|
if not module_name:
|
||||||
msg = "Module name is required in module_ref"
|
msg = "Module name is required in module_ref"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
module = module_set.get(module_name)
|
module = next((m for m in module_set if m["module"]["name"] == module_name), None)
|
||||||
if module is None:
|
if module is None:
|
||||||
msg = f"module with name '{module_name}' not found"
|
msg = f"module with name '{module_name}' not found"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|||||||
@@ -215,8 +215,10 @@ def test_clan_create_api(
|
|||||||
inventory = store.read()
|
inventory = store.read()
|
||||||
|
|
||||||
modules = list_service_modules(clan_dir_flake)
|
modules = list_service_modules(clan_dir_flake)
|
||||||
|
|
||||||
|
admin_module = next(m for m in modules if m["module"]["name"] == "admin")
|
||||||
assert (
|
assert (
|
||||||
modules["modules"]["clan-core"]["admin"]["manifest"].name == "clan-core/admin"
|
admin_module["info"]["manifest"].name == "clan-core/admin"
|
||||||
)
|
)
|
||||||
|
|
||||||
set_value_by_path(inventory, "instances", inventory_conf.instances)
|
set_value_by_path(inventory, "instances", inventory_conf.instances)
|
||||||
|
|||||||
Reference in New Issue
Block a user