Merge pull request 'api: init get_machine_writeability' (#4504) from cli-fixes into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4504
This commit is contained in:
hsjobeki
2025-07-28 08:47:43 +00:00
5 changed files with 302 additions and 33 deletions

View File

@@ -95,7 +95,7 @@ def test_add_module_to_inventory(
inventory_store.write(
inventory,
message="Add borgbackup service",
commit=False,
commit=True,
)
# cmd = ["facts", "generate", "--flake", str(test_flake_with_core.path), "machine1"]
@@ -123,7 +123,7 @@ def test_add_module_to_inventory(
generator = gen
break
assert generator
assert generator, "Borgbackup generator not found"
ssh_key = machine.public_vars_store.get(generator, "borgbackup.ssh.pub")

View File

@@ -8,7 +8,11 @@ from clan_lib.nix_models.clan import (
InventoryMachine,
)
from clan_lib.persist.inventory_store import InventoryStore
from clan_lib.persist.util import set_value_by_path
from clan_lib.persist.util import (
is_writeable_key,
retrieve_typed_field_names,
set_value_by_path,
)
class MachineFilter(TypedDict):
@@ -92,3 +96,43 @@ def set_machine(machine: Machine, update: InventoryMachine) -> None:
inventory_store.write(
inventory, message=f"Update information about machine {machine.name}"
)
class Writeability(TypedDict):
writable: bool
reason: str | None
@API.register
def get_machine_writeability(machine: Machine) -> dict[str, Writeability]:
"""
Get writeability information for the fields of a machine.
This function checks which fields of the 'machine' resource are writable and provides a reason for each field's writability.
Args:
machine (Machine): The machine object for which to retrieve writeability.
Returns:
dict[str, Writeability]: A map from field-names to { 'writable' (bool) and 'reason' (str or None ) }
"""
inventory_store = InventoryStore(machine.flake)
write_info = inventory_store.get_writeability_of(f"machines.{machine.name}")
field_names = retrieve_typed_field_names(InventoryMachine)
# TODO: handle this more generically
# persisted_data = inventory_store._get_persisted() #
# unmerge_lists(all_list, persisted_data)
return {
field: {
"writable": False
if field == "name"
else is_writeable_key(f"machines.{machine.name}.{field}", write_info),
# TODO: Provide a meaningful reason
"reason": None,
}
for field in field_names
}

View File

@@ -1,12 +1,16 @@
from collections.abc import Callable
from typing import cast
from unittest.mock import ANY, patch
import pytest
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.nix_models.clan import Clan, Unknown
from clan_lib.machines import actions as actions_module
from clan_lib.machines.machines import Machine
from clan_lib.nix_models.clan import Clan, InventoryMachine, Unknown
from .actions import list_machines
from .actions import get_machine, get_machine_writeability, list_machines, set_machine
@pytest.mark.with_core
@@ -49,3 +53,157 @@ def test_list_inventory_machines(clan_flake: Callable[..., Flake]) -> None:
machines = list_machines(flake)
assert list(machines.keys()) == ["jon", "sara", "vanessa"]
@pytest.mark.with_core
def test_set_machine_no_op(clan_flake: Callable[..., Flake]) -> None:
flake = clan_flake(
# clan.nix, cannot be changed
clan={
"inventory": {
"machines": {
"jon": {},
"sara": {},
},
}
},
)
# No-op roundtrip should not change anything in the inventory
machine_jon = get_machine(flake, "jon")
with patch(f"{actions_module.__name__}.InventoryStore._write") as mock_write:
set_machine(Machine("jon", flake), machine_jon)
# Assert _write was never called
mock_write.assert_not_called()
# Change something to make sure the mock_write is actually called
machine_jon["machineClass"] = "darwin"
set_machine(Machine("jon", flake), machine_jon)
# This is a bit internal - we want to make sure the write is called
# with only the changed value, so we don't persist the whole machine
mock_write.assert_called_once_with(
{"machines": {"jon": {"machineClass": "darwin"}}}, post_write=ANY
)
@pytest.mark.with_core
def test_set_machine_fully_defined_in_nix(clan_flake: Callable[..., Flake]) -> None:
flake = clan_flake(
# clan.nix, cannot be changed
clan={
"inventory": {
"machines": {
"jon": {
"machineClass": "nixos",
"description": "A NixOS machine",
"icon": "nixos",
"deploy": {
"targetHost": "jon.example.com",
"buildHost": "jon.example.com",
},
"tags": ["server", "backup"],
},
},
}
},
)
# No-op roundtrip should not change anything in the inventory
machine_jon = get_machine(flake, "jon")
machine_jon["description"] = "description updated"
with patch(f"{actions_module.__name__}.InventoryStore._write") as mock_write:
with pytest.raises(ClanError) as exc_info:
set_machine(Machine("jon", flake), machine_jon)
assert (
"Key 'machines.jon.description' is not writeable. It seems its value is statically defined in nix."
in str(exc_info.value)
)
# Assert _write should not be called
mock_write.assert_not_called()
@pytest.mark.with_core
def test_set_machine_manage_tags(clan_flake: Callable[..., Flake]) -> None:
"""Test adding/removing tags on a machine with validation of immutable base tags."""
flake = clan_flake(
clan={
"inventory": {
"machines": {
"jon": {"tags": ["nix1", "nix2"]},
},
}
},
)
def get_jon() -> InventoryMachine:
return get_machine(flake, "jon")
def set_jon(tags: list[str]) -> None:
machine = get_jon()
machine["tags"] = tags
set_machine(Machine("jon", flake), machine)
# --- Add UI tags ---
initial_tags = get_jon().get("tags", [])
new_tags = [*initial_tags, "ui1", "ui2"]
set_jon(new_tags)
updated_tags = get_jon().get("tags", [])
expected_tags = ["nix1", "nix2", "ui1", "ui2", "all", "nixos"]
assert updated_tags == expected_tags
# --- Remove UI tags (allowed) ---
allowed_removal_tags = ["nix1", "nix2", "all", "nixos"]
set_jon(allowed_removal_tags)
assert get_jon().get("tags", []) == allowed_removal_tags
# --- Attempt to remove mandatory tags (should raise) ---
invalid_tags = ["all", "nixos"] # Removing 'nix1', 'nix2' is disallowed
with pytest.raises(ClanError) as exc_info:
set_jon(invalid_tags)
assert "Key 'machines.jon.tags' doesn't contain items ['nix1', 'nix2']" in str(
exc_info.value
)
@pytest.mark.with_core
def test_get_machine_writeability(clan_flake: Callable[..., Flake]) -> None:
flake = clan_flake(
clan={
"inventory": {
"machines": {
"jon": {
"machineClass": "nixos", # Static string is not writeable
"deploy": {}, # Empty dict is writeable
# TOOD: Return writability for existing items
"tags": ["nix1"], # Static list is not partially writeable
},
},
}
},
)
write_info = get_machine_writeability(Machine("jon", flake))
# {'tags': {'writable': True, 'reason': None}, 'machineClass': {'writable': False, 'reason': None}, 'name': {'writable': False, 'reason': None}, 'description': {'writable': True, 'reason': None}, 'deploy.buildHost': {'writable': True, 'reason': None}, 'icon': {'writable': True, 'reason': None}, 'deploy.targetHost': {'writable': True, 'reason': None}}
writeable_fields = {field for field, info in write_info.items() if info["writable"]}
read_only_fields = {
field for field, info in write_info.items() if not info["writable"]
}
assert writeable_fields == {
"tags",
"deploy.targetHost",
"deploy.buildHost",
"description",
"icon",
}
assert read_only_fields == {"machineClass", "name"}

View File

@@ -1,4 +1,5 @@
import json
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NotRequired, Protocol, TypedDict, cast
@@ -220,6 +221,16 @@ class InventoryStore:
return WriteInfo(writeables, data_eval, data_disk)
def get_writeability_of(self, path: str) -> Any:
"""
Get the writeability of a path in the inventory
:param path: The path to check
:return: A dictionary with the writeability of the path
"""
write_info = self._write_info()
return write_info.writeables
def read(self) -> InventorySnapshot:
"""
Accessor to the merged inventory
@@ -271,14 +282,30 @@ class InventoryStore:
for delete_path in delete_set:
delete_by_path(persisted, delete_path)
def post_write() -> None:
if commit:
commit_file(
self.inventory_file,
self._flake.path,
commit_message=f"update({self.inventory_file.name}): {message}",
)
self._flake.invalidate_cache()
if not patchset and not delete_set:
# No changes, no need to write
return
self._write(persisted, post_write=post_write)
def _write(
self, content: Any, post_write: Callable[[], None] | None = None
) -> None:
"""
Write the content to the inventory file and run post_write callback
"""
with self.inventory_file.open("w") as f:
json.dump(persisted, f, indent=2)
json.dump(content, f, indent=2)
if commit:
commit_file(
self.inventory_file,
self._flake.path,
commit_message=f"update({self.inventory_file.name}): {message}",
)
self._flake.invalidate_cache()
if post_write:
post_write()

View File

@@ -200,6 +200,28 @@ def parent_is_dict(key: str, data: dict[str, Any]) -> bool:
return False
def is_writeable_key(
key: str,
writeables: dict[str, set[str]],
) -> bool:
"""
Recursively check if a key is writeable.
key "machines.machine1.deploy.targetHost" is specified but writeability is only defined for "machines"
We pop the last key and check if the parent key is writeable/non-writeable.
"""
remaining = key.split(".")
while remaining:
if ".".join(remaining) in writeables["writeable"]:
return True
if ".".join(remaining) in writeables["non_writeable"]:
return False
remaining.pop()
msg = f"Cannot determine writeability for key '{key}'"
raise ClanError(msg, description="F001")
def calc_patches(
persisted: dict[str, Any],
update: dict[str, Any],
@@ -225,24 +247,6 @@ def calc_patches(
data_all_updated = flatten_data(update)
data_dyn = flatten_data(persisted)
def is_writeable_key(key: str) -> bool:
"""
Recursively check if a key is writeable.
key "machines.machine1.deploy.targetHost" is specified but writeability is only defined for "machines"
We pop the last key and check if the parent key is writeable/non-writeable.
"""
remaining = key.split(".")
while remaining:
if ".".join(remaining) in writeables["writeable"]:
return True
if ".".join(remaining) in writeables["non_writeable"]:
return False
remaining.pop()
msg = f"Cannot determine writeability for key '{key}'"
raise ClanError(msg, description="F001")
all_keys = set(data_all) | set(data_all_updated)
patchset = {}
@@ -256,7 +260,7 @@ def calc_patches(
# Some kind of change
if old != new:
# If there is a change, check if the key is writeable
if not is_writeable_key(key):
if not is_writeable_key(key, writeables):
msg = f"Key '{key}' is not writeable. It seems its value is statically defined in nix."
raise ClanError(msg)
@@ -452,3 +456,39 @@ def set_value_by_path(d: DictLike, path: str, content: Any) -> None:
for key in keys[:-1]:
current = current.setdefault(key, {})
current[keys[-1]] = content
from typing import NotRequired, Required, get_args, get_origin, get_type_hints
def is_typeddict_class(obj: type) -> bool:
"""Safely checks if a class is a TypedDict."""
return (
isinstance(obj, type)
and hasattr(obj, "__annotations__")
and obj.__class__.__name__ == "_TypedDictMeta"
)
def retrieve_typed_field_names(obj: type, prefix: str = "") -> set[str]:
fields = set()
hints = get_type_hints(obj, include_extras=True)
for field, field_type in hints.items():
full_key = f"{prefix}.{field}" if prefix else field
origin = get_origin(field_type)
args = get_args(field_type)
# Unwrap Required/NotRequired
if origin in {NotRequired, Required}:
field_type = args[0]
origin = get_origin(field_type)
args = get_args(field_type)
if is_typeddict_class(field_type):
fields |= retrieve_typed_field_names(field_type, prefix=full_key)
else:
fields.add(full_key)
return fields