clan_lib: use structured keys for def calc_patches

This allows key that contain dot  and other potentially ambigous unicode
This commit is contained in:
Johannes Kirschbauer
2025-09-21 16:01:53 +02:00
parent 00d38180d3
commit bd50e1ee40
3 changed files with 1060 additions and 8 deletions

View File

@@ -1,11 +1,79 @@
from typing import Any
from collections import Counter
from typing import Any, TypedDict
from clan_lib.errors import ClanError
from clan_lib.persist.util import list_difference
type DictLike = dict[str, Any] | Any
PathTuple = tuple[str, ...]
def list_difference(all_items: list, filter_items: list) -> list:
"""Applys a filter to a list and returns the items in all_items that are not in filter_items
Example:
all_items = [1, 2, 3, 4]
filter_items = [3, 4]
list_difference(all_items, filter_items) == [1, 2]
"""
return [value for value in all_items if value not in filter_items]
def set_value_by_path_tuple(d: DictLike, path: PathTuple, content: Any) -> None:
"""Update the value at a specific path in a nested dictionary.
If the value didn't exist before, it will be created recursively.
:param d: The dictionary to update.
:param path: A tuple of strings representing the path to the value.
:param content: The new value to set.
"""
keys = path
current = d
for key in keys[:-1]:
current = current.setdefault(key, {})
current[keys[-1]] = content
def delete_by_path_tuple(d: dict[str, Any], path: PathTuple) -> Any:
"""Deletes the nested entry specified by a dot-separated path from the dictionary using pop().
:param data: The dictionary to modify.
:param path: A dot-separated string indicating the nested key to delete.
e.g., "foo.bar.baz" will attempt to delete data["foo"]["bar"]["baz"].
:raises KeyError: If any intermediate key is missing or not a dictionary,
or if the final key to delete is not found.
"""
if not path:
msg = "Cannot delete. Path is empty."
raise KeyError(msg)
keys = path
current = d
# Navigate to the parent dictionary of the final key
for key in keys[:-1]:
if key not in current or not isinstance(current[key], dict):
msg = f"Cannot delete. Key '{path_to_string(path)}' not found or not a dictionary '{d}'"
raise KeyError(msg)
current = current[key]
# Attempt to pop the final key
last_key = keys[-1]
try:
value = current.pop(last_key)
except KeyError as exc:
# Possibly data was already deleted
msg = f"Canot delete. Path '{path}' not found in data '{d}'"
raise KeyError(msg) from exc
# return {}
else:
return {last_key: value}
def path_to_string(path: PathTuple) -> str:
"""Convert tuple path to string for display/error messages."""
return ".".join(str(p) for p in path)
@@ -72,3 +140,382 @@ def calculate_static_data(
static_flat[key] = static_items
return static_flat
def find_deleted_paths_structured(
all_values: dict[str, Any],
update: dict[str, Any],
parent_key: PathTuple = (),
) -> set[PathTuple]:
"""Find paths that are marked for deletion in the structured format."""
deleted_paths: set[PathTuple] = set()
for key, p_value in all_values.items():
current_path = (*parent_key, key) if parent_key else (key,)
if key not in update:
# Key doesn't exist at all -> entire branch deleted
deleted_paths.add(current_path)
else:
u_value = update[key]
# If persisted value is dict, check the update value
if isinstance(p_value, dict):
if isinstance(u_value, dict):
# If persisted dict is non-empty but updated dict is empty,
# that means everything under this branch is removed.
if p_value and not u_value:
# All children are removed
for child_key in p_value:
child_path = (*current_path, child_key)
deleted_paths.add(child_path)
else:
# Both are dicts, recurse deeper
deleted_paths |= find_deleted_paths_structured(
p_value,
u_value,
current_path,
)
else:
# deleted_paths.add(current_path)
# Persisted was a dict, update is not a dict
# This can happen if user sets the value to None, explicitly. # produce 'set path None' is not a deletion
pass
return deleted_paths
def should_skip_path(path: tuple, delete_paths: set[tuple]) -> bool:
"""Check if path should be skipped because it's under a deletion path."""
return any(path_starts_with(path, delete_path) for delete_path in delete_paths)
def validate_no_static_deletion(
path: PathTuple, new_value: Any, static_data: dict[PathTuple, Any]
) -> None:
"""Validate that we're not trying to delete static data."""
# Check if we're trying to delete a path that exists in static data
if path in static_data and new_value is None:
msg = f"Path '{path_to_string(path)}' is readonly - since its defined via a .nix file"
raise ClanError(msg)
# For lists, check if we're trying to remove static items
if isinstance(new_value, list) and path in static_data:
static_items = static_data[path]
if isinstance(static_items, list):
missing_static = [item for item in static_items if item not in new_value]
if missing_static:
msg = f"Path '{path_to_string(path)}' doesn't contain static items {missing_static} - They are readonly - since they are defined via a .nix file"
raise ClanError(msg)
def get_priority(value: Any) -> int | None:
"""Extract priority from a value, handling both dict and non-dict cases."""
if isinstance(value, dict) and "__prio" in value:
return value["__prio"]
return None
def is_mergeable_type(value: Any) -> bool:
"""Check if a value type supports merging (dict or list)."""
return isinstance(value, (dict, list))
def should_inherit_non_writeable(
priority: int | None, parent_non_writeable: bool
) -> bool:
"""Determine if this node should be marked as non-writeable due to inheritance."""
if parent_non_writeable:
return True
return priority is not None and priority < WRITABLE_PRIORITY_THRESHOLD
def is_key_writeable(
priority: int | None, exists_in_persisted: bool, value_in_all: Any
) -> bool:
"""Determine if a key is writeable based on priority and mergeability rules.
Rules:
- Priority > 100: Always writeable
- Priority < 100: Never writeable
- Priority == 100: Writeable if mergeable (dict/list) OR exists in persisted
"""
if priority is None:
return False # No priority means not writeable
if priority > WRITABLE_PRIORITY_THRESHOLD:
return True
if priority < WRITABLE_PRIORITY_THRESHOLD:
return False
# priority == WRITABLE_PRIORITY_THRESHOLD (100)
return is_mergeable_type(value_in_all) or exists_in_persisted
WRITABLE_PRIORITY_THRESHOLD = 100 # Values below this are not writeable
class WriteabilityResult(TypedDict):
writeable: set[PathTuple]
non_writeable: set[PathTuple]
def _determine_writeability_recursive(
priorities: dict[str, Any],
all_values: dict[str, Any],
persisted: dict[str, Any],
current_path: PathTuple = (),
inherited_priority: int | None = None,
parent_non_writeable: bool = False,
results: WriteabilityResult | None = None,
) -> WriteabilityResult:
"""Recursively determine writeability for all paths in the priority structure.
This is internal recursive function. Use 'determine_writeability' as entry point.
"""
if results is None:
results = WriteabilityResult(writeable=set(), non_writeable=set())
for key, value in priorities.items():
# Skip metadata keys
if key == "__prio":
continue
path = (*current_path, key)
# Determine priority for this key
key_priority = get_priority(value)
effective_priority = (
key_priority if key_priority is not None else inherited_priority
)
# Check if this should be non-writeable due to inheritance
force_non_writeable = should_inherit_non_writeable(
effective_priority, parent_non_writeable
)
if force_non_writeable:
results["non_writeable"].add(path)
# All children are also non-writeable
if isinstance(value, dict):
_determine_writeability_recursive(
value,
all_values.get(key, {}),
{}, # Doesn't matter since all children will be non-writeable
path,
effective_priority,
parent_non_writeable=True,
results=results,
)
else:
# Determine writeability based on rules
if effective_priority is None:
msg = f"Priority for path '{path_to_string(path)}' is not defined. Cannot determine writeability."
raise ClanError(msg)
exists_in_persisted = key in persisted
value_in_all = all_values.get(key)
if is_key_writeable(effective_priority, exists_in_persisted, value_in_all):
results["writeable"].add(path)
else:
results["non_writeable"].add(path)
# Recurse into children
if isinstance(value, dict):
_determine_writeability_recursive(
value,
all_values.get(key, {}),
persisted.get(key, {}),
path,
effective_priority,
parent_non_writeable=False,
results=results,
)
return results
def determine_writeability(
priorities: dict[str, Any], all_values: dict[str, Any], persisted: dict[str, Any]
) -> WriteabilityResult:
"""Determine writeability for all paths based on priorities and current data.
- Priority-based writeability: Values with priority < 100 are not writeable
- Inheritance: Children inherit parent priorities if not specified
- Special case at priority 100: Can be writeable if it's mergeable (dict/list) or exists in persisted data
Args:
priorities: The priority structure defining writeability rules. See: 'clanInternals.inventoryClass.introspection'
all_values: All values in the inventory, See: 'clanInternals.inventoryClass.allValues'
persisted: The current mutable state of the inventory, see: 'readFile inventory.json'
Returns:
Dict with sets of writeable and non-writeable paths using tuple keys
"""
return _determine_writeability_recursive(priorities, all_values, persisted)
def find_duplicates(string_list: list[str]) -> list[str]:
count = Counter(string_list)
return [item for item, freq in count.items() if freq > 1]
def is_writeable_key(
key: PathTuple,
writeables: WriteabilityResult,
) -> bool:
"""Recursively check if a key is writeable.
If key "machines.machine1.deploy.targetHost" is specified but writeability is only
defined for "machines", we pop the last key and check if the parent key is writeable/non-writeable.
"""
remaining = key
while remaining:
current_path = remaining
if current_path in writeables["writeable"]:
return True
if current_path in writeables["non_writeable"]:
return False
remaining = remaining[:-1]
msg = f"Cannot determine writeability for key '{key}'"
raise ClanError(msg)
def validate_writeability(path: PathTuple, writeables: WriteabilityResult) -> None:
"""Validate that a path is writeable."""
if not is_writeable_key(path, writeables):
msg = f"Path '{path_to_string(path)}' is readonly. - It seems its value is statically defined in nix."
raise ClanError(msg)
def validate_type_compatibility(path: tuple, old_value: Any, new_value: Any) -> None:
"""Validate that type changes are allowed."""
if old_value is not None and type(old_value) is not type(new_value):
if new_value is None:
return # Deletion is handled separately
path_str = path_to_string(path)
msg = f"Type mismatch for path '{path_str}'. Cannot update {type(old_value)} with {type(new_value)}"
description = f"""
Previous value is of type '{type(old_value)}' this operation would change it to '{type(new_value)}'.
Prev: {old_value}
->
After: {new_value}
"""
raise ClanError(msg, description=description)
def validate_list_uniqueness(path: tuple, value: Any) -> None:
"""Validate that lists don't contain duplicates."""
if isinstance(value, list):
duplicates = find_duplicates(value)
if duplicates:
msg = f"Path '{path_to_string(path)}' contains list duplicates: {duplicates} - List values must be unique."
raise ClanError(msg)
def validate_patch_conflicts(
patches: set[PathTuple], delete_paths: set[PathTuple]
) -> None:
"""Ensure patches don't conflict with deletions."""
conflicts = {
path
for delete_path in delete_paths
for path in patches
if path_starts_with(path, delete_path)
}
if conflicts:
conflict_list = ", ".join(path_to_string(path) for path in sorted(conflicts))
msg = f"The following paths are marked for deletion but also have update values: {conflict_list}. - You cannot delete and patch the same path and its subpaths."
raise ClanError(msg)
def calc_patches(
persisted: dict[str, Any],
update: dict[str, Any],
all_values: dict[str, Any],
writeables: WriteabilityResult,
) -> tuple[dict[PathTuple, Any], set[PathTuple]]:
"""Calculate the patches to apply to the inventory using structured paths.
Given its current state and the update to apply.
Calulates the necessary SET patches and DELETE paths.
While validating writeability rules.
Args:
persisted: The current mutable state of the inventory
update: The update to apply
all_values: All values in the inventory (static + mutable merged)
writeables: The writeable keys. Use 'determine_writeability'.
Example: {'writeable': {'foo', 'foo.bar'}, 'non_writeable': {'foo.nix'}}
Returns:
Tuple of (SET patches dict, DELETE paths set)
Raises:
ClanError: When validation fails or invalid operations are attempted
"""
# Calculate static data using structured paths
static_data = calculate_static_data(all_values, persisted)
# Flatten all data structures using structured paths
# persisted_flat = flatten_data_structured(persisted)
update_flat = flatten_data_structured(update)
all_values_flat = flatten_data_structured(all_values)
# Early validation: ensure we're not trying to modify static-only paths
# validate_no_static_modification(update_flat, static_data)
# Find paths marked for deletion
delete_paths = find_deleted_paths_structured(all_values, update)
# Validate deletions don't affect static data
for delete_path in delete_paths:
for static_path in static_data:
if path_starts_with(static_path, delete_path):
msg = f"Cannot delete path '{path_to_string(delete_path)}' - Readonly path '{path_to_string(static_path)}' is set via .nix file"
raise ClanError(msg)
# Get all paths that might need processing
all_paths: set[PathTuple] = set(all_values_flat) | set(update_flat)
# Calculate patches
patches: dict[PathTuple, Any] = {}
for path in all_paths:
old_value = all_values_flat.get(path)
new_value = update_flat.get(path)
has_value = path in update_flat
# Skip if no change
if old_value == new_value:
continue
# Skip if path is marked for deletion or under a deletion path
if should_skip_path(path, delete_paths):
continue
# Skip deletions (they're handled by delete_paths)
if old_value is not None and not has_value:
continue
# Validate the change is allowed
validate_no_static_deletion(path, new_value, static_data)
validate_writeability(path, writeables)
validate_type_compatibility(path, old_value, new_value)
validate_list_uniqueness(path, new_value)
patch_value = new_value # Init
if isinstance(new_value, list):
patch_value = list_difference(new_value, static_data.get(path, []))
patches[path] = patch_value
validate_patch_conflicts(set(patches.keys()), delete_paths)
return patches, delete_paths

View File

@@ -1,4 +1,17 @@
from clan_lib.persist.static_data import calculate_static_data, flatten_data_structured
from copy import deepcopy
from typing import Any
import pytest
from clan_lib.errors import ClanError
from clan_lib.persist.static_data import (
calc_patches,
calculate_static_data,
determine_writeability,
flatten_data_structured,
set_value_by_path_tuple,
)
from clan_lib.persist.util import delete_by_path, set_value_by_path
def test_flatten_data_structured() -> None:
@@ -204,3 +217,593 @@ def test_dot_in_keys() -> None:
static_data = calculate_static_data(all_values, persisted)
assert static_data == expected_static
def test_write_simple() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"bar": {"__prio": 1000}, # <- writeable: mkDefault "foo.bar"
},
"foo.bar": {"__prio": 1000},
}
default: dict = {"foo": {}}
data: dict = {}
res = determine_writeability(prios, default, data)
assert res == {
"writeable": {("foo", "bar"), ("foo",), ("foo.bar",)},
"non_writeable": set(),
}
def test_write_inherited() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"bar": {
# Inherits prio from parent <- writeable: "foo.bar"
"baz": {"__prio": 1000}, # <- writeable: "foo.bar.baz"
},
},
}
data: dict = {}
res = determine_writeability(prios, {"foo": {"bar": {}}}, data)
assert res == {
"writeable": {("foo",), ("foo", "bar"), ("foo", "bar", "baz")},
"non_writeable": set(),
}
def test_non_write_inherited() -> None:
prios = {
"foo": {
"__prio": 50, # <- non writeable: mkForce "foo" = {...}
"bar": {
# Inherits prio from parent <- non writeable
"baz": {"__prio": 1000}, # <- non writeable: mkDefault "foo.bar.baz"
},
},
}
data: dict = {}
res = determine_writeability(prios, {}, data)
assert res == {
"writeable": set(),
"non_writeable": {("foo",), ("foo", "bar", "baz"), ("foo", "bar")},
}
def test_write_list() -> None:
prios = {
"foo": {
"__prio": 100,
},
}
data: dict = {}
default: dict = {
"foo": [
"a",
"b",
], # <- writeable: because lists are merged. Filtering out nix-values comes later
}
res = determine_writeability(prios, default, data)
assert res == {
"writeable": {("foo",)},
"non_writeable": set(),
}
def test_write_because_written() -> None:
# This test is essential to allow removing data that was previously written.
# It may also have the weird effect, that somebody can change data, but others cannot.
# But this might be acceptable, since its rare minority edge-case.
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"bar": {
# Inherits prio from parent <- writeable
"baz": {"__prio": 100}, # <- non writeable usually
"foobar": {"__prio": 100}, # <- non writeable
},
},
}
# Given the following data. {}
# Check that the non-writeable paths are correct.
res = determine_writeability(prios, {"foo": {"bar": {}}}, {})
assert res == {
"writeable": {("foo",), ("foo", "bar")},
"non_writeable": {("foo", "bar", "baz"), ("foo", "bar", "foobar")},
}
data: dict = {
"foo": {
"bar": {
"baz": "foo", # <- written. Since we created the data, we know we can write to it
},
},
}
res = determine_writeability(prios, {}, data)
assert res == {
"writeable": {("foo",), ("foo", "bar"), ("foo", "bar", "baz")},
"non_writeable": {("foo", "bar", "foobar")},
}
# --------- Write tests ---------
def test_update_simple() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"bar": {"__prio": 1000}, # <- writeable: mkDefault "foo.bar"
"nix": {"__prio": 100}, # <- non writeable: "foo.bar" (defined in nix)
},
}
data_eval = {"foo": {"bar": "baz", "nix": "this is set in nix"}}
data_disk: dict = {}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {
"writeable": {("foo",), ("foo", "bar")},
"non_writeable": {("foo", "nix")},
}
update = {
"foo": {
"bar": "new value", # <- user sets this value
# If the user would have set this value, it would trigger an error
"nix": "this is set in nix", # <- user didnt touch this value
},
}
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("foo", "bar"): "new value"}
assert delete_set == set()
def test_update_add_empty_dict() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"nix": {"__prio": 100}, # <- non writeable: "foo.nix" (defined in nix)
},
}
data_eval: dict = {"foo": {"nix": {}}}
data_disk: dict = {}
writeables = determine_writeability(prios, data_eval, data_disk)
update = deepcopy(data_eval)
set_value_by_path_tuple(update, ("foo", "mimi"), {})
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("foo", "mimi"): {}} # this is what gets persisted
assert delete_set == set()
def test_update_many() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
"bar": {"__prio": 100}, # <-
"nix": {"__prio": 100}, # <- non writeable: "foo.nix" (defined in nix)
"nested": {
"__prio": 100,
"x": {"__prio": 100}, # <- writeable: "foo.nested.x"
"y": {"__prio": 100}, # <- non-writeable: "foo.nested.y"
},
},
}
data_eval = {
"foo": {
"bar": "baz",
"nix": "this is set in nix",
"nested": {"x": "x", "y": "y"},
},
}
data_disk = {"foo": {"bar": "baz", "nested": {"x": "x"}}}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {
"writeable": {
("foo",),
("foo", "bar"),
("foo", "nested"),
("foo", "nested", "x"),
},
"non_writeable": {("foo", "nix"), ("foo", "nested", "y")},
}
update = {
"foo": {
"bar": "new value for bar", # <- user sets this value
"nix": "this is set in nix", # <- user cannot set this value
"nested": {
"x": "new value for x", # <- user sets this value
"y": "y", # <- user cannot set this value
},
},
}
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {
("foo", "bar"): "new value for bar",
("foo", "nested", "x"): "new value for x",
}
assert delete_set == set()
def test_update_parent_non_writeable() -> None:
prios = {
"foo": {
"__prio": 50, # <- non-writeable: "foo"
"bar": {"__prio": 1000}, # <- writeable: mkDefault "foo.bar"
},
}
data_eval = {
"foo": {
"bar": "baz",
},
}
data_disk = {
"foo": {
"bar": "baz",
},
}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {
"writeable": set(),
"non_writeable": {("foo",), ("foo", "bar")},
}
update = {
"foo": {
"bar": "new value", # <- user sets this value
},
}
with pytest.raises(ClanError) as error:
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
assert "Path 'foo.bar' is readonly." in str(error.value)
def test_remove_non_writable_attrs() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
},
}
data_eval: dict = {"foo": {"bar": {}, "baz": {}}}
data_disk: dict = {}
writeables = determine_writeability(prios, data_eval, data_disk)
update: dict = {
"foo": {
"bar": {}, # <- user leaves this value
# User removed "baz"
},
}
with pytest.raises(ClanError) as error:
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
assert "Cannot delete path 'foo.baz'" in str(error.value)
def test_update_list() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
},
}
data_eval = {
# [ "A" ] is defined in nix.
"foo": ["A", "B"],
}
data_disk = {"foo": ["B"]}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("foo",)}, "non_writeable": set()}
# Add "C" to the list
update = {"foo": ["A", "B", "C"]} # User wants to add "C"
patchset, _ = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("foo",): ["B", "C"]}
# "foo": ["A", "B"]
# Remove "B" from the list
# Expected is [ ] because ["A"] is defined in nix
update = {"foo": ["A"]} # User wants to remove "B"
patchset, _ = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("foo",): []}
def test_update_list_duplicates() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
},
}
data_eval = {
# [ "A" ] is defined in nix.
"foo": ["A", "B"],
}
data_disk = {"foo": ["B"]}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("foo",)}, "non_writeable": set()}
# Add "A" to the list
update = {"foo": ["A", "B", "A"]} # User wants to add duplicate "A"
with pytest.raises(ClanError) as error:
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
assert "Path 'foo' contains list duplicates: ['A']" in str(error.value)
def test_dont_persist_defaults() -> None:
"""Default values should not be persisted to disk if not explicitly requested by the user."""
prios = {
"enabled": {"__prio": 1500},
"config": {"__prio": 100},
}
data_eval = {
"enabled": True,
"config": {"foo": "bar"},
}
data_disk: dict[str, Any] = {}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {
"writeable": {("config",), ("enabled",)},
"non_writeable": set(),
}
update = deepcopy(data_eval)
set_value_by_path(update, "config.foo", "foo")
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("config", "foo"): "foo"}
assert delete_set == set()
def test_set_null() -> None:
data_eval: dict = {
"foo": {},
"bar": {},
}
data_disk = data_eval
# User set Foo to null
# User deleted bar
update = {"foo": None}
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=determine_writeability(
{"__prio": 100, "foo": {"__prio": 100}},
data_eval,
data_disk,
),
)
assert patchset == {("foo",): None}
assert delete_set == {("bar",)}
def test_machine_delete() -> None:
prios = {
"machines": {"__prio": 100},
}
data_eval = {
"machines": {
"foo": {"name": "foo"},
"bar": {"name": "bar"},
"naz": {"name": "naz"},
},
}
data_disk = data_eval
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("machines",)}, "non_writeable": set()}
# Delete machine "bar" from the inventory
update = deepcopy(data_eval)
delete_by_path(update, "machines.bar")
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {}
assert delete_set == {("machines", "bar")}
def test_update_mismatching_update_type() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
},
}
data_eval = {"foo": ["A", "B"]}
data_disk: dict = {}
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("foo",)}, "non_writeable": set()}
# set foo to an int but it is a list
update: dict = {"foo": 1}
with pytest.raises(ClanError) as error:
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
assert (
"Type mismatch for path 'foo'. Cannot update <class 'list'> with <class 'int'>"
in str(error.value)
)
def test_delete_key() -> None:
prios = {
"foo": {
"__prio": 100, # <- writeable: "foo"
},
}
data_eval = {"foo": {"bar": "baz"}}
data_disk = data_eval
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("foo",)}, "non_writeable": set()}
# remove all keys from foo
update: dict = {"foo": {}}
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {("foo",): {}}
assert delete_set == {("foo", "bar")}
def test_delete_key_intermediate() -> None:
prios = {
"foo": {
"__prio": 100,
},
}
data_eval = {
"foo": {
# Remove the key "bar"
"bar": {"name": "bar", "info": "info", "other": ["a", "b"]},
# Leave the key "other"
"other": {"name": "other", "info": "info", "other": ["a", "b"]},
},
}
update: dict = {
"foo": {"other": {"name": "other", "info": "info", "other": ["a", "b"]}},
}
data_disk = data_eval
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": {("foo",)}, "non_writeable": set()}
# remove all keys from foo
patchset, delete_set = calc_patches(
data_disk,
update,
all_values=data_eval,
writeables=writeables,
)
assert patchset == {}
assert delete_set == {("foo", "bar")}
def test_delete_key_non_writeable() -> None:
prios = {
"foo": {
"__prio": 50,
},
}
data_eval = {
"foo": {
# Remove the key "bar"
"bar": {"name": "bar", "info": "info", "other": ["a", "b"]},
},
}
update: dict = {"foo": {}}
data_disk = data_eval
writeables = determine_writeability(prios, data_eval, data_disk)
assert writeables == {"writeable": set(), "non_writeable": {("foo",)}}
# remove all keys from foo
with pytest.raises(ClanError) as error:
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
assert "Path 'foo' is readonly." in str(error.value)

View File

@@ -129,13 +129,15 @@ def flatten_data(data: dict, parent_key: str = "", separator: str = ".") -> dict
def list_difference(all_items: list, filter_items: list) -> list:
"""Unmerge the current list. Given a previous list.
"""Applys a filter to a list and returns the items in all_items that are not in filter_items
Returns:
The other list.
Example:
all_items = [1, 2, 3, 4]
filter_items = [3, 4]
list_difference(all_items, filter_items) == [1, 2]
"""
# Unmerge the lists
return [value for value in all_items if value not in filter_items]
@@ -159,7 +161,7 @@ def find_deleted_paths(
"""
deleted_paths = set()
# Iterate over keys in persisted
# Iterate over keys in all_data
for key, p_value in curr.items():
current_path = f"{parent_key}.{key}" if parent_key else key
# Check if this key exists in update