persist: add attributes props to accumulator

This commit is contained in:
Johannes Kirschbauer
2025-10-10 14:32:15 +02:00
parent 9b0557803e
commit 63fdc13928
2 changed files with 43 additions and 23 deletions

View File

@@ -8,7 +8,9 @@ WRITABLE_PRIORITY_THRESHOLD = 100 # Values below this are not writeable
class PersistenceAttribute(Enum):
WRITE = "write"
READONLY = "readonly"
DELETE = "delete" # can be deleted
class AttributeMap(TypedDict):
@@ -112,6 +114,8 @@ def _determine_writeability_recursive(
"""Recursively determine writeability for all paths in the priority structure.
This is internal recursive function. Use 'determine_writeability' as entry point.
results: AttributeMap that accumulates results, returned at the end.
"""
if results is None:
results = AttributeMap(writeable=set(), non_writeable=set(), attrs={})
@@ -121,8 +125,28 @@ def _determine_writeability_recursive(
if key in {"__this", "__list", "__prio"}:
continue
def get_inventory_exclusive(value: dict) -> bool | None:
if "__this" not in value or value:
return None
definition_locations = value.get("__this", {}).get("files")
if not definition_locations:
return None
return (
len(definition_locations) == 1
and definition_locations[0] == "inventory.json"
)
path = (*current_path, key)
# If the value is defined only in inventory.json, we might be able to delete it.
# If we don't know (None), decide to allow deletion as well. (Backwards compatibility)
# Unless there is a default that applies instead, when removed. Currently we cannot test that.
# So we assume exclusive values can be removed. In reality we might need to check defaults too. (TODO)
is_inventory_exclusive = get_inventory_exclusive(value)
if is_inventory_exclusive or is_inventory_exclusive is None:
results["attrs"].setdefault(path, set()).add(PersistenceAttribute.DELETE)
# Determine priority for this key
key_priority = get_priority(value)
effective_priority = (
@@ -135,6 +159,8 @@ def _determine_writeability_recursive(
)
if force_non_writeable:
results["attrs"].setdefault(path, set()).add(PersistenceAttribute.READONLY)
# Legacy:
results["non_writeable"].add(path)
# All children are also non-writeable
if isinstance(value, dict):
@@ -157,8 +183,13 @@ def _determine_writeability_recursive(
value_in_all = all_values.get(key)
if is_key_writeable(effective_priority, exists_in_persisted, value_in_all):
# TODO: Distinguish between different write types?
results["attrs"].setdefault(path, set()).add(PersistenceAttribute.WRITE)
# Legacy:
results["writeable"].add(path)
else:
results["attrs"].setdefault(path, set()).add(PersistenceAttribute.READONLY)
# Legacy:
results["non_writeable"].add(path)
# Recurse into children

View File

@@ -50,10 +50,8 @@ def test_write_simple() -> None:
data: dict = {}
res = compute_write_map(prios, default, data)
assert res == {
"writeable": {("foo", "bar"), ("foo",), ("foo.bar",)},
"non_writeable": set(),
}
assert res["writeable"] == {("foo", "bar"), ("foo",), ("foo.bar",)}
assert res["non_writeable"] == set()
# Compatibility test for old __prio style
@@ -73,10 +71,8 @@ def test_write_inherited() -> None:
data: dict = {}
res = compute_write_map(prios, {"foo": {"bar": {}}}, data)
assert res == {
"writeable": {("foo",), ("foo", "bar"), ("foo", "bar", "baz")},
"non_writeable": set(),
}
assert res["writeable"] == {("foo", "bar"), ("foo",), ("foo", "bar", "baz")}
assert res["non_writeable"] == set()
def test_non_write_inherited() -> None:
@@ -93,10 +89,8 @@ def test_non_write_inherited() -> None:
data: dict = {}
res = compute_write_map(prios, {}, data)
assert res == {
"writeable": set(),
"non_writeable": {("foo",), ("foo", "bar", "baz"), ("foo", "bar")},
}
assert res["non_writeable"] == {("foo",), ("foo", "bar"), ("foo", "bar", "baz")}
assert res["writeable"] == set()
def test_write_list() -> None:
@@ -114,10 +108,9 @@ def test_write_list() -> None:
], # <- writeable: because lists are merged. Filtering out nix-values comes later
}
res = compute_write_map(prios, default, data)
assert res == {
"writeable": {("foo",)},
"non_writeable": set(),
}
assert res["writeable"] == {("foo",)}
assert res["non_writeable"] == set()
def test_write_because_written() -> None:
@@ -138,10 +131,9 @@ def test_write_because_written() -> None:
# Given the following data. {}
# Check that the non-writeable paths are correct.
res = compute_write_map(prios, {"foo": {"bar": {}}}, {})
assert res == {
"writeable": {("foo",), ("foo", "bar")},
"non_writeable": {("foo", "bar", "baz"), ("foo", "bar", "foobar")},
}
assert res["writeable"] == {("foo",), ("foo", "bar")}
assert res["non_writeable"] == {("foo", "bar", "baz"), ("foo", "bar", "foobar")}
data: dict = {
"foo": {
@@ -151,7 +143,4 @@ def test_write_because_written() -> None:
},
}
res = compute_write_map(prios, {}, data)
assert res == {
"writeable": {("foo",), ("foo", "bar"), ("foo", "bar", "baz")},
"non_writeable": {("foo", "bar", "foobar")},
}