Merge pull request 'fix: Calculate a seperate delete delta for removed data' (#2587) from hsjobeki/clan-core:fix/writeable into main
This commit is contained in:
@@ -3,16 +3,13 @@ import re
|
||||
import tomllib
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, TypedDict, get_args, get_type_hints
|
||||
from typing import Any, TypedDict
|
||||
|
||||
from clan_cli.cmd import run_no_stdout
|
||||
from clan_cli.errors import ClanCmdError, ClanError
|
||||
from clan_cli.inventory import Inventory, load_inventory_json, set_inventory
|
||||
from clan_cli.inventory.classes import Service
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
from . import API
|
||||
from .serde import from_dict
|
||||
|
||||
|
||||
class CategoryInfo(TypedDict):
|
||||
@@ -247,42 +244,3 @@ def get_module_info(
|
||||
features=["inventory"] if has_inventory_feature(module_path) else [],
|
||||
constraints=frontmatter.constraints,
|
||||
)
|
||||
|
||||
|
||||
@API.register
|
||||
def get_inventory(base_path: str) -> Inventory:
|
||||
return load_inventory_json(base_path)
|
||||
|
||||
|
||||
@API.register
|
||||
def set_service_instance(
|
||||
base_path: str, module_name: str, instance_name: str, config: dict[str, Any]
|
||||
) -> None:
|
||||
"""
|
||||
A function that allows to set any service instance in the inventory.
|
||||
Takes any untyped dict. The dict is then checked and converted into the correct type using the type hints of the service.
|
||||
If any conversion error occurs, the function will raise an error.
|
||||
"""
|
||||
service_keys = get_type_hints(Service).keys()
|
||||
|
||||
if module_name not in service_keys:
|
||||
msg = f"{module_name} is not a valid Service attribute. Expected one of {', '.join(service_keys)}."
|
||||
raise ClanError(msg)
|
||||
|
||||
inventory = load_inventory_json(base_path)
|
||||
target_type = get_args(get_type_hints(Service)[module_name])[1]
|
||||
|
||||
module_instance_map: dict[str, Any] = inventory.get("services", {}).get(
|
||||
module_name, {}
|
||||
)
|
||||
|
||||
module_instance_map[instance_name] = from_dict(target_type, config)
|
||||
|
||||
inventory["services"] = inventory.get("services", {})
|
||||
inventory["services"][module_name] = module_instance_map
|
||||
|
||||
set_inventory(
|
||||
inventory, base_path, f"Update {module_name} instance {instance_name}"
|
||||
)
|
||||
|
||||
# TODO: Add a check that rolls back the inventory if the service config is not valid or causes conflicts.
|
||||
|
||||
@@ -104,8 +104,8 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
|
||||
if not field.name.startswith("_")
|
||||
and getattr(obj, field.name) is not None # type: ignore
|
||||
}
|
||||
if isinstance(obj, list | tuple):
|
||||
return [_to_dict(item) for item in obj]
|
||||
if isinstance(obj, list | tuple | set):
|
||||
return sorted([_to_dict(item) for item in obj])
|
||||
if isinstance(obj, dict):
|
||||
return {sanitize_string(k): _to_dict(v) for k, v in obj.items()}
|
||||
if isinstance(obj, Path):
|
||||
|
||||
@@ -15,6 +15,7 @@ Operate on the returned inventory to make changes
|
||||
import contextlib
|
||||
import json
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
@@ -47,16 +48,11 @@ __all__ = [
|
||||
]
|
||||
|
||||
|
||||
def get_inventory_path(flake_dir: str | Path, create: bool = True) -> Path:
|
||||
def get_inventory_path(flake_dir: str | Path) -> Path:
|
||||
"""
|
||||
Get the path to the inventory file in the flake directory
|
||||
"""
|
||||
inventory_file = (Path(flake_dir) / "inventory.json").resolve()
|
||||
|
||||
if not inventory_file.exists() and create:
|
||||
# Copy over the meta from the flake if the inventory is not initialized
|
||||
init_inventory(str(flake_dir))
|
||||
|
||||
return inventory_file
|
||||
|
||||
|
||||
@@ -67,7 +63,7 @@ default_inventory: Inventory = {"meta": {"name": "New Clan"}}
|
||||
@API.register
|
||||
def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||
"""
|
||||
Loads the actual inventory.
|
||||
Loads the evaluated inventory.
|
||||
After all merge operations with eventual nix code in buildClan.
|
||||
|
||||
Evaluates clanInternals.inventory with nix. Which is performant.
|
||||
@@ -87,8 +83,8 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||
|
||||
try:
|
||||
res = proc.stdout.strip()
|
||||
data = json.loads(res)
|
||||
inventory = from_dict(Inventory, data)
|
||||
data: dict = json.loads(res)
|
||||
inventory = Inventory(data) # type: ignore
|
||||
except json.JSONDecodeError as e:
|
||||
msg = f"Error decoding inventory from flake: {e}"
|
||||
raise ClanError(msg) from e
|
||||
@@ -144,9 +140,59 @@ def find_duplicates(string_list: list[str]) -> list[str]:
|
||||
return duplicates
|
||||
|
||||
|
||||
def find_deleted_paths(
|
||||
persisted: dict[str, Any], update: dict[str, Any], parent_key: str = ""
|
||||
) -> set[str]:
|
||||
"""
|
||||
Recursively find keys (at any nesting level) that exist in persisted but do not
|
||||
exist in update. If a nested dictionary is completely removed, return that dictionary key.
|
||||
|
||||
:param persisted: The original (persisted) nested dictionary.
|
||||
:param update: The updated nested dictionary (some keys might be removed).
|
||||
:param parent_key: The dotted path to the current dictionary's location.
|
||||
:return: A set of dotted paths indicating keys or entire nested paths that were deleted.
|
||||
"""
|
||||
deleted_paths = set()
|
||||
|
||||
# Iterate over keys in persisted
|
||||
for key, p_value in persisted.items():
|
||||
current_path = f"{parent_key}.{key}" if parent_key else key
|
||||
# Check if this key exists in update
|
||||
# breakpoint()
|
||||
if key not in update:
|
||||
# Key doesn't exist at all -> entire branch deleted
|
||||
deleted_paths.add(current_path)
|
||||
else:
|
||||
u_value = update[key]
|
||||
# If persisted value is dict, check the update value
|
||||
if isinstance(p_value, dict):
|
||||
if isinstance(u_value, dict):
|
||||
# If persisted dict is non-empty but updated dict is empty,
|
||||
# that means everything under this branch is removed.
|
||||
if p_value and not u_value:
|
||||
# All children are removed
|
||||
for child_key in p_value:
|
||||
child_path = f"{current_path}.{child_key}"
|
||||
deleted_paths.add(child_path)
|
||||
else:
|
||||
# Both are dicts, recurse deeper
|
||||
deleted_paths |= find_deleted_paths(
|
||||
p_value, u_value, current_path
|
||||
)
|
||||
else:
|
||||
# Persisted was a dict, update is not a dict -> entire branch changed
|
||||
# Consider this as a full deletion of the persisted branch
|
||||
deleted_paths.add(current_path)
|
||||
|
||||
return deleted_paths
|
||||
|
||||
|
||||
def calc_patches(
|
||||
persisted: dict, update: dict, all_values: dict, writeables: dict
|
||||
) -> dict[str, Any]:
|
||||
persisted: dict[str, Any],
|
||||
update: dict[str, Any],
|
||||
all_values: dict[str, Any],
|
||||
writeables: dict[str, set[str]],
|
||||
) -> tuple[dict[str, Any], set[str]]:
|
||||
"""
|
||||
Calculate the patches to apply to the inventory.
|
||||
|
||||
@@ -159,21 +205,42 @@ def calc_patches(
|
||||
: param writeable: The writeable keys. Use 'determine_writeability'.
|
||||
Example: {'writeable': {'foo', 'foo.bar'}, 'non_writeable': {'foo.nix'}}
|
||||
: param all_values: All values in the inventory retrieved from the flake evaluation.
|
||||
|
||||
Returns a tuple with the SET and DELETE patches.
|
||||
"""
|
||||
persisted_flat = flatten_data(persisted)
|
||||
update_flat = flatten_data(update)
|
||||
all_values_flat = flatten_data(all_values)
|
||||
|
||||
def is_writeable_key(key: str) -> bool:
|
||||
"""
|
||||
Recursively check if a key is writeable.
|
||||
key "machines.machine1.deploy.targetHost" is specified but writeability is only defined for "machines"
|
||||
We pop the last key and check if the parent key is writeable/non-writeable.
|
||||
"""
|
||||
remaining = key.split(".")
|
||||
while remaining:
|
||||
if ".".join(remaining) in writeables["writeable"]:
|
||||
return True
|
||||
if ".".join(remaining) in writeables["non_writeable"]:
|
||||
return False
|
||||
|
||||
remaining.pop()
|
||||
|
||||
msg = f"Cannot determine writeability for key '{key}'"
|
||||
raise ClanError(msg, description="F001")
|
||||
|
||||
patchset = {}
|
||||
for update_key, update_data in update_flat.items():
|
||||
if update_key in writeables["non_writeable"]:
|
||||
if not is_writeable_key(update_key):
|
||||
if update_data != all_values_flat.get(update_key):
|
||||
msg = f"Key '{update_key}' is not writeable."
|
||||
raise ClanError(msg)
|
||||
continue
|
||||
|
||||
if update_key in writeables["writeable"]:
|
||||
if type(update_data) is not type(all_values_flat.get(update_key)):
|
||||
if is_writeable_key(update_key):
|
||||
prev_value = all_values_flat.get(update_key)
|
||||
if prev_value and type(update_data) is not type(prev_value):
|
||||
msg = f"Type mismatch for key '{update_key}'. Cannot update {type(all_values_flat.get(update_key))} with {type(update_data)}"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -196,25 +263,28 @@ def calc_patches(
|
||||
|
||||
continue
|
||||
|
||||
if update_key not in all_values_flat:
|
||||
msg = f"Key '{update_key}' cannot be set. It does not exist."
|
||||
raise ClanError(msg)
|
||||
|
||||
msg = f"Cannot determine writeability for key '{update_key}'"
|
||||
raise ClanError(msg)
|
||||
|
||||
return patchset
|
||||
delete_set = find_deleted_paths(persisted, update)
|
||||
|
||||
for delete_key in delete_set:
|
||||
if not is_writeable_key(delete_key):
|
||||
msg = f"Cannot delete: Key '{delete_key}' is not writeable."
|
||||
raise ClanError(msg)
|
||||
|
||||
return patchset, delete_set
|
||||
|
||||
|
||||
def determine_writeability(
|
||||
priorities: dict,
|
||||
defaults: dict,
|
||||
persisted: dict,
|
||||
priorities: dict[str, Any],
|
||||
defaults: dict[str, Any],
|
||||
persisted: dict[str, Any],
|
||||
parent_key: str = "",
|
||||
parent_prio: int | None = None,
|
||||
results: dict | None = None,
|
||||
non_writeable: bool = False,
|
||||
) -> dict:
|
||||
) -> dict[str, set[str]]:
|
||||
if results is None:
|
||||
results = {"writeable": set({}), "non_writeable": set({})}
|
||||
|
||||
@@ -323,21 +393,24 @@ def get_inventory_current_priority(flake_dir: str | Path) -> dict:
|
||||
|
||||
|
||||
@API.register
|
||||
def load_inventory_json(
|
||||
flake_dir: str | Path, default: Inventory = default_inventory
|
||||
) -> Inventory:
|
||||
def load_inventory_json(flake_dir: str | Path) -> Inventory:
|
||||
"""
|
||||
Load the inventory file from the flake directory
|
||||
If no file is found, returns the default inventory
|
||||
Load the inventory FILE from the flake directory
|
||||
If no file is found, returns an empty dictionary
|
||||
|
||||
DO NOT USE THIS FUNCTION TO READ THE INVENTORY
|
||||
|
||||
Use load_inventory_eval instead
|
||||
"""
|
||||
inventory = default
|
||||
|
||||
inventory_file = get_inventory_path(flake_dir)
|
||||
|
||||
if not inventory_file.exists():
|
||||
return {}
|
||||
with inventory_file.open() as f:
|
||||
try:
|
||||
res = json.load(f)
|
||||
inventory = from_dict(Inventory, res)
|
||||
res: dict = json.load(f)
|
||||
inventory = Inventory(res) # type: ignore
|
||||
except json.JSONDecodeError as e:
|
||||
# Error decoding the inventory file
|
||||
msg = f"Error decoding inventory file: {e}"
|
||||
@@ -346,6 +419,42 @@ def load_inventory_json(
|
||||
return inventory
|
||||
|
||||
|
||||
def delete(d: dict[str, Any], path: str) -> Any:
|
||||
"""
|
||||
Deletes the nested entry specified by a dot-separated path from the dictionary using pop().
|
||||
|
||||
:param data: The dictionary to modify.
|
||||
:param path: A dot-separated string indicating the nested key to delete.
|
||||
e.g., "foo.bar.baz" will attempt to delete data["foo"]["bar"]["baz"].
|
||||
|
||||
:raises KeyError: If any intermediate key is missing or not a dictionary,
|
||||
or if the final key to delete is not found.
|
||||
"""
|
||||
if not path:
|
||||
msg = "Cannot delete. Path is empty."
|
||||
raise ClanError(msg)
|
||||
|
||||
keys = path.split(".")
|
||||
current = d
|
||||
|
||||
# Navigate to the parent dictionary of the final key
|
||||
for key in keys[:-1]:
|
||||
if key not in current or not isinstance(current[key], dict):
|
||||
msg = f"Cannot delete. Key '{path}' not found or not a dictionary '{d}'."
|
||||
raise ClanError(msg)
|
||||
current = current[key]
|
||||
|
||||
# Attempt to pop the final key
|
||||
last_key = keys[-1]
|
||||
try:
|
||||
value = current.pop(last_key)
|
||||
except KeyError:
|
||||
msg = f"Cannot delete. Path '{path}' not found in data '{d}'. "
|
||||
raise ClanError(msg) from KeyError
|
||||
else:
|
||||
return {last_key: value}
|
||||
|
||||
|
||||
def patch(d: dict[str, Any], path: str, content: Any) -> None:
|
||||
"""
|
||||
Update the value at a specific dot-separated path in a nested dictionary.
|
||||
@@ -378,25 +487,64 @@ def patch_inventory_with(base_dir: Path, section: str, content: dict[str, Any])
|
||||
commit_file(inventory_file, base_dir, commit_message=f"inventory.{section}: Update")
|
||||
|
||||
|
||||
@dataclass
|
||||
class WriteInfo:
|
||||
writeables: dict[str, set[str]]
|
||||
data_eval: Inventory
|
||||
data_disk: Inventory
|
||||
|
||||
|
||||
@API.register
|
||||
def get_inventory_with_writeable_keys(
|
||||
flake_dir: str | Path,
|
||||
) -> WriteInfo:
|
||||
"""
|
||||
Load the inventory and determine the writeable keys
|
||||
Performs 2 nix evaluations to get the current priority and the inventory
|
||||
"""
|
||||
current_priority = get_inventory_current_priority(flake_dir)
|
||||
|
||||
data_eval: Inventory = load_inventory_eval(flake_dir)
|
||||
data_disk: Inventory = load_inventory_json(flake_dir)
|
||||
|
||||
writeables = determine_writeability(
|
||||
current_priority, dict(data_eval), dict(data_disk)
|
||||
)
|
||||
|
||||
return WriteInfo(writeables, data_eval, data_disk)
|
||||
|
||||
|
||||
@API.register
|
||||
def set_inventory(inventory: Inventory, flake_dir: str | Path, message: str) -> None:
|
||||
"""
|
||||
Write the inventory to the flake directory
|
||||
and commit it to git with the given message
|
||||
"""
|
||||
inventory_file = get_inventory_path(flake_dir, create=False)
|
||||
|
||||
# Filter out modules not set via UI.
|
||||
# It is not possible to set modules from "/nix/store" via the UI
|
||||
modules = {}
|
||||
filtered_modules = lambda m: {
|
||||
key: value for key, value in m.items() if "/nix/store" not in value
|
||||
}
|
||||
modules = filtered_modules(inventory.get("modules", {})) # type: ignore
|
||||
inventory["modules"] = modules
|
||||
write_info = get_inventory_with_writeable_keys(flake_dir)
|
||||
|
||||
# Remove internals from the inventory
|
||||
inventory.pop("tags", None) # type: ignore
|
||||
inventory.pop("options", None) # type: ignore
|
||||
inventory.pop("assertions", None) # type: ignore
|
||||
|
||||
patchset, delete_set = calc_patches(
|
||||
dict(write_info.data_disk),
|
||||
dict(inventory),
|
||||
dict(write_info.data_eval),
|
||||
write_info.writeables,
|
||||
)
|
||||
persisted = dict(write_info.data_disk)
|
||||
|
||||
for patch_path, data in patchset.items():
|
||||
patch(persisted, patch_path, data)
|
||||
|
||||
for delete_path in delete_set:
|
||||
delete(persisted, delete_path)
|
||||
|
||||
inventory_file = get_inventory_path(flake_dir)
|
||||
with inventory_file.open("w") as f:
|
||||
json.dump(inventory, f, indent=2)
|
||||
json.dump(persisted, f, indent=2)
|
||||
|
||||
commit_file(inventory_file, Path(flake_dir), commit_message=message)
|
||||
|
||||
@@ -418,40 +566,5 @@ def init_inventory(directory: str, init: Inventory | None = None) -> None:
|
||||
|
||||
|
||||
@API.register
|
||||
def merge_template_inventory(
|
||||
inventory: Inventory, template_inventory: Inventory, machine_name: str
|
||||
) -> None:
|
||||
"""
|
||||
Merge the template inventory into the current inventory
|
||||
The template inventory is expected to be a subset of the current inventory
|
||||
"""
|
||||
for service_name, instance in template_inventory.get("services", {}).items():
|
||||
if len(instance.keys()) > 0:
|
||||
msg = f"Service {service_name} in template inventory has multiple instances"
|
||||
description = (
|
||||
"Only one instance per service is allowed in a template inventory"
|
||||
)
|
||||
raise ClanError(msg, description=description)
|
||||
|
||||
# services.<service_name>.<instance_name>.config
|
||||
config = next((v for v in instance.values() if "config" in v), None)
|
||||
if not config:
|
||||
msg = f"Service {service_name} in template inventory has no config"
|
||||
description = "Invalid inventory configuration"
|
||||
raise ClanError(msg, description=description)
|
||||
|
||||
# Disallow "config.machines" key
|
||||
if "machines" in config:
|
||||
msg = f"Service {service_name} in template inventory has machines"
|
||||
description = "The 'machines' key is not allowed in template inventory"
|
||||
raise ClanError(msg, description=description)
|
||||
|
||||
# Require "config.roles" key
|
||||
if "roles" not in config:
|
||||
msg = f"Service {service_name} in template inventory has no roles"
|
||||
description = "roles key is required in template inventory"
|
||||
raise ClanError(msg, description=description)
|
||||
|
||||
# TODO: Implement merging of template inventory
|
||||
msg = "Merge template inventory is not implemented yet"
|
||||
raise NotImplementedError(msg)
|
||||
def get_inventory(base_path: str | Path) -> Inventory:
|
||||
return load_inventory_eval(base_path)
|
||||
|
||||
@@ -16,8 +16,7 @@ from clan_cli.git import commit_file
|
||||
from clan_cli.inventory import Machine as InventoryMachine
|
||||
from clan_cli.inventory import (
|
||||
MachineDeploy,
|
||||
load_inventory_json,
|
||||
merge_template_inventory,
|
||||
get_inventory,
|
||||
set_inventory,
|
||||
)
|
||||
from clan_cli.machines.list import list_nixos_machines
|
||||
@@ -110,12 +109,9 @@ def create_machine(opts: CreateOptions) -> None:
|
||||
|
||||
src = tmpdirp / "machines" / opts.template_name
|
||||
|
||||
has_inventory = (dst / "inventory.json").exists()
|
||||
if not (src / "configuration.nix").exists() and not has_inventory:
|
||||
msg = f"Template machine '{opts.template_name}' does not contain a configuration.nix or inventory.json"
|
||||
description = (
|
||||
"Template machine must contain a configuration.nix or inventory.json"
|
||||
)
|
||||
if not (src / "configuration.nix").exists():
|
||||
msg = f"Template machine '{opts.template_name}' does not contain a configuration.nix"
|
||||
description = "Template machine must contain a configuration.nix"
|
||||
raise ClanError(msg, description=description)
|
||||
|
||||
def log_copy(src: str, dst: str) -> None:
|
||||
@@ -125,40 +121,27 @@ def create_machine(opts: CreateOptions) -> None:
|
||||
|
||||
shutil.copytree(src, dst, ignore_dangling_symlinks=True, copy_function=log_copy)
|
||||
|
||||
inventory = get_inventory(clan_dir)
|
||||
|
||||
target_host = opts.target_host
|
||||
# TODO: We should allow the template to specify machine metadata if not defined by user
|
||||
new_machine = opts.machine
|
||||
if target_host:
|
||||
new_machine["deploy"] = {"targetHost": target_host}
|
||||
|
||||
inventory["machines"] = inventory.get("machines", {})
|
||||
inventory["machines"][machine_name] = new_machine
|
||||
|
||||
# Commit at the end in that order to avoid commiting halve-baked machines
|
||||
# TODO: automatic rollbacks if something goes wrong
|
||||
set_inventory(inventory, clan_dir, "Imported machine from template")
|
||||
|
||||
commit_file(
|
||||
clan_dir / "machines" / machine_name,
|
||||
repo_dir=clan_dir,
|
||||
commit_message=f"Add machine {machine_name}",
|
||||
)
|
||||
|
||||
inventory = load_inventory_json(clan_dir)
|
||||
|
||||
# Merge the inventory from the template
|
||||
if has_inventory:
|
||||
template_inventory = load_inventory_json(dst)
|
||||
merge_template_inventory(inventory, template_inventory, machine_name)
|
||||
|
||||
deploy = MachineDeploy()
|
||||
target_host = opts.target_host
|
||||
if target_host:
|
||||
deploy["targetHost"] = target_host
|
||||
# TODO: We should allow the template to specify machine metadata if not defined by user
|
||||
new_machine = InventoryMachine(
|
||||
name=machine_name, deploy=deploy, tags=opts.machine.get("tags", [])
|
||||
)
|
||||
if (
|
||||
not has_inventory
|
||||
and len(opts.machine.get("tags", [])) == 0
|
||||
and new_machine.get("deploy", {}).get("targetHost") is None
|
||||
):
|
||||
# no need to update inventory if there are no tags or target host
|
||||
return
|
||||
|
||||
inventory["machines"] = inventory.get("machines", {})
|
||||
inventory["machines"][machine_name] = new_machine
|
||||
|
||||
set_inventory(inventory, clan_dir, "Imported machine from template")
|
||||
|
||||
|
||||
def create_command(args: argparse.Namespace) -> None:
|
||||
if args.flake:
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import argparse
|
||||
import logging
|
||||
import shutil
|
||||
|
||||
from clan_cli.api import API
|
||||
@@ -6,14 +7,20 @@ from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.completions import add_dynamic_completer, complete_machines
|
||||
from clan_cli.dirs import specific_machine_dir
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.inventory import load_inventory_json, set_inventory
|
||||
from clan_cli.inventory import get_inventory, set_inventory
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@API.register
|
||||
def delete_machine(flake: FlakeId, name: str) -> None:
|
||||
inventory = load_inventory_json(flake.path)
|
||||
inventory = get_inventory(flake.path)
|
||||
|
||||
machine = inventory.get("machines", {}).pop(name, None)
|
||||
if "machines" not in inventory:
|
||||
msg = "No machines in inventory"
|
||||
raise ClanError(msg)
|
||||
|
||||
machine = inventory["machines"].pop(name, None)
|
||||
if machine is None:
|
||||
msg = f"Machine {name} does not exist"
|
||||
raise ClanError(msg)
|
||||
|
||||
@@ -6,9 +6,9 @@ import pytest
|
||||
from clan_cli.api.modules import list_modules
|
||||
from clan_cli.clan_uri import FlakeId
|
||||
from clan_cli.inventory import (
|
||||
Inventory,
|
||||
Machine,
|
||||
MachineDeploy,
|
||||
load_inventory_json,
|
||||
set_inventory,
|
||||
)
|
||||
from clan_cli.machines.create import CreateOptions, create_machine
|
||||
@@ -70,7 +70,7 @@ def test_add_module_to_inventory(
|
||||
)
|
||||
subprocess.run(["git", "add", "."], cwd=test_flake_with_core.path, check=True)
|
||||
|
||||
inventory = load_inventory_json(base_path)
|
||||
inventory: Inventory = {}
|
||||
|
||||
inventory["services"] = {
|
||||
"borgbackup": {
|
||||
|
||||
@@ -3,6 +3,7 @@ import pytest
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.inventory import (
|
||||
calc_patches,
|
||||
delete,
|
||||
determine_writeability,
|
||||
patch,
|
||||
unmerge_lists,
|
||||
@@ -194,7 +195,7 @@ def test_update_simple() -> None:
|
||||
# If the user would have set this value, it would trigger an error
|
||||
}
|
||||
}
|
||||
patchset = calc_patches(
|
||||
patchset, _ = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
@@ -242,7 +243,7 @@ def test_update_many() -> None:
|
||||
},
|
||||
}
|
||||
}
|
||||
patchset = calc_patches(
|
||||
patchset, _ = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
@@ -310,18 +311,20 @@ def test_update_list() -> None:
|
||||
"foo": ["A", "B", "C"] # User wants to add "C"
|
||||
}
|
||||
|
||||
patchset = calc_patches(
|
||||
patchset, _ = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
assert patchset == {"foo": ["B", "C"]}
|
||||
|
||||
# "foo": ["A", "B"]
|
||||
# Remove "B" from the list
|
||||
# Expected is [ ] because ["A"] is defined in nix
|
||||
update = {
|
||||
"foo": ["A"] # User wants to remove "B"
|
||||
}
|
||||
|
||||
patchset = calc_patches(
|
||||
patchset, _ = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
@@ -375,21 +378,171 @@ def test_update_mismatching_update_type() -> None:
|
||||
|
||||
assert writeables == {"writeable": {"foo"}, "non_writeable": set()}
|
||||
|
||||
# set foo.A which doesnt exist
|
||||
update_1 = {"foo": {"A": "B"}}
|
||||
|
||||
with pytest.raises(ClanError) as error:
|
||||
calc_patches(data_disk, update_1, all_values=data_eval, writeables=writeables)
|
||||
|
||||
assert str(error.value) == "Key 'foo.A' cannot be set. It does not exist."
|
||||
|
||||
# set foo to an int but it is a list
|
||||
update_2: dict = {"foo": 1}
|
||||
update: dict = {"foo": 1}
|
||||
|
||||
with pytest.raises(ClanError) as error:
|
||||
calc_patches(data_disk, update_2, all_values=data_eval, writeables=writeables)
|
||||
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
|
||||
|
||||
assert (
|
||||
str(error.value)
|
||||
== "Type mismatch for key 'foo'. Cannot update <class 'list'> with <class 'int'>"
|
||||
)
|
||||
|
||||
|
||||
def test_delete_key() -> None:
|
||||
prios = {
|
||||
"foo": {
|
||||
"__prio": 100, # <- writeable: "foo"
|
||||
},
|
||||
}
|
||||
|
||||
data_eval = {"foo": {"bar": "baz"}}
|
||||
|
||||
data_disk = data_eval
|
||||
|
||||
writeables = determine_writeability(prios, data_eval, data_disk)
|
||||
|
||||
assert writeables == {"writeable": {"foo"}, "non_writeable": set()}
|
||||
|
||||
# remove all keys from foo
|
||||
update: dict = {"foo": {}}
|
||||
|
||||
patchset, delete_set = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
assert patchset == {}
|
||||
assert delete_set == {"foo.bar"}
|
||||
|
||||
|
||||
def test_delete_key_intermediate() -> None:
|
||||
prios = {
|
||||
"foo": {
|
||||
"__prio": 100,
|
||||
},
|
||||
}
|
||||
|
||||
data_eval = {
|
||||
"foo": {
|
||||
# Remove the key "bar"
|
||||
"bar": {"name": "bar", "info": "info", "other": ["a", "b"]},
|
||||
# Leave the key "other"
|
||||
"other": {"name": "other", "info": "info", "other": ["a", "b"]},
|
||||
}
|
||||
}
|
||||
update: dict = {
|
||||
"foo": {"other": {"name": "other", "info": "info", "other": ["a", "b"]}}
|
||||
}
|
||||
|
||||
data_disk = data_eval
|
||||
|
||||
writeables = determine_writeability(prios, data_eval, data_disk)
|
||||
|
||||
assert writeables == {"writeable": {"foo"}, "non_writeable": set()}
|
||||
|
||||
# remove all keys from foo
|
||||
|
||||
patchset, delete_set = calc_patches(
|
||||
data_disk, update, all_values=data_eval, writeables=writeables
|
||||
)
|
||||
|
||||
assert patchset == {}
|
||||
assert delete_set == {"foo.bar"}
|
||||
|
||||
|
||||
def test_delete_key_non_writeable() -> None:
|
||||
prios = {
|
||||
"foo": {
|
||||
"__prio": 50,
|
||||
},
|
||||
}
|
||||
|
||||
data_eval = {
|
||||
"foo": {
|
||||
# Remove the key "bar"
|
||||
"bar": {"name": "bar", "info": "info", "other": ["a", "b"]},
|
||||
}
|
||||
}
|
||||
update: dict = {"foo": {}}
|
||||
|
||||
data_disk = data_eval
|
||||
|
||||
writeables = determine_writeability(prios, data_eval, data_disk)
|
||||
|
||||
assert writeables == {"writeable": set(), "non_writeable": {"foo"}}
|
||||
|
||||
# remove all keys from foo
|
||||
with pytest.raises(ClanError) as error:
|
||||
calc_patches(data_disk, update, all_values=data_eval, writeables=writeables)
|
||||
|
||||
assert "Cannot delete" in str(error.value)
|
||||
|
||||
|
||||
def test_delete_atom() -> None:
|
||||
data = {"foo": {"bar": 1}}
|
||||
# Removes the key "foo.bar"
|
||||
# Returns the deleted key-value pair { "bar": 1 }
|
||||
entry = delete(data, "foo.bar")
|
||||
|
||||
assert entry == {"bar": 1}
|
||||
assert data == {"foo": {}}
|
||||
|
||||
|
||||
def test_delete_intermediate() -> None:
|
||||
data = {"a": {"b": {"c": {"d": 42}}}}
|
||||
# Removes "a.b.c.d"
|
||||
entry = delete(data, "a.b.c")
|
||||
|
||||
assert entry == {"c": {"d": 42}}
|
||||
# Check all intermediate dictionaries remain intact
|
||||
assert data == {"a": {"b": {}}}
|
||||
|
||||
|
||||
def test_delete_top_level() -> None:
|
||||
data = {"x": 100, "y": 200}
|
||||
# Deletes top-level key
|
||||
entry = delete(data, "x")
|
||||
assert entry == {"x": 100}
|
||||
assert data == {"y": 200}
|
||||
|
||||
|
||||
def test_delete_key_not_found() -> None:
|
||||
data = {"foo": {"bar": 1}}
|
||||
# Trying to delete a non-existing key "foo.baz"
|
||||
with pytest.raises(ClanError) as excinfo:
|
||||
delete(data, "foo.baz")
|
||||
assert "Cannot delete. Path 'foo.baz'" in str(excinfo.value)
|
||||
# Data should remain unchanged
|
||||
assert data == {"foo": {"bar": 1}}
|
||||
|
||||
|
||||
def test_delete_intermediate_not_dict() -> None:
|
||||
data = {"foo": "not a dict"}
|
||||
# Trying to go deeper into a non-dict value
|
||||
with pytest.raises(ClanError) as excinfo:
|
||||
delete(data, "foo.bar")
|
||||
assert "not found or not a dictionary" in str(excinfo.value)
|
||||
# Data should remain unchanged
|
||||
assert data == {"foo": "not a dict"}
|
||||
|
||||
|
||||
def test_delete_empty_path() -> None:
|
||||
data = {"foo": {"bar": 1}}
|
||||
# Attempting to delete with an empty path
|
||||
with pytest.raises(ClanError) as excinfo:
|
||||
delete(data, "")
|
||||
# Depending on how you handle empty paths, you might raise an error or handle it differently.
|
||||
# If you do raise an error, check the message.
|
||||
assert "Cannot delete. Path is empty" in str(excinfo.value)
|
||||
assert data == {"foo": {"bar": 1}}
|
||||
|
||||
|
||||
def test_delete_non_existent_path_deep() -> None:
|
||||
data = {"foo": {"bar": {"baz": 123}}}
|
||||
# non-existent deep path
|
||||
with pytest.raises(ClanError) as excinfo:
|
||||
delete(data, "foo.bar.qux")
|
||||
assert "not found" in str(excinfo.value)
|
||||
# Data remains unchanged
|
||||
assert data == {"foo": {"bar": {"baz": 123}}}
|
||||
|
||||
Reference in New Issue
Block a user