diff --git a/pkgs/clan-cli/clan_cli/inventory/__init__.py b/pkgs/clan-cli/clan_cli/inventory/__init__.py index 7239249fb..87f63db57 100644 --- a/pkgs/clan-cli/clan_cli/inventory/__init__.py +++ b/pkgs/clan-cli/clan_cli/inventory/__init__.py @@ -140,12 +140,59 @@ def find_duplicates(string_list: list[str]) -> list[str]: return duplicates +def find_deleted_paths( + persisted: dict[str, Any], update: dict[str, Any], parent_key: str = "" +) -> set[str]: + """ + Recursively find keys (at any nesting level) that exist in persisted but do not + exist in update. If a nested dictionary is completely removed, return that dictionary key. + + :param persisted: The original (persisted) nested dictionary. + :param update: The updated nested dictionary (some keys might be removed). + :param parent_key: The dotted path to the current dictionary's location. + :return: A set of dotted paths indicating keys or entire nested paths that were deleted. + """ + deleted_paths = set() + + # Iterate over keys in persisted + for key, p_value in persisted.items(): + current_path = f"{parent_key}.{key}" if parent_key else key + # Check if this key exists in update + # breakpoint() + if key not in update: + # Key doesn't exist at all -> entire branch deleted + deleted_paths.add(current_path) + else: + u_value = update[key] + # If persisted value is dict, check the update value + if isinstance(p_value, dict): + if isinstance(u_value, dict): + # If persisted dict is non-empty but updated dict is empty, + # that means everything under this branch is removed. + if p_value and not u_value: + # All children are removed + for child_key in p_value: + child_path = f"{current_path}.{child_key}" + deleted_paths.add(child_path) + else: + # Both are dicts, recurse deeper + deleted_paths |= find_deleted_paths( + p_value, u_value, current_path + ) + else: + # Persisted was a dict, update is not a dict -> entire branch changed + # Consider this as a full deletion of the persisted branch + deleted_paths.add(current_path) + + return deleted_paths + + def calc_patches( persisted: dict[str, Any], update: dict[str, Any], all_values: dict[str, Any], writeables: dict[str, set[str]], -) -> dict[str, Any]: +) -> tuple[dict[str, Any], set[str]]: """ Calculate the patches to apply to the inventory. @@ -158,6 +205,8 @@ def calc_patches( : param writeable: The writeable keys. Use 'determine_writeability'. Example: {'writeable': {'foo', 'foo.bar'}, 'non_writeable': {'foo.nix'}} : param all_values: All values in the inventory retrieved from the flake evaluation. + + Returns a tuple with the SET and DELETE patches. """ persisted_flat = flatten_data(persisted) update_flat = flatten_data(update) @@ -217,7 +266,14 @@ def calc_patches( msg = f"Cannot determine writeability for key '{update_key}'" raise ClanError(msg) - return patchset + delete_set = find_deleted_paths(persisted, update) + + for delete_key in delete_set: + if not is_writeable_key(delete_key): + msg = f"Cannot delete: Key '{delete_key}' is not writeable." + raise ClanError(msg) + + return patchset, delete_set def determine_writeability( @@ -363,6 +419,42 @@ def load_inventory_json(flake_dir: str | Path) -> Inventory: return inventory +def delete(d: dict[str, Any], path: str) -> Any: + """ + Deletes the nested entry specified by a dot-separated path from the dictionary using pop(). + + :param data: The dictionary to modify. + :param path: A dot-separated string indicating the nested key to delete. + e.g., "foo.bar.baz" will attempt to delete data["foo"]["bar"]["baz"]. + + :raises KeyError: If any intermediate key is missing or not a dictionary, + or if the final key to delete is not found. + """ + if not path: + msg = "Cannot delete. Path is empty." + raise ClanError(msg) + + keys = path.split(".") + current = d + + # Navigate to the parent dictionary of the final key + for key in keys[:-1]: + if key not in current or not isinstance(current[key], dict): + msg = f"Cannot delete. Key '{path}' not found or not a dictionary '{d}'." + raise ClanError(msg) + current = current[key] + + # Attempt to pop the final key + last_key = keys[-1] + try: + value = current.pop(last_key) + except KeyError: + msg = f"Cannot delete. Path '{path}' not found in data '{d}'. " + raise ClanError(msg) from KeyError + else: + return {last_key: value} + + def patch(d: dict[str, Any], path: str, content: Any) -> None: """ Update the value at a specific dot-separated path in a nested dictionary. @@ -436,17 +528,20 @@ def set_inventory(inventory: Inventory, flake_dir: str | Path, message: str) -> inventory.pop("options", None) # type: ignore inventory.pop("assertions", None) # type: ignore - patchset = calc_patches( + patchset, delete_set = calc_patches( dict(write_info.data_disk), dict(inventory), dict(write_info.data_eval), write_info.writeables, ) - persisted = dict(write_info.data_disk) + for patch_path, data in patchset.items(): patch(persisted, patch_path, data) + for delete_path in delete_set: + delete(persisted, delete_path) + inventory_file = get_inventory_path(flake_dir) with inventory_file.open("w") as f: json.dump(persisted, f, indent=2) @@ -471,5 +566,5 @@ def init_inventory(directory: str, init: Inventory | None = None) -> None: @API.register -def get_inventory(flake_dir: str | Path) -> Inventory: - return load_inventory_eval(flake_dir) +def get_inventory(base_path: str | Path) -> Inventory: + return load_inventory_eval(base_path) diff --git a/pkgs/clan-cli/clan_cli/machines/delete.py b/pkgs/clan-cli/clan_cli/machines/delete.py index 350305eab..d9db53b7e 100644 --- a/pkgs/clan-cli/clan_cli/machines/delete.py +++ b/pkgs/clan-cli/clan_cli/machines/delete.py @@ -1,4 +1,5 @@ import argparse +import logging import shutil from clan_cli.api import API @@ -8,12 +9,18 @@ from clan_cli.dirs import specific_machine_dir from clan_cli.errors import ClanError from clan_cli.inventory import get_inventory, set_inventory +log = logging.getLogger(__name__) + @API.register def delete_machine(flake: FlakeId, name: str) -> None: inventory = get_inventory(flake.path) - machine = inventory.get("machines", {}).pop(name, None) + if "machines" not in inventory: + msg = "No machines in inventory" + raise ClanError(msg) + + machine = inventory["machines"].pop(name, None) if machine is None: msg = f"Machine {name} does not exist" raise ClanError(msg) diff --git a/pkgs/clan-cli/tests/test_patch_inventory.py b/pkgs/clan-cli/tests/test_patch_inventory.py index 969ecfc36..cc2f73894 100644 --- a/pkgs/clan-cli/tests/test_patch_inventory.py +++ b/pkgs/clan-cli/tests/test_patch_inventory.py @@ -3,6 +3,7 @@ import pytest from clan_cli.errors import ClanError from clan_cli.inventory import ( calc_patches, + delete, determine_writeability, patch, unmerge_lists, @@ -194,7 +195,7 @@ def test_update_simple() -> None: # If the user would have set this value, it would trigger an error } } - patchset = calc_patches( + patchset, _ = calc_patches( data_disk, update, all_values=data_eval, writeables=writeables ) @@ -242,7 +243,7 @@ def test_update_many() -> None: }, } } - patchset = calc_patches( + patchset, _ = calc_patches( data_disk, update, all_values=data_eval, writeables=writeables ) @@ -310,18 +311,20 @@ def test_update_list() -> None: "foo": ["A", "B", "C"] # User wants to add "C" } - patchset = calc_patches( + patchset, _ = calc_patches( data_disk, update, all_values=data_eval, writeables=writeables ) assert patchset == {"foo": ["B", "C"]} + # "foo": ["A", "B"] # Remove "B" from the list + # Expected is [ ] because ["A"] is defined in nix update = { "foo": ["A"] # User wants to remove "B" } - patchset = calc_patches( + patchset, _ = calc_patches( data_disk, update, all_values=data_eval, writeables=writeables ) @@ -375,21 +378,171 @@ def test_update_mismatching_update_type() -> None: assert writeables == {"writeable": {"foo"}, "non_writeable": set()} - # set foo.A which doesnt exist - update_1 = {"foo": {"A": "B"}} - - with pytest.raises(ClanError) as error: - calc_patches(data_disk, update_1, all_values=data_eval, writeables=writeables) - - assert str(error.value) == "Key 'foo.A' cannot be set. It does not exist." - # set foo to an int but it is a list - update_2: dict = {"foo": 1} + update: dict = {"foo": 1} with pytest.raises(ClanError) as error: - calc_patches(data_disk, update_2, all_values=data_eval, writeables=writeables) + calc_patches(data_disk, update, all_values=data_eval, writeables=writeables) assert ( str(error.value) == "Type mismatch for key 'foo'. Cannot update with " ) + + +def test_delete_key() -> None: + prios = { + "foo": { + "__prio": 100, # <- writeable: "foo" + }, + } + + data_eval = {"foo": {"bar": "baz"}} + + data_disk = data_eval + + writeables = determine_writeability(prios, data_eval, data_disk) + + assert writeables == {"writeable": {"foo"}, "non_writeable": set()} + + # remove all keys from foo + update: dict = {"foo": {}} + + patchset, delete_set = calc_patches( + data_disk, update, all_values=data_eval, writeables=writeables + ) + + assert patchset == {} + assert delete_set == {"foo.bar"} + + +def test_delete_key_intermediate() -> None: + prios = { + "foo": { + "__prio": 100, + }, + } + + data_eval = { + "foo": { + # Remove the key "bar" + "bar": {"name": "bar", "info": "info", "other": ["a", "b"]}, + # Leave the key "other" + "other": {"name": "other", "info": "info", "other": ["a", "b"]}, + } + } + update: dict = { + "foo": {"other": {"name": "other", "info": "info", "other": ["a", "b"]}} + } + + data_disk = data_eval + + writeables = determine_writeability(prios, data_eval, data_disk) + + assert writeables == {"writeable": {"foo"}, "non_writeable": set()} + + # remove all keys from foo + + patchset, delete_set = calc_patches( + data_disk, update, all_values=data_eval, writeables=writeables + ) + + assert patchset == {} + assert delete_set == {"foo.bar"} + + +def test_delete_key_non_writeable() -> None: + prios = { + "foo": { + "__prio": 50, + }, + } + + data_eval = { + "foo": { + # Remove the key "bar" + "bar": {"name": "bar", "info": "info", "other": ["a", "b"]}, + } + } + update: dict = {"foo": {}} + + data_disk = data_eval + + writeables = determine_writeability(prios, data_eval, data_disk) + + assert writeables == {"writeable": set(), "non_writeable": {"foo"}} + + # remove all keys from foo + with pytest.raises(ClanError) as error: + calc_patches(data_disk, update, all_values=data_eval, writeables=writeables) + + assert "Cannot delete" in str(error.value) + + +def test_delete_atom() -> None: + data = {"foo": {"bar": 1}} + # Removes the key "foo.bar" + # Returns the deleted key-value pair { "bar": 1 } + entry = delete(data, "foo.bar") + + assert entry == {"bar": 1} + assert data == {"foo": {}} + + +def test_delete_intermediate() -> None: + data = {"a": {"b": {"c": {"d": 42}}}} + # Removes "a.b.c.d" + entry = delete(data, "a.b.c") + + assert entry == {"c": {"d": 42}} + # Check all intermediate dictionaries remain intact + assert data == {"a": {"b": {}}} + + +def test_delete_top_level() -> None: + data = {"x": 100, "y": 200} + # Deletes top-level key + entry = delete(data, "x") + assert entry == {"x": 100} + assert data == {"y": 200} + + +def test_delete_key_not_found() -> None: + data = {"foo": {"bar": 1}} + # Trying to delete a non-existing key "foo.baz" + with pytest.raises(ClanError) as excinfo: + delete(data, "foo.baz") + assert "Cannot delete. Path 'foo.baz'" in str(excinfo.value) + # Data should remain unchanged + assert data == {"foo": {"bar": 1}} + + +def test_delete_intermediate_not_dict() -> None: + data = {"foo": "not a dict"} + # Trying to go deeper into a non-dict value + with pytest.raises(ClanError) as excinfo: + delete(data, "foo.bar") + assert "not found or not a dictionary" in str(excinfo.value) + # Data should remain unchanged + assert data == {"foo": "not a dict"} + + +def test_delete_empty_path() -> None: + data = {"foo": {"bar": 1}} + # Attempting to delete with an empty path + with pytest.raises(ClanError) as excinfo: + delete(data, "") + # Depending on how you handle empty paths, you might raise an error or handle it differently. + # If you do raise an error, check the message. + assert "Cannot delete. Path is empty" in str(excinfo.value) + assert data == {"foo": {"bar": 1}} + + +def test_delete_non_existent_path_deep() -> None: + data = {"foo": {"bar": {"baz": 123}}} + # non-existent deep path + with pytest.raises(ClanError) as excinfo: + delete(data, "foo.bar.qux") + assert "not found" in str(excinfo.value) + # Data remains unchanged + assert data == {"foo": {"bar": {"baz": 123}}}