API: init method for getting write info
This commit is contained in:
@@ -15,6 +15,7 @@ Operate on the returned inventory to make changes
|
|||||||
import contextlib
|
import contextlib
|
||||||
import json
|
import json
|
||||||
from collections import Counter
|
from collections import Counter
|
||||||
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -67,7 +68,7 @@ default_inventory: Inventory = {"meta": {"name": "New Clan"}}
|
|||||||
@API.register
|
@API.register
|
||||||
def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
||||||
"""
|
"""
|
||||||
Loads the actual inventory.
|
Loads the evaluated inventory.
|
||||||
After all merge operations with eventual nix code in buildClan.
|
After all merge operations with eventual nix code in buildClan.
|
||||||
|
|
||||||
Evaluates clanInternals.inventory with nix. Which is performant.
|
Evaluates clanInternals.inventory with nix. Which is performant.
|
||||||
@@ -87,8 +88,8 @@ def load_inventory_eval(flake_dir: str | Path) -> Inventory:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
res = proc.stdout.strip()
|
res = proc.stdout.strip()
|
||||||
data = json.loads(res)
|
data: dict = json.loads(res)
|
||||||
inventory = from_dict(Inventory, data)
|
inventory = Inventory(data) # type: ignore
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
msg = f"Error decoding inventory from flake: {e}"
|
msg = f"Error decoding inventory from flake: {e}"
|
||||||
raise ClanError(msg) from e
|
raise ClanError(msg) from e
|
||||||
@@ -145,7 +146,10 @@ def find_duplicates(string_list: list[str]) -> list[str]:
|
|||||||
|
|
||||||
|
|
||||||
def calc_patches(
|
def calc_patches(
|
||||||
persisted: dict, update: dict, all_values: dict, writeables: dict
|
persisted: dict[str, Any],
|
||||||
|
update: dict[str, Any],
|
||||||
|
all_values: dict[str, Any],
|
||||||
|
writeables: dict[str, set[str]],
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Calculate the patches to apply to the inventory.
|
Calculate the patches to apply to the inventory.
|
||||||
@@ -173,7 +177,8 @@ def calc_patches(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if update_key in writeables["writeable"]:
|
if update_key in writeables["writeable"]:
|
||||||
if type(update_data) is not type(all_values_flat.get(update_key)):
|
prev_value = all_values_flat.get(update_key)
|
||||||
|
if prev_value and type(update_data) is not type(prev_value):
|
||||||
msg = f"Type mismatch for key '{update_key}'. Cannot update {type(all_values_flat.get(update_key))} with {type(update_data)}"
|
msg = f"Type mismatch for key '{update_key}'. Cannot update {type(all_values_flat.get(update_key))} with {type(update_data)}"
|
||||||
raise ClanError(msg)
|
raise ClanError(msg)
|
||||||
|
|
||||||
@@ -207,14 +212,14 @@ def calc_patches(
|
|||||||
|
|
||||||
|
|
||||||
def determine_writeability(
|
def determine_writeability(
|
||||||
priorities: dict,
|
priorities: dict[str, Any],
|
||||||
defaults: dict,
|
defaults: dict[str, Any],
|
||||||
persisted: dict,
|
persisted: dict[str, Any],
|
||||||
parent_key: str = "",
|
parent_key: str = "",
|
||||||
parent_prio: int | None = None,
|
parent_prio: int | None = None,
|
||||||
results: dict | None = None,
|
results: dict | None = None,
|
||||||
non_writeable: bool = False,
|
non_writeable: bool = False,
|
||||||
) -> dict:
|
) -> dict[str, set[str]]:
|
||||||
if results is None:
|
if results is None:
|
||||||
results = {"writeable": set({}), "non_writeable": set({})}
|
results = {"writeable": set({}), "non_writeable": set({})}
|
||||||
|
|
||||||
@@ -323,21 +328,18 @@ def get_inventory_current_priority(flake_dir: str | Path) -> dict:
|
|||||||
|
|
||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
def load_inventory_json(
|
def load_inventory_json(flake_dir: str | Path) -> Inventory:
|
||||||
flake_dir: str | Path, default: Inventory = default_inventory
|
|
||||||
) -> Inventory:
|
|
||||||
"""
|
"""
|
||||||
Load the inventory file from the flake directory
|
Load the inventory file from the flake directory
|
||||||
If no file is found, returns the default inventory
|
If no file is found, returns the default inventory
|
||||||
"""
|
"""
|
||||||
inventory = default
|
|
||||||
|
|
||||||
inventory_file = get_inventory_path(flake_dir)
|
inventory_file = get_inventory_path(flake_dir)
|
||||||
|
|
||||||
with inventory_file.open() as f:
|
with inventory_file.open() as f:
|
||||||
try:
|
try:
|
||||||
res = json.load(f)
|
res: dict = json.load(f)
|
||||||
inventory = from_dict(Inventory, res)
|
inventory = Inventory(res) # type: ignore
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
# Error decoding the inventory file
|
# Error decoding the inventory file
|
||||||
msg = f"Error decoding inventory file: {e}"
|
msg = f"Error decoding inventory file: {e}"
|
||||||
@@ -378,25 +380,55 @@ def patch_inventory_with(base_dir: Path, section: str, content: dict[str, Any])
|
|||||||
commit_file(inventory_file, base_dir, commit_message=f"inventory.{section}: Update")
|
commit_file(inventory_file, base_dir, commit_message=f"inventory.{section}: Update")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class WriteInfo:
|
||||||
|
writeables: dict[str, set[str]]
|
||||||
|
data_eval: Inventory
|
||||||
|
data_disk: Inventory
|
||||||
|
|
||||||
|
|
||||||
|
@API.register
|
||||||
|
def load_inventory_with_writeable_keys(
|
||||||
|
flake_dir: str | Path,
|
||||||
|
) -> WriteInfo:
|
||||||
|
"""
|
||||||
|
Load the inventory and determine the writeable keys
|
||||||
|
Performs 2 nix evaluations to get the current priority and the inventory
|
||||||
|
"""
|
||||||
|
current_priority = get_inventory_current_priority(flake_dir)
|
||||||
|
|
||||||
|
data_eval: Inventory = load_inventory_eval(flake_dir)
|
||||||
|
data_disk: Inventory = load_inventory_json(flake_dir)
|
||||||
|
|
||||||
|
writeables = determine_writeability(
|
||||||
|
current_priority, dict(data_eval), dict(data_disk)
|
||||||
|
)
|
||||||
|
|
||||||
|
return WriteInfo(writeables, data_eval, data_disk)
|
||||||
|
|
||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
def set_inventory(inventory: Inventory, flake_dir: str | Path, message: str) -> None:
|
def set_inventory(inventory: Inventory, flake_dir: str | Path, message: str) -> None:
|
||||||
"""
|
"""
|
||||||
Write the inventory to the flake directory
|
Write the inventory to the flake directory
|
||||||
and commit it to git with the given message
|
and commit it to git with the given message
|
||||||
"""
|
"""
|
||||||
inventory_file = get_inventory_path(flake_dir, create=False)
|
|
||||||
|
|
||||||
# Filter out modules not set via UI.
|
write_info = load_inventory_with_writeable_keys(flake_dir)
|
||||||
# It is not possible to set modules from "/nix/store" via the UI
|
|
||||||
modules = {}
|
|
||||||
filtered_modules = lambda m: {
|
|
||||||
key: value for key, value in m.items() if "/nix/store" not in value
|
|
||||||
}
|
|
||||||
modules = filtered_modules(inventory.get("modules", {})) # type: ignore
|
|
||||||
inventory["modules"] = modules
|
|
||||||
|
|
||||||
|
patchset = calc_patches(
|
||||||
|
dict(write_info.data_disk),
|
||||||
|
dict(inventory),
|
||||||
|
dict(write_info.data_eval),
|
||||||
|
write_info.writeables,
|
||||||
|
)
|
||||||
|
|
||||||
|
for patch_path, data in patchset.items():
|
||||||
|
patch(dict(write_info.data_disk), patch_path, data)
|
||||||
|
|
||||||
|
inventory_file = get_inventory_path(flake_dir)
|
||||||
with inventory_file.open("w") as f:
|
with inventory_file.open("w") as f:
|
||||||
json.dump(inventory, f, indent=2)
|
json.dump(write_info.data_disk, f, indent=2)
|
||||||
|
|
||||||
commit_file(inventory_file, Path(flake_dir), commit_message=message)
|
commit_file(inventory_file, Path(flake_dir), commit_message=message)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user