Feat(inventory): remove legacy action functions

Inventory should only and always be accessed through the inventory store.
Manually reading and writing to the json file is highly insecure
This commit is contained in:
Johannes Kirschbauer
2025-05-27 17:20:04 +02:00
parent c32d196428
commit e6d1f71907
4 changed files with 21 additions and 219 deletions

View File

@@ -21,8 +21,11 @@ log = logging.getLogger(__name__)
@API.register
def delete_machine(machine: Machine) -> None:
inventory_store = inventory.InventoryStore(machine.flake)
try:
inventory.delete(machine.flake, {f"machines.{machine.name}"})
inventory_store.delete(
{f"machines.{machine.name}"},
)
except KeyError as exc:
# louis@(2025-03-09): test infrastructure does not seem to set the
# inventory properly, but more importantly only one machine in my

View File

@@ -5,7 +5,7 @@ from clan_cli.tests.age_keys import SopsSetup, assert_secrets_file_recipients
from clan_cli.tests.helpers import cli
from clan_cli.tests.stdout import CaptureOutput
from clan_lib.flake import Flake
from clan_lib.inventory import load_inventory_json
from clan_lib.persist.inventory_store import InventoryStore
@pytest.mark.impure
@@ -25,7 +25,8 @@ def test_machine_subcommands(
]
)
inventory: dict = dict(load_inventory_json(Flake(str(test_flake_with_core.path))))
inventory_store = InventoryStore(Flake(str(test_flake_with_core.path)))
inventory: dict = dict(inventory_store.read())
assert "machine1" in inventory["machines"]
assert "service" not in inventory
@@ -41,7 +42,7 @@ def test_machine_subcommands(
["machines", "delete", "--flake", str(test_flake_with_core.path), "machine1"]
)
inventory_2: dict = dict(load_inventory_json(Flake(str(test_flake_with_core.path))))
inventory_2: dict = dict(inventory_store.read())
assert "machine1" not in inventory_2["machines"]
assert "service" not in inventory_2

View File

@@ -11,216 +11,14 @@ Which is an abstraction over the inventory
Interacting with 'clan_lib.inventory' is NOT recommended and will be removed
"""
import json
from pathlib import Path
from typing import Any
from clan_lib.api import API
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.git import commit_file
from clan_lib.nix_models.inventory import Inventory
from clan_lib.persist.inventory_store import WriteInfo
from clan_lib.persist.util import (
apply_patch,
calc_patches,
delete_by_path,
determine_writeability,
)
def get_inventory_path(flake: Flake) -> Path:
"""
Get the path to the inventory file in the flake directory
"""
inventory_file = (flake.path / "inventory.json").resolve()
return inventory_file
def load_inventory_eval(flake: Flake) -> Inventory:
"""
Loads the evaluated inventory.
After all merge operations with eventual nix code in buildClan.
Evaluates clanInternals.inventory with nix. Which is performant.
- Contains all clan metadata
- Contains all machines
- and more
"""
data = flake.select("clanInternals.inventory")
try:
inventory = Inventory(data) # type: ignore
except json.JSONDecodeError as e:
msg = f"Error decoding inventory from flake: {e}"
raise ClanError(msg) from e
else:
return inventory
def get_inventory_current_priority(flake: Flake) -> dict:
"""
Returns the current priority of the inventory values
machines = {
__prio = 100;
flash-installer = {
__prio = 100;
deploy = {
targetHost = { __prio = 1500; };
};
description = { __prio = 1500; };
icon = { __prio = 1500; };
name = { __prio = 1500; };
tags = { __prio = 1500; };
};
}
"""
try:
data = flake.select("clanInternals.inventoryClass.introspection")
except json.JSONDecodeError as e:
msg = f"Error decoding inventory from flake: {e}"
raise ClanError(msg) from e
else:
return data
@API.register
def load_inventory_json(flake: Flake) -> Inventory:
"""
Load the inventory FILE from the flake directory
If no file is found, returns an empty dictionary
DO NOT USE THIS FUNCTION TO READ THE INVENTORY
Use load_inventory_eval instead
"""
inventory_file = get_inventory_path(flake)
if not inventory_file.exists():
return {}
with inventory_file.open() as f:
try:
res: dict = json.load(f)
inventory = Inventory(res) # type: ignore
except json.JSONDecodeError as e:
# Error decoding the inventory file
msg = f"Error decoding inventory file: {e}"
raise ClanError(msg) from e
return inventory
@API.register
def patch_inventory_with(flake: Flake, section: str, content: dict[str, Any]) -> None:
"""
Pass only the section to update and the content to update with.
Make sure you pass only attributes that you would like to persist.
ATTENTION: Don't pass nix eval values unintentionally.
"""
inventory_file = get_inventory_path(flake)
curr_inventory = {}
if inventory_file.exists():
with inventory_file.open("r") as f:
curr_inventory = json.load(f)
apply_patch(curr_inventory, section, content)
with inventory_file.open("w") as f:
json.dump(curr_inventory, f, indent=2)
commit_file(
inventory_file, flake.path, commit_message=f"inventory.{section}: Update"
)
@API.register
def get_inventory_with_writeable_keys(
flake: Flake,
) -> WriteInfo:
"""
Load the inventory and determine the writeable keys
Performs 2 nix evaluations to get the current priority and the inventory
"""
current_priority = get_inventory_current_priority(flake)
data_eval: Inventory = load_inventory_eval(flake)
data_disk: Inventory = load_inventory_json(flake)
writeables = determine_writeability(
current_priority, dict(data_eval), dict(data_disk)
)
return WriteInfo(writeables, data_eval, data_disk)
# TODO: remove this function in favor of a proper read/write API
@API.register
def set_inventory(
inventory: Inventory, flake: Flake, message: str, commit: bool = True
) -> None:
"""
Write the inventory to the flake directory
and commit it to git with the given message
"""
write_info = get_inventory_with_writeable_keys(flake)
# Remove internals from the inventory
inventory.pop("tags", None) # type: ignore
inventory.pop("options", None) # type: ignore
inventory.pop("assertions", None) # type: ignore
patchset, delete_set = calc_patches(
dict(write_info.data_disk),
dict(inventory),
dict(write_info.data_eval),
write_info.writeables,
)
persisted = dict(write_info.data_disk)
for patch_path, data in patchset.items():
apply_patch(persisted, patch_path, data)
for delete_path in delete_set:
delete_by_path(persisted, delete_path)
inventory_file = get_inventory_path(flake)
with inventory_file.open("w") as f:
json.dump(persisted, f, indent=2)
if commit:
commit_file(inventory_file, flake.path, commit_message=message)
# TODO: wrap this in a proper persistence API
def delete(flake: Flake, delete_set: set[str]) -> None:
"""
Delete keys from the inventory
"""
write_info = get_inventory_with_writeable_keys(flake)
data_disk = dict(write_info.data_disk)
for delete_path in delete_set:
delete_by_path(data_disk, delete_path)
inventory_file = get_inventory_path(flake)
with inventory_file.open("w") as f:
json.dump(data_disk, f, indent=2)
commit_file(
inventory_file,
flake.path,
commit_message=f"Delete inventory keys {delete_set}",
)
from clan_lib.persist.inventory_store import InventoryStore
@API.register
def get_inventory(flake: Flake) -> Inventory:
return load_inventory_eval(flake)
inventory_store = InventoryStore(flake)
inventory = inventory_store.read()
return inventory

View File

@@ -22,7 +22,6 @@ from clan_lib.cmd import RunOpts, run
from clan_lib.dirs import specific_machine_dir
from clan_lib.errors import ClanError
from clan_lib.flake import Flake
from clan_lib.inventory import patch_inventory_with
from clan_lib.machines.machines import Machine
from clan_lib.nix import nix_command
from clan_lib.nix_models.inventory import Machine as InventoryMachine
@@ -204,16 +203,17 @@ def test_clan_create_api(
result = check_machine_online(machine)
assert result == "Online", f"Machine {machine.name} is not online"
ssh_keys = [
SSHKeyPair(
private=private_key,
public=public_key,
)
]
# ssh_keys = [
# SSHKeyPair(
# private=private_key,
# public=public_key,
# )
# ]
# ===== CREATE BASE INVENTORY ======
inventory = create_base_inventory(ssh_keys)
patch_inventory_with(Flake(str(dest_clan_dir)), "services", inventory.services)
# TODO(@Qubasa): This seems unused?
# inventory = create_base_inventory(ssh_keys)
# patch_inventory_with(Flake(str(dest_clan_dir)), "services", inventory.services)
# Invalidate cache because of new inventory
clan_dir_flake.invalidate_cache()