store: move merge_objects into persistence helpers
This commit is contained in:
@@ -3,11 +3,67 @@ Utilities for working with nested dictionaries, particularly for
|
||||
flattening, unmerging lists, finding duplicates, and calculating patches.
|
||||
"""
|
||||
|
||||
import json
|
||||
from collections import Counter
|
||||
from typing import Any
|
||||
from typing import Any, TypeVar, cast
|
||||
|
||||
from clan_lib.errors import ClanError
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
empty: list[str] = []
|
||||
|
||||
|
||||
def merge_objects(
|
||||
curr: T, update: T, merge_lists: bool = True, path: list[str] = empty
|
||||
) -> T:
|
||||
"""
|
||||
Updates values in curr by values of update
|
||||
The output contains values for all keys of curr and update together.
|
||||
|
||||
Lists are deduplicated and appended almost like in the nix module system.
|
||||
|
||||
Example:
|
||||
|
||||
merge_objects({"a": 1}, {"a": null }) -> {"a": null}
|
||||
merge_objects({"a": null}, {"a": 1 }) -> {"a": 1}
|
||||
"""
|
||||
result = {}
|
||||
msg = f"cannot update non-dictionary values: {curr} by {update}"
|
||||
if not isinstance(update, dict):
|
||||
raise ClanError(msg)
|
||||
if not isinstance(curr, dict):
|
||||
raise ClanError(msg)
|
||||
|
||||
all_keys = set(update.keys()).union(curr.keys())
|
||||
|
||||
for key in all_keys:
|
||||
curr_val = curr.get(key)
|
||||
update_val = update.get(key)
|
||||
|
||||
if isinstance(update_val, dict) and isinstance(curr_val, dict):
|
||||
result[key] = merge_objects(
|
||||
curr_val, update_val, merge_lists=merge_lists, path=[*path, key]
|
||||
)
|
||||
elif isinstance(update_val, list) and isinstance(curr_val, list):
|
||||
if merge_lists:
|
||||
result[key] = list(dict.fromkeys(curr_val + update_val)) # type: ignore
|
||||
else:
|
||||
result[key] = update_val # type: ignore
|
||||
elif (
|
||||
update_val is not None
|
||||
and curr_val is not None
|
||||
and type(update_val) is not type(curr_val)
|
||||
):
|
||||
msg = f"Type mismatch for key '{key}'. Cannot update {type(curr_val)} with {type(update_val)}"
|
||||
raise ClanError(msg, location=json.dumps([*path, key]))
|
||||
elif key in update:
|
||||
result[key] = update_val # type: ignore
|
||||
elif key in curr:
|
||||
result[key] = curr_val # type: ignore
|
||||
|
||||
return cast(T, result)
|
||||
|
||||
|
||||
def path_match(path: list[str], whitelist_paths: list[list[str]]) -> bool:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user