Inventory: init delete patial data
This commit is contained in:
@@ -140,12 +140,59 @@ def find_duplicates(string_list: list[str]) -> list[str]:
|
||||
return duplicates
|
||||
|
||||
|
||||
def find_deleted_paths(
|
||||
persisted: dict[str, Any], update: dict[str, Any], parent_key: str = ""
|
||||
) -> set[str]:
|
||||
"""
|
||||
Recursively find keys (at any nesting level) that exist in persisted but do not
|
||||
exist in update. If a nested dictionary is completely removed, return that dictionary key.
|
||||
|
||||
:param persisted: The original (persisted) nested dictionary.
|
||||
:param update: The updated nested dictionary (some keys might be removed).
|
||||
:param parent_key: The dotted path to the current dictionary's location.
|
||||
:return: A set of dotted paths indicating keys or entire nested paths that were deleted.
|
||||
"""
|
||||
deleted_paths = set()
|
||||
|
||||
# Iterate over keys in persisted
|
||||
for key, p_value in persisted.items():
|
||||
current_path = f"{parent_key}.{key}" if parent_key else key
|
||||
# Check if this key exists in update
|
||||
# breakpoint()
|
||||
if key not in update:
|
||||
# Key doesn't exist at all -> entire branch deleted
|
||||
deleted_paths.add(current_path)
|
||||
else:
|
||||
u_value = update[key]
|
||||
# If persisted value is dict, check the update value
|
||||
if isinstance(p_value, dict):
|
||||
if isinstance(u_value, dict):
|
||||
# If persisted dict is non-empty but updated dict is empty,
|
||||
# that means everything under this branch is removed.
|
||||
if p_value and not u_value:
|
||||
# All children are removed
|
||||
for child_key in p_value:
|
||||
child_path = f"{current_path}.{child_key}"
|
||||
deleted_paths.add(child_path)
|
||||
else:
|
||||
# Both are dicts, recurse deeper
|
||||
deleted_paths |= find_deleted_paths(
|
||||
p_value, u_value, current_path
|
||||
)
|
||||
else:
|
||||
# Persisted was a dict, update is not a dict -> entire branch changed
|
||||
# Consider this as a full deletion of the persisted branch
|
||||
deleted_paths.add(current_path)
|
||||
|
||||
return deleted_paths
|
||||
|
||||
|
||||
def calc_patches(
|
||||
persisted: dict[str, Any],
|
||||
update: dict[str, Any],
|
||||
all_values: dict[str, Any],
|
||||
writeables: dict[str, set[str]],
|
||||
) -> dict[str, Any]:
|
||||
) -> tuple[dict[str, Any], set[str]]:
|
||||
"""
|
||||
Calculate the patches to apply to the inventory.
|
||||
|
||||
@@ -158,6 +205,8 @@ def calc_patches(
|
||||
: param writeable: The writeable keys. Use 'determine_writeability'.
|
||||
Example: {'writeable': {'foo', 'foo.bar'}, 'non_writeable': {'foo.nix'}}
|
||||
: param all_values: All values in the inventory retrieved from the flake evaluation.
|
||||
|
||||
Returns a tuple with the SET and DELETE patches.
|
||||
"""
|
||||
persisted_flat = flatten_data(persisted)
|
||||
update_flat = flatten_data(update)
|
||||
@@ -217,7 +266,14 @@ def calc_patches(
|
||||
msg = f"Cannot determine writeability for key '{update_key}'"
|
||||
raise ClanError(msg)
|
||||
|
||||
return patchset
|
||||
delete_set = find_deleted_paths(persisted, update)
|
||||
|
||||
for delete_key in delete_set:
|
||||
if not is_writeable_key(delete_key):
|
||||
msg = f"Cannot delete: Key '{delete_key}' is not writeable."
|
||||
raise ClanError(msg)
|
||||
|
||||
return patchset, delete_set
|
||||
|
||||
|
||||
def determine_writeability(
|
||||
@@ -363,6 +419,42 @@ def load_inventory_json(flake_dir: str | Path) -> Inventory:
|
||||
return inventory
|
||||
|
||||
|
||||
def delete(d: dict[str, Any], path: str) -> Any:
|
||||
"""
|
||||
Deletes the nested entry specified by a dot-separated path from the dictionary using pop().
|
||||
|
||||
:param data: The dictionary to modify.
|
||||
:param path: A dot-separated string indicating the nested key to delete.
|
||||
e.g., "foo.bar.baz" will attempt to delete data["foo"]["bar"]["baz"].
|
||||
|
||||
:raises KeyError: If any intermediate key is missing or not a dictionary,
|
||||
or if the final key to delete is not found.
|
||||
"""
|
||||
if not path:
|
||||
msg = "Cannot delete. Path is empty."
|
||||
raise ClanError(msg)
|
||||
|
||||
keys = path.split(".")
|
||||
current = d
|
||||
|
||||
# Navigate to the parent dictionary of the final key
|
||||
for key in keys[:-1]:
|
||||
if key not in current or not isinstance(current[key], dict):
|
||||
msg = f"Cannot delete. Key '{path}' not found or not a dictionary '{d}'."
|
||||
raise ClanError(msg)
|
||||
current = current[key]
|
||||
|
||||
# Attempt to pop the final key
|
||||
last_key = keys[-1]
|
||||
try:
|
||||
value = current.pop(last_key)
|
||||
except KeyError:
|
||||
msg = f"Cannot delete. Path '{path}' not found in data '{d}'. "
|
||||
raise ClanError(msg) from KeyError
|
||||
else:
|
||||
return {last_key: value}
|
||||
|
||||
|
||||
def patch(d: dict[str, Any], path: str, content: Any) -> None:
|
||||
"""
|
||||
Update the value at a specific dot-separated path in a nested dictionary.
|
||||
@@ -436,17 +528,20 @@ def set_inventory(inventory: Inventory, flake_dir: str | Path, message: str) ->
|
||||
inventory.pop("options", None) # type: ignore
|
||||
inventory.pop("assertions", None) # type: ignore
|
||||
|
||||
patchset = calc_patches(
|
||||
patchset, delete_set = calc_patches(
|
||||
dict(write_info.data_disk),
|
||||
dict(inventory),
|
||||
dict(write_info.data_eval),
|
||||
write_info.writeables,
|
||||
)
|
||||
|
||||
persisted = dict(write_info.data_disk)
|
||||
|
||||
for patch_path, data in patchset.items():
|
||||
patch(persisted, patch_path, data)
|
||||
|
||||
for delete_path in delete_set:
|
||||
delete(persisted, delete_path)
|
||||
|
||||
inventory_file = get_inventory_path(flake_dir)
|
||||
with inventory_file.open("w") as f:
|
||||
json.dump(persisted, f, indent=2)
|
||||
@@ -471,5 +566,5 @@ def init_inventory(directory: str, init: Inventory | None = None) -> None:
|
||||
|
||||
|
||||
@API.register
|
||||
def get_inventory(flake_dir: str | Path) -> Inventory:
|
||||
return load_inventory_eval(flake_dir)
|
||||
def get_inventory(base_path: str | Path) -> Inventory:
|
||||
return load_inventory_eval(base_path)
|
||||
|
||||
Reference in New Issue
Block a user