feat(persist/inventory): add transformation and filtering to support deferredModules

This commit is contained in:
Johannes Kirschbauer
2025-05-23 21:45:52 +02:00
parent 467c1e7f28
commit bb2955d9ad

View File

@@ -12,9 +12,66 @@ from .util import (
calc_patches,
delete_by_path,
determine_writeability,
path_match,
)
def unwrap_known_unknown(value: Any) -> Any:
"""
Helper untility to unwrap our custom deferred module. (uniqueDeferredSerializableModule)
This works because we control ClanLib.type.uniqueDeferredSerializableModule
If value is a dict with the form:
{
"imports": [
{
"_file": <any>,
"imports": [<actual_value>]
}
]
}
then return the actual_value.
Otherwise, return the value unchanged.
"""
if (
isinstance(value, dict)
and "imports" in value
and isinstance(value["imports"], list)
and len(value["imports"]) == 1
and isinstance(value["imports"][0], dict)
and "_file" in value["imports"][0]
and "imports" in value["imports"][0]
and isinstance(value["imports"][0]["imports"], list)
and len(value["imports"][0]["imports"]) == 1
):
return value["imports"][0]["imports"][0]
return value
def sanitize(data: Any, whitelist_paths: list[str], current_path: list[str]) -> Any:
"""
Recursively walks dicts only, unwraps matching values only on whitelisted paths.
Throws error if a value would be transformed on non-whitelisted path.
"""
if isinstance(data, dict):
sanitized = {}
for k, v in data.items():
new_path = [*current_path, k]
unwrapped_v = unwrap_known_unknown(v)
if unwrapped_v is not v: # means unwrap will happen
# check whitelist
wl_paths_split = [wp.split(".") for wp in whitelist_paths]
if not path_match(new_path, wl_paths_split):
msg = f"Unwrap attempted at disallowed path: {'.'.join(new_path)}"
raise ValueError(msg)
sanitized[k] = unwrapped_v
else:
sanitized[k] = sanitize(v, whitelist_paths, new_path)
return sanitized
return data
@dataclass
class WriteInfo:
writeables: dict[str, set[str]]
@@ -35,10 +92,31 @@ class FlakeInterface(Protocol):
class InventoryStore:
def __init__(
self, flake: FlakeInterface, inventory_file_name: str = "inventory.json"
self,
flake: FlakeInterface,
inventory_file_name: str = "inventory.json",
_allowed_path_transforms: list[str] | None = None,
_keys: list[str] | None = None,
) -> None:
"""
InventoryStore constructor
:param flake: The flake to use
:param inventory_file_name: The name of the inventory file
:param _allowed_path_transforms: The paths where deferredModules are allowed to be transformed
"""
self._flake = flake
self.inventory_file = self._flake.path / inventory_file_name
if _allowed_path_transforms is None:
_allowed_path_transforms = [
"instances.*.settings",
"instances.*.machines.*.settings",
]
self._allowed_path_transforms = _allowed_path_transforms
if _keys is None:
_keys = ["machines", "instances", "meta", "services"]
self._keys = _keys
def _load_merged_inventory(self) -> Inventory:
"""
@@ -51,7 +129,11 @@ class InventoryStore:
- Contains all machines
- and more
"""
return self._flake.select("clanInternals.inventoryClass.inventory")
raw_value = self._flake.select("clanInternals.inventoryClass.inventory")
filtered = {k: v for k, v in raw_value.items() if k in self._keys}
sanitized = sanitize(filtered, self._allowed_path_transforms, [])
return sanitized
def _get_persisted(self) -> Inventory:
"""
@@ -146,23 +228,14 @@ class InventoryStore:
"""
write_info = self._write_info()
# Remove internals from the inventory
update.pop("tags", None) # type: ignore
update.pop("options", None) # type: ignore
update.pop("assertions", None) # type: ignore
# Remove instances until the 'settings' deferred module is properly supported.
update.pop("instances", None)
patchset, delete_set = calc_patches(
dict(write_info.data_disk),
dict(update),
dict(write_info.data_eval),
write_info.writeables,
)
persisted = dict(write_info.data_disk)
persisted = dict(write_info.data_disk)
for patch_path, data in patchset.items():
apply_patch(persisted, patch_path, data)