From b752d2eb674beb74169fe08dc620b74103ed1f47 Mon Sep 17 00:00:00 2001 From: Johannes Kirschbauer Date: Wed, 17 Jul 2024 23:11:15 +0200 Subject: [PATCH] Init: Autogenerate classes from nix interfaces --- inventory.json | 3 - lib/inventory/build-inventory/interface.nix | 15 +- lib/inventory/interface-to-schema.nix | 3 + lib/jsonschema/default.nix | 2 +- pkgs/clan-cli/clan_cli/inventory/__init__.py | 219 +++++++++---------- pkgs/clan-cli/clan_cli/inventory/classes.py | 175 +++++++++++++++ pkgs/clan-cli/flake-module.nix | 34 ++- pkgs/classgen/main.py | 157 ++++++++----- 8 files changed, 416 insertions(+), 192 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/inventory/classes.py diff --git a/inventory.json b/inventory.json index f984b63a9..028bc4112 100644 --- a/inventory.json +++ b/inventory.json @@ -26,9 +26,6 @@ }, "roles": { "default": { - "config": { - "packages": ["vim"] - }, "imports": [], "machines": ["test-inventory-machine"], "tags": [] diff --git a/lib/inventory/build-inventory/interface.nix b/lib/inventory/build-inventory/interface.nix index 362bca5a2..e2e50f4b3 100644 --- a/lib/inventory/build-inventory/interface.nix +++ b/lib/inventory/build-inventory/interface.nix @@ -41,6 +41,7 @@ in options = { inherit (metaOptions) name description icon; tags = lib.mkOption { + default = [ ]; apply = lib.unique; type = types.listOf types.str; @@ -49,16 +50,10 @@ in default = null; type = types.nullOr types.str; }; - deploy = lib.mkOption { - default = { }; - type = types.submodule { - options = { - targetHost = lib.mkOption { - default = null; - type = types.nullOr types.str; - }; - }; - }; + deploy.targetHost = lib.mkOption { + description = "Configuration for the deployment of the machine"; + default = null; + type = types.nullOr types.str; }; }; } diff --git a/lib/inventory/interface-to-schema.nix b/lib/inventory/interface-to-schema.nix index b9ee02808..4256e4ddc 100644 --- a/lib/inventory/interface-to-schema.nix +++ b/lib/inventory/interface-to-schema.nix @@ 
-55,6 +55,7 @@ let inventorySchema.properties.services.additionalProperties.additionalProperties.properties.meta; config = { title = "${moduleName}-config"; + default = { }; } // moduleSchema; roles = { type = "object"; @@ -69,6 +70,7 @@ let { properties.config = { title = "${moduleName}-config"; + default = { }; } // moduleSchema; }; }) (rolesOf moduleName) @@ -80,6 +82,7 @@ let { additionalProperties.properties.config = { title = "${moduleName}-config"; + default = { }; } // moduleSchema; }; }; diff --git a/lib/jsonschema/default.nix b/lib/jsonschema/default.nix index 631ef016b..8b9d03308 100644 --- a/lib/jsonschema/default.nix +++ b/lib/jsonschema/default.nix @@ -318,7 +318,7 @@ rec { # return jsonschema property definition for submodule # then (lib.attrNames (option.type.getSubOptions option.loc).opt) then - parseOptions' (option.type.getSubOptions option.loc) + example // description // parseOptions' (option.type.getSubOptions option.loc) # throw error if option type is not supported else notSupported option; diff --git a/pkgs/clan-cli/clan_cli/inventory/__init__.py b/pkgs/clan-cli/clan_cli/inventory/__init__.py index 95778b7e8..618566bdd 100644 --- a/pkgs/clan-cli/clan_cli/inventory/__init__.py +++ b/pkgs/clan-cli/clan_cli/inventory/__init__.py @@ -1,16 +1,22 @@ -# ruff: noqa: N815 -# ruff: noqa: N806 +import dataclasses import json -from dataclasses import asdict, dataclass, field, is_dataclass +from dataclasses import asdict, fields, is_dataclass from pathlib import Path -from typing import Any, Literal +from types import UnionType +from typing import Any, get_args, get_origin from clan_cli.errors import ClanError from clan_cli.git import commit_file +from .classes import Inventory as NixInventory +from .classes import Machine, Service +from .classes import Meta as InventoryMeta + +__all__ = ["Service", "Machine", "InventoryMeta"] + def sanitize_string(s: str) -> str: - return s.replace("\\", "\\\\").replace('"', '\\"') + return s.replace("\\", 
"\\\\").replace('"', '\\"').replace("\n", "\\n") def dataclass_to_dict(obj: Any) -> Any: @@ -37,149 +43,132 @@ def dataclass_to_dict(obj: Any) -> Any: return obj -@dataclass -class DeploymentInfo: +def is_union_type(type_hint: type) -> bool: + return type(type_hint) is UnionType + + +def get_inner_type(type_hint: type) -> type: + if is_union_type(type_hint): + # Return the first non-None type + return next(t for t in get_args(type_hint) if t is not type(None)) + return type_hint + + +def get_second_type(type_hint: type[dict]) -> type: """ - Deployment information for a machine. + Get the value type of a dictionary type hint """ + args = get_args(type_hint) + if len(args) == 2: + # Return the second argument, which should be the value type (Machine) + return args[1] - targetHost: str | None = None + raise ValueError(f"Invalid type hint for dict: {type_hint}") -@dataclass -class Machine: +def from_dict(t: type, data: dict[str, Any] | None) -> Any: """ - Inventory machine model. - - DO NOT EDIT THIS CLASS. - Any changes here must be reflected in the inventory interface file and potentially other nix files. - - - Persisted to the inventory.json file - - Source of truth to generate each clan machine. - - For hardware deployment, the machine must declare the host system. + Dynamically instantiate a data class from a dictionary, handling nested data classes. 
""" + if data is None: + return None - name: str - deploy: DeploymentInfo = field(default_factory=DeploymentInfo) - description: str | None = None - icon: str | None = None - tags: list[str] = field(default_factory=list) - system: Literal["x86_64-linux"] | str | None = None + try: + # Attempt to create an instance of the data_class + field_values = {} + for field in fields(t): + field_value = data.get(field.name) + field_type = get_inner_type(field.type) # type: ignore - @staticmethod - def from_dict(data: dict[str, Any]) -> "Machine": - targetHost = data.get("deploy", {}).get("targetHost", None) - return Machine( - name=data["name"], - description=data.get("description", None), - icon=data.get("icon", None), - tags=data.get("tags", []), - system=data.get("system", None), - deploy=DeploymentInfo(targetHost), - ) + if field.name in data: + # The field is present + + # If the field is another dataclass, recursively instantiate it + if is_dataclass(field_type): + field_value = from_dict(field_type, field_value) + elif isinstance(field_type, Path | str) and isinstance( + field_value, str + ): + field_value = ( + Path(field_value) if field_type == Path else field_value + ) + elif get_origin(field_type) is dict and isinstance(field_value, dict): + # The field is a dictionary with a specific type + inner_type = get_second_type(field_type) + field_value = { + k: from_dict(inner_type, v) for k, v in field_value.items() + } + elif get_origin is list and isinstance(field_value, list): + # The field is a list with a specific type + inner_type = get_args(field_type)[0] + field_value = [from_dict(inner_type, v) for v in field_value] + + # Set the value + if ( + field.default is not dataclasses.MISSING + or field.default_factory is not dataclasses.MISSING + ): + # Fields with default value + # a: Int = 1 + # b: list = Field(default_factory=list) + if field.name in data or field_value is not None: + field_values[field.name] = field_value + else: + # Fields without default value + 
# a: Int + field_values[field.name] = field_value + + return t(**field_values) + + except (TypeError, ValueError) as e: + print(f"Failed to instantiate {t.__name__}: {e}") + return None -@dataclass -class MachineServiceConfig: - config: dict[str, Any] = field(default_factory=dict) - imports: list[str] = field(default_factory=list) - - -@dataclass -class ServiceMeta: - name: str - description: str | None = None - icon: str | None = None - - -@dataclass -class Role: - config: dict[str, Any] = field(default_factory=dict) - imports: list[str] = field(default_factory=list) - machines: list[str] = field(default_factory=list) - tags: list[str] = field(default_factory=list) - - -@dataclass -class Service: - meta: ServiceMeta - roles: dict[str, Role] - config: dict[str, Any] = field(default_factory=dict) - imports: list[str] = field(default_factory=list) - machines: dict[str, MachineServiceConfig] = field(default_factory=dict) - - @staticmethod - def from_dict(d: dict[str, Any]) -> "Service": - return Service( - meta=ServiceMeta(**d.get("meta", {})), - roles={name: Role(**role) for name, role in d.get("roles", {}).items()}, - machines=( - { - name: MachineServiceConfig(**machine) - for name, machine in d.get("machines", {}).items() - } - if d.get("machines") - else {} - ), - config=d.get("config", {}), - imports=d.get("imports", []), - ) - - -@dataclass -class InventoryMeta: - name: str - description: str | None = None - icon: str | None = None - - -@dataclass class Inventory: - meta: InventoryMeta - machines: dict[str, Machine] - services: dict[str, dict[str, Service]] + nix_inventory: NixInventory - @staticmethod - def from_dict(d: dict[str, Any]) -> "Inventory": - return Inventory( - meta=InventoryMeta(**d.get("meta", {})), - machines={ - name: Machine.from_dict(machine) - for name, machine in d.get("machines", {}).items() - }, - services={ - name: { - role: Service.from_dict(service) - for role, service in services.items() - } - for name, services in d.get("services", 
{}).items() - }, + def __init__(self) -> None: + self.nix_inventory = NixInventory( + meta=InventoryMeta(name="New Clan"), machines={}, services=Service() ) @staticmethod def get_path(flake_dir: str | Path) -> Path: - return Path(flake_dir) / "inventory.json" + return (Path(flake_dir) / "inventory.json").resolve() @staticmethod def load_file(flake_dir: str | Path) -> "Inventory": - inventory = Inventory( - machines={}, services={}, meta=InventoryMeta(name="New Clan") + inventory = from_dict( + NixInventory, + { + "meta": {"name": "New Clan"}, + "machines": {}, + "services": {}, + }, ) + + NixInventory( + meta=InventoryMeta(name="New Clan"), machines={}, services=Service() + ) + inventory_file = Inventory.get_path(flake_dir) if inventory_file.exists(): with open(inventory_file) as f: try: res = json.load(f) - inventory = Inventory.from_dict(res) + inventory = from_dict(NixInventory, res) except json.JSONDecodeError as e: raise ClanError(f"Error decoding inventory file: {e}") - return inventory + res = Inventory() + res.nix_inventory = inventory + return res def persist(self, flake_dir: str | Path, message: str) -> None: inventory_file = Inventory.get_path(flake_dir) with open(inventory_file, "w") as f: - json.dump(dataclass_to_dict(self), f, indent=2) + json.dump(dataclass_to_dict(self.nix_inventory), f, indent=2) commit_file(inventory_file, Path(flake_dir), commit_message=message) diff --git a/pkgs/clan-cli/clan_cli/inventory/classes.py b/pkgs/clan-cli/clan_cli/inventory/classes.py new file mode 100644 index 000000000..5aab8aa83 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/inventory/classes.py @@ -0,0 +1,175 @@ + +# DON NOT EDIT THIS FILE MANUALLY. IT IS GENERATED. 
+# UPDATE +# ruff: noqa: N815 +# ruff: noqa: N806 +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class MachineDeploy: + targetHost: str | None = None + + +@dataclass +class Machine: + deploy: MachineDeploy + name: str + description: str | None = None + icon: str | None = None + system: str | None = None + tags: list[str] = field(default_factory=list) + + +@dataclass +class Meta: + name: str + description: str | None = None + icon: str | None = None + + +@dataclass +class BorgbackupConfigDestination: + repo: str + name: str + + +@dataclass +class BorgbackupConfig: + destinations: dict[str, BorgbackupConfigDestination] | dict[str,Any] = field(default_factory=dict) + + +@dataclass +class ServiceBorgbackupMachine: + config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + + +@dataclass +class ServiceBorgbackupMeta: + name: str + description: str | None = None + icon: str | None = None + + +@dataclass +class ServiceBorgbackupRoleClient: + config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + machines: list[str] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + + +@dataclass +class ServiceBorgbackupRoleServer: + config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + machines: list[str] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + + +@dataclass +class ServiceBorgbackupRole: + client: ServiceBorgbackupRoleClient + server: ServiceBorgbackupRoleServer + + +@dataclass +class ServiceBorgbackup: + meta: ServiceBorgbackupMeta + roles: ServiceBorgbackupRole + config: BorgbackupConfig | dict[str,Any] = field(default_factory=dict) + machines: dict[str, ServiceBorgbackupMachine] | dict[str,Any] = field(default_factory=dict) + + +@dataclass +class PackagesConfig: + packages: 
list[str] = field(default_factory=list) + + +@dataclass +class ServicePackageMachine: + config: dict[str,Any] | PackagesConfig = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + + +@dataclass +class ServicePackageMeta: + name: str + description: str | None = None + icon: str | None = None + + +@dataclass +class ServicePackageRoleDefault: + config: dict[str,Any] | PackagesConfig = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + machines: list[str] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + + +@dataclass +class ServicePackageRole: + default: ServicePackageRoleDefault + + +@dataclass +class ServicePackage: + meta: ServicePackageMeta + roles: ServicePackageRole + config: dict[str,Any] | PackagesConfig = field(default_factory=dict) + machines: dict[str, ServicePackageMachine] | dict[str,Any] = field(default_factory=dict) + + +@dataclass +class SingleDiskConfig: + device: str + + +@dataclass +class ServiceSingleDiskMachine: + config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + + +@dataclass +class ServiceSingleDiskMeta: + name: str + description: str | None = None + icon: str | None = None + + +@dataclass +class ServiceSingleDiskRoleDefault: + config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict) + imports: list[str] = field(default_factory=list) + machines: list[str] = field(default_factory=list) + tags: list[str] = field(default_factory=list) + + +@dataclass +class ServiceSingleDiskRole: + default: ServiceSingleDiskRoleDefault + + +@dataclass +class ServiceSingleDisk: + meta: ServiceSingleDiskMeta + roles: ServiceSingleDiskRole + config: SingleDiskConfig | dict[str,Any] = field(default_factory=dict) + machines: dict[str, ServiceSingleDiskMachine] | dict[str,Any] = field(default_factory=dict) + + +@dataclass +class Service: + borgbackup: dict[str, ServiceBorgbackup] = 
field(default_factory=dict) + packages: dict[str, ServicePackage] = field(default_factory=dict) + single_disk: dict[str, ServiceSingleDisk] = field(default_factory=dict) + + +@dataclass +class Inventory: + meta: Meta + services: Service + machines: dict[str, Machine] | dict[str,Any] = field(default_factory=dict) diff --git a/pkgs/clan-cli/flake-module.nix b/pkgs/clan-cli/flake-module.nix index b5645105c..604ccff5b 100644 --- a/pkgs/clan-cli/flake-module.nix +++ b/pkgs/clan-cli/flake-module.nix @@ -9,7 +9,7 @@ { self', pkgs, ... }: let flakeLock = lib.importJSON (self + /flake.lock); - flakeInputs = (builtins.removeAttrs inputs [ "self" ]); + flakeInputs = builtins.removeAttrs inputs [ "self" ]; flakeLockVendoredDeps = flakeLock // { nodes = flakeLock.nodes @@ -38,7 +38,6 @@ ''; in { - devShells.clan-cli = pkgs.callPackage ./shell.nix { inherit (self'.packages) clan-cli clan-cli-full; inherit self'; @@ -84,6 +83,35 @@ default = self'.packages.clan-cli; }; - checks = self'.packages.clan-cli.tests; + checks = self'.packages.clan-cli.tests // { + inventory-classes-up-to-date = pkgs.stdenv.mkDerivation { + name = "inventory-classes-up-to-date"; + src = ./clan_cli/inventory; + + env = { + classFile = "classes.py"; + }; + installPhase = '' + ${self'.packages.classgen}/bin/classgen ${self'.packages.inventory-schema-pretty}/schema.json b_classes.py + file1=$classFile + file2=b_classes.py + + echo "Comparing $file1 and $file2" + if cmp -s "$file1" "$file2"; then + echo "Files are identical" + echo "Classes file is up to date" + else + echo "Classes file is out of date or has been modified" + echo "run ./update.sh in the inventory directory to update the classes file" + echo "--------------------------------\n" + diff "$file1" "$file2" + echo "--------------------------------\n\n" + exit 1 + fi + + touch $out + ''; + }; + }; }; } diff --git a/pkgs/classgen/main.py b/pkgs/classgen/main.py index b2d138690..3f1201d18 100644 --- a/pkgs/classgen/main.py +++ b/pkgs/classgen/main.py 
@@ -3,26 +3,33 @@ import json from typing import Any -# Function to map JSON schema types to Python types -def map_json_type(json_type: Any, nested_type: str = "Any") -> str: +# Function to map JSON schemas and types to Python types +def map_json_type( + json_type: Any, nested_types: set[str] = {"Any"}, parent: Any = None +) -> set[str]: if isinstance(json_type, list): - return " | ".join(map(map_json_type, json_type)) + res = set() + for t in json_type: + res |= map_json_type(t) + return res if isinstance(json_type, dict): return map_json_type(json_type.get("type")) elif json_type == "string": - return "str" + return {"str"} elif json_type == "integer": - return "int" + return {"int"} elif json_type == "boolean": - return "bool" + return {"bool"} elif json_type == "array": - return f"list[{nested_type}]" # Further specification can be handled if needed + assert nested_types, f"Array type not found for {parent}" + return {f"""list[{" | ".join(nested_types)}]"""} elif json_type == "object": - return f"dict[str, {nested_type}]" + assert nested_types, f"dict type not found for {parent}" + return {f"""dict[str, {" | ".join(nested_types)}]"""} elif json_type == "null": - return "None" + return {"None"} else: - return "Any" + raise ValueError(f"Python type not found for {json_type}") known_classes = set() @@ -32,9 +39,9 @@ root_class = "Inventory" # Recursive function to generate dataclasses from JSON schema def generate_dataclass(schema: dict[str, Any], class_name: str = root_class) -> str: properties = schema.get("properties", {}) - required = schema.get("required", []) - fields = [] + required_fields = [] + fields_with_default = [] nested_classes = [] for prop, prop_info in properties.items(): @@ -42,77 +49,107 @@ def generate_dataclass(schema: dict[str, Any], class_name: str = root_class) -> prop_type = prop_info.get("type", None) union_variants = prop_info.get("oneOf", []) + # Collect all types + field_types = set() title = prop_info.get("title", 
prop.removesuffix("s")) title_sanitized = "".join([p.capitalize() for p in title.split("-")]) nested_class_name = f"""{class_name if class_name != root_class and not prop_info.get("title") else ""}{title_sanitized}""" - # if nested_class_name == "ServiceBorgbackupRoleServerConfig": - # breakpoint() - if (prop_type is None) and (not union_variants): raise ValueError(f"Type not found for property {prop} {prop_info}") - # Unions fields (oneOf) - # str | int | None - python_type = None - if union_variants: - python_type = map_json_type(union_variants) + field_types = map_json_type(union_variants) + elif prop_type == "array": item_schema = prop_info.get("items") + if isinstance(item_schema, dict): - python_type = map_json_type( - prop_type, - map_json_type(item_schema), + field_types = map_json_type( + prop_type, map_json_type(item_schema), field_name ) - else: - python_type = map_json_type( - prop_type, - map_json_type([i for i in prop_info.get("items", [])]), - ) - assert python_type, f"Python type not found for {prop} {prop_info}" + elif prop_type == "object": + inner_type = prop_info.get("additionalProperties") + if inner_type and inner_type.get("type") == "object": + # Inner type is a class + field_types = map_json_type(prop_type, {nested_class_name}, field_name) - if prop in required: - field_def = f"{prop}: {python_type}" - else: - field_def = f"{prop}: {python_type} | None = None" + # + if nested_class_name not in known_classes: + nested_classes.append( + generate_dataclass(inner_type, nested_class_name) + ) + known_classes.add(nested_class_name) - if prop_type == "object": - map_type = prop_info.get("additionalProperties") - if map_type: - # breakpoint() - if map_type.get("type") == "object": - # Non trivial type - if nested_class_name not in known_classes: - nested_classes.append( - generate_dataclass(map_type, nested_class_name) - ) - known_classes.add(nested_class_name) - field_def = f"{field_name}: dict[str, {nested_class_name}]" - else: - # Trivial type - 
field_def = f"{field_name}: dict[str, {map_json_type(map_type)}]" - else: + elif inner_type and inner_type.get("type") != "object": + # Trivial type + field_types = map_json_type(inner_type) + + elif not inner_type: + # The type is a class + field_types = {nested_class_name} if nested_class_name not in known_classes: nested_classes.append( generate_dataclass(prop_info, nested_class_name) ) known_classes.add(nested_class_name) + else: + field_types = map_json_type( + prop_type, + nested_types=set(), + parent=field_name, + ) - field_def = f"{field_name}: {nested_class_name}" + assert field_types, f"Python type not found for {prop} {prop_info}" - elif prop_type == "array": - items = prop_info.get("items", {}) - if items.get("type") == "object": - nested_class_name = prop.capitalize() - nested_classes.append(generate_dataclass(items, nested_class_name)) - field_def = f"{field_name}: List[{nested_class_name}]" + serialised_types = " | ".join(field_types) + field_def = f"{field_name}: {serialised_types}" - fields.append(field_def) + if "default" in prop_info or field_name not in prop_info.get("required", []): + if "default" in prop_info: + default_value = prop_info.get("default") + if default_value is None: + field_types |= {"None"} + serialised_types = " | ".join(field_types) + field_def = f"{field_name}: {serialised_types} = None" + elif isinstance(default_value, list): + field_def = f"{field_def} = field(default_factory=list)" + elif isinstance(default_value, dict): + field_types |= {"dict[str,Any]"} + serialised_types = " | ".join(field_types) + field_def = f"{field_name}: {serialised_types} = field(default_factory=dict)" + elif default_value == "‹name›": + # Special case for nix submodules + pass + else: + # Other default values unhandled yet. 
+ raise ValueError( + f"Unhandled default value for field '{field_name}' - default value: {default_value}" + ) - fields_str = "\n ".join(fields) + fields_with_default.append(field_def) + + if "default" not in prop_info: + # Field is not required and but also specifies no default value + # Trying to infer default value from type + if "dict" in str(serialised_types): + field_def = f"{field_name}: {serialised_types} = field(default_factory=dict)" + fields_with_default.append(field_def) + elif "list" in str(serialised_types): + field_def = f"{field_name}: {serialised_types} = field(default_factory=list)" + fields_with_default.append(field_def) + elif "None" in str(serialised_types): + field_def = f"{field_name}: {serialised_types} = None" + fields_with_default.append(field_def) + else: + # Field is not required and but also specifies no default value + required_fields.append(field_def) + else: + required_fields.append(field_def) + + fields_str = "\n ".join(required_fields + fields_with_default) nested_classes_str = "\n\n".join(nested_classes) class_def = f"@dataclass\nclass {class_name}:\n {fields_str}\n" @@ -130,10 +167,10 @@ def run_gen(args: argparse.Namespace) -> None: f.write( """ # DON NOT EDIT THIS FILE MANUALLY. IT IS GENERATED. -# UPDATE: +# UPDATE # ruff: noqa: N815 # ruff: noqa: N806 -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any\n\n """ )