diff --git a/pkgs/clan-app/clan_app/components/serializer.py b/pkgs/clan-app/clan_app/components/serializer.py deleted file mode 100644 index d3b0563e3..000000000 --- a/pkgs/clan-app/clan_app/components/serializer.py +++ /dev/null @@ -1,92 +0,0 @@ -import dataclasses -import logging -from dataclasses import fields, is_dataclass -from pathlib import Path -from types import UnionType -from typing import Any, get_args - -import gi - -gi.require_version("WebKit", "6.0") - -log = logging.getLogger(__name__) - - -def sanitize_string(s: str) -> str: - return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") - - -def dataclass_to_dict(obj: Any) -> Any: - """ - Utility function to convert dataclasses to dictionaries - It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries - - It does NOT convert member functions. - """ - if dataclasses.is_dataclass(obj): - return { - sanitize_string(k): dataclass_to_dict(v) - for k, v in dataclasses.asdict(obj).items() - } - elif isinstance(obj, list | tuple): - return [dataclass_to_dict(item) for item in obj] - elif isinstance(obj, dict): - return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()} - elif isinstance(obj, Path): - return str(obj) - elif isinstance(obj, str): - return sanitize_string(obj) - else: - return obj - - -def is_union_type(type_hint: type) -> bool: - return type(type_hint) is UnionType - - -def get_inner_type(type_hint: type) -> type: - if is_union_type(type_hint): - # Return the first non-None type - return next(t for t in get_args(type_hint) if t is not type(None)) - return type_hint - - -def from_dict(t: type, data: dict[str, Any] | None) -> Any: - """ - Dynamically instantiate a data class from a dictionary, handling nested data classes. - """ - if not data: - return None - - try: - # Attempt to create an instance of the data_class - field_values = {} - for field in fields(t): - field_value = data.get(field.name) - field_type = get_inner_type(field.type) - if field_value is not None: - # If the field is another dataclass, recursively instantiate it - if is_dataclass(field_type): - field_value = from_dict(field_type, field_value) - elif isinstance(field_type, Path | str) and isinstance( - field_value, str - ): - field_value = ( - Path(field_value) if field_type == Path else field_value - ) - - if ( - field.default is not dataclasses.MISSING - or field.default_factory is not dataclasses.MISSING - ): - # Field has a default value. We cannot set the value to None - if field_value is not None: - field_values[field.name] = field_value - else: - field_values[field.name] = field_value - - return t(**field_values) - - except (TypeError, ValueError) as e: - print(f"Failed to instantiate {t.__name__}: {e}") - return None diff --git a/pkgs/clan-cli/clan_cli/api/__init__.py b/pkgs/clan-cli/clan_cli/api/__init__.py index 5237bc2a5..192bd4524 100644 --- a/pkgs/clan-cli/clan_cli/api/__init__.py +++ b/pkgs/clan-cli/clan_cli/api/__init__.py @@ -1,141 +1,22 @@ -import dataclasses -import json from collections.abc import Callable -from dataclasses import dataclass, fields, is_dataclass +from dataclasses import dataclass from functools import wraps from inspect import Parameter, Signature, signature -from pathlib import Path -from types import UnionType from typing import ( Annotated, Any, Generic, Literal, TypeVar, - get_args, - get_origin, get_type_hints, ) +from .serde import dataclass_to_dict, from_dict, sanitize_string + +__all__ = ["from_dict", "dataclass_to_dict", "sanitize_string"] + from clan_cli.errors import ClanError - -def sanitize_string(s: str) -> str: - # Using the native string sanitizer to handle all edge cases - # Remove the outer quotes '"string"' - return json.dumps(s)[1:-1] - - -def dataclass_to_dict(obj: Any) -> Any: - """ - Utility function to convert dataclasses to dictionaries - It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries - - It does NOT convert member functions. - """ - if is_dataclass(obj): - return { - # Use either the original name or name - sanitize_string( - field.metadata.get("original_name", field.name) - ): dataclass_to_dict(getattr(obj, field.name)) - for field in fields(obj) # type: ignore - } - elif isinstance(obj, list | tuple): - return [dataclass_to_dict(item) for item in obj] - elif isinstance(obj, dict): - return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()} - elif isinstance(obj, Path): - return sanitize_string(str(obj)) - elif isinstance(obj, str): - return sanitize_string(obj) - else: - return obj - - -def is_union_type(type_hint: type) -> bool: - return type(type_hint) is UnionType - - -def get_inner_type(type_hint: type) -> type: - if is_union_type(type_hint): - # Return the first non-None type - return next(t for t in get_args(type_hint) if t is not type(None)) - return type_hint - - -def get_second_type(type_hint: type[dict]) -> type: - """ - Get the value type of a dictionary type hint - """ - args = get_args(type_hint) - if len(args) == 2: - # Return the second argument, which should be the value type (Machine) - return args[1] - - raise ValueError(f"Invalid type hint for dict: {type_hint}") - - -def from_dict(t: type, data: dict[str, Any] | None) -> Any: - """ - Dynamically instantiate a data class from a dictionary, handling nested data classes. - """ - if data is None: - return None - - try: - # Attempt to create an instance of the data_class - field_values = {} - for field in fields(t): - original_name = field.metadata.get("original_name", field.name) - - field_value = data.get(original_name) - - field_type = get_inner_type(field.type) # type: ignore - - if original_name in data: - # If the field is another dataclass, recursively instantiate it - if is_dataclass(field_type): - field_value = from_dict(field_type, field_value) - elif isinstance(field_type, Path | str) and isinstance( - field_value, str - ): - field_value = ( - Path(field_value) if field_type == Path else field_value - ) - elif get_origin(field_type) is dict and isinstance(field_value, dict): - # The field is a dictionary with a specific type - inner_type = get_second_type(field_type) - field_value = { - k: from_dict(inner_type, v) for k, v in field_value.items() - } - elif get_origin is list and isinstance(field_value, list): - # The field is a list with a specific type - inner_type = get_args(field_type)[0] - field_value = [from_dict(inner_type, v) for v in field_value] - - # Set the value - if ( - field.default is not dataclasses.MISSING - or field.default_factory is not dataclasses.MISSING - ): - # Fields with default value - # a: Int = 1 - # b: list = Field(default_factory=list) - if original_name in data or field_value is not None: - field_values[field.name] = field_value - else: - # Fields without default value - # a: Int - field_values[field.name] = field_value - - return t(**field_values) - - except (TypeError, ValueError) as e: - print(f"Failed to instantiate {t.__name__}: {e} {data}") - return None - - T = TypeVar("T") ResponseDataType = TypeVar("ResponseDataType") diff --git a/pkgs/clan-cli/clan_cli/api/serde.py b/pkgs/clan-cli/clan_cli/api/serde.py new file mode 100644 index 000000000..25fa33498 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/api/serde.py @@ -0,0 +1,101 @@ +""" +This module provides utility functions for serialization and deserialization of data classes. + +Functions: +- sanitize_string(s: str) -> str: Ensures a string is properly escaped for json serializing. +- dataclass_to_dict(obj: Any) -> Any: Converts a data class and its nested data classes, lists, tuples, and dictionaries to dictionaries. +- from_dict(t: type[T], data: Any) -> T: Dynamically instantiates a data class from a dictionary, constructing nested data classes, validates all required fields exist and have the expected type. + +Classes: +- TypeAdapter: A Pydantic type adapter for data classes. + +Exceptions: +- ValidationError: Raised when there is a validation error during deserialization. +- ClanError: Raised when there is an error during serialization or deserialization. + +Dependencies: +- dataclasses: Provides the @dataclass decorator and related functions for creating data classes. +- json: Provides functions for working with JSON data. +- collections.abc: Provides abstract base classes for collections. +- functools: Provides functions for working with higher-order functions and decorators. +- inspect: Provides functions for inspecting live objects. +- operator: Provides functions for working with operators. +- pathlib: Provides classes for working with filesystem paths. +- types: Provides functions for working with types. +- typing: Provides support for type hints. +- pydantic: A library for data validation and settings management. +- pydantic_core: Core functionality for Pydantic. + +Note: This module assumes the presence of other modules and classes such as `ClanError` and `ErrorDetails` from the `clan_cli.errors` module. +""" + +import json +from dataclasses import dataclass, fields, is_dataclass +from pathlib import Path +from typing import ( + Any, + TypeVar, +) + +from pydantic import TypeAdapter, ValidationError +from pydantic_core import ErrorDetails + +from clan_cli.errors import ClanError + + +def sanitize_string(s: str) -> str: + # Using the native string sanitizer to handle all edge cases + # Remove the outer quotes '"string"' + return json.dumps(s)[1:-1] + + +def dataclass_to_dict(obj: Any) -> Any: + """ + Utility function to convert dataclasses to dictionaries + It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries + + It does NOT convert member functions. + """ + if is_dataclass(obj): + return { + # Use either the original name or name + sanitize_string( + field.metadata.get("original_name", field.name) + ): dataclass_to_dict(getattr(obj, field.name)) + for field in fields(obj) + if not field.name.startswith("_") # type: ignore + } + elif isinstance(obj, list | tuple): + return [dataclass_to_dict(item) for item in obj] + elif isinstance(obj, dict): + return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()} + elif isinstance(obj, Path): + return sanitize_string(str(obj)) + elif isinstance(obj, str): + return sanitize_string(obj) + else: + return obj + + +T = TypeVar("T", bound=dataclass) # type: ignore + + +def from_dict(t: type[T], data: Any) -> T: + """ + Dynamically instantiate a data class from a dictionary, handling nested data classes. + We use dataclasses. But the deserialization logic of pydantic takes a lot of complexity. + """ + adapter = TypeAdapter(t) + try: + return adapter.validate_python(data) + except ValidationError as e: + fst_error: ErrorDetails = e.errors()[0] + if not fst_error: + raise ClanError(msg=str(e)) + + msg = fst_error.get("msg") + loc = fst_error.get("loc") + field_path = "Unknown" + if loc: + field_path = str(loc) + raise ClanError(msg=msg, location=f"{t!s}: {field_path}", description=str(e)) diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index a1968f850..bdde4d6c3 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -17,6 +17,8 @@ setuptools, stdenv, + pydantic, + # custom args clan-core-path, nixpkgs, @@ -28,6 +30,7 @@ let pythonDependencies = [ argcomplete # Enables shell completions + pydantic ]; # load nixpkgs runtime dependencies from a json file diff --git a/pkgs/clan-cli/tests/test_deserializers.py b/pkgs/clan-cli/tests/test_deserializers.py new file mode 100644 index 000000000..778d3728f --- /dev/null +++ b/pkgs/clan-cli/tests/test_deserializers.py @@ -0,0 +1,157 @@ +from dataclasses import dataclass, field +from pathlib import Path + +import pytest + +# Functions to test +from clan_cli.api import ( + dataclass_to_dict, + from_dict, +) +from clan_cli.errors import ClanError +from clan_cli.inventory import ( + Inventory, + Machine, + MachineDeploy, + Meta, + Service, + ServiceBorgbackup, + ServiceBorgbackupRole, + ServiceBorgbackupRoleClient, + ServiceBorgbackupRoleServer, + ServiceMeta, +) + + +def test_simple() -> None: + @dataclass + class Person: + name: str + + person_dict = { + "name": "John", + } + + expected_person = Person( + name="John", + ) + + assert from_dict(Person, person_dict) == expected_person + + +def test_nested() -> None: + @dataclass + class Age: + value: str + + @dataclass + class Person: + name: str + # deeply nested dataclasses + age: Age + age_list: list[Age] + age_dict: dict[str, Age] + # Optional field + home: Path | None + + person_dict = { + "name": "John", + "age": { + "value": "99", + }, + "age_list": [{"value": "66"}, {"value": "77"}], + "age_dict": {"now": {"value": "55"}, "max": {"value": "100"}}, + "home": "/home", + } + + expected_person = Person( + name="John", + age=Age("99"), + age_list=[Age("66"), Age("77")], + age_dict={"now": Age("55"), "max": Age("100")}, + home=Path("/home"), + ) + + assert from_dict(Person, person_dict) == expected_person + + +def test_simple_field_missing() -> None: + @dataclass + class Person: + name: str + + person_dict = {} + + with pytest.raises(ClanError): + from_dict(Person, person_dict) + + +def test_deserialize_extensive_inventory() -> None: + data = { + "meta": {"name": "superclan", "description": "nice clan"}, + "services": { + "borgbackup": { + "instance1": { + "meta": { + "name": "borg1", + }, + "roles": { + "client": {}, + "server": {}, + }, + } + }, + }, + "machines": {"foo": {"name": "foo", "deploy": {}}}, + } + expected = Inventory( + meta=Meta(name="superclan", description="nice clan"), + services=Service( + borgbackup={ + "instance1": ServiceBorgbackup( + meta=ServiceMeta(name="borg1"), + roles=ServiceBorgbackupRole( + client=ServiceBorgbackupRoleClient(), + server=ServiceBorgbackupRoleServer(), + ), + ) + } + ), + machines={"foo": Machine(deploy=MachineDeploy(), name="foo")}, + ) + assert from_dict(Inventory, data) == expected + + +def test_alias_field() -> None: + @dataclass + class Person: + name: str = field(metadata={"alias": "--user-name--"}) + + data = {"--user-name--": "John"} + expected = Person(name="John") + + assert from_dict(Person, data) == expected + + +def test_path_field() -> None: + @dataclass + class Person: + name: Path + + data = {"name": "John"} + expected = Person(name=Path("John")) + + assert from_dict(Person, data) == expected + + +def test_private_public_fields() -> None: + @dataclass + class Person: + name: Path + _name: str | None = None + + data = {"name": "John"} + expected = Person(name=Path("John")) + assert from_dict(Person, data) == expected + + assert dataclass_to_dict(expected) == data diff --git a/pkgs/clan-cli/tests/test_serializers.py b/pkgs/clan-cli/tests/test_serializers.py new file mode 100644 index 000000000..fa6557d90 --- /dev/null +++ b/pkgs/clan-cli/tests/test_serializers.py @@ -0,0 +1,106 @@ +from dataclasses import dataclass, field + +# Functions to test +from clan_cli.api import ( + dataclass_to_dict, + sanitize_string, +) + + +# +def test_sanitize_string() -> None: + # Simple strings + assert sanitize_string("Hello World") == "Hello World" + assert sanitize_string("Hello\nWorld") == "Hello\\nWorld" + assert sanitize_string("Hello\tWorld") == "Hello\\tWorld" + assert sanitize_string("Hello\rWorld") == "Hello\\rWorld" + assert sanitize_string("Hello\fWorld") == "Hello\\fWorld" + assert sanitize_string("Hello\vWorld") == "Hello\\u000bWorld" + assert sanitize_string("Hello\bWorld") == "Hello\\bWorld" + assert sanitize_string("Hello\\World") == "Hello\\\\World" + assert sanitize_string('Hello"World') == 'Hello\\"World' + assert sanitize_string("Hello'World") == "Hello'World" + assert sanitize_string("Hello\0World") == "Hello\\u0000World" + # Console escape characters + + assert sanitize_string("\033[1mBold\033[0m") == "\\u001b[1mBold\\u001b[0m" # Red + assert sanitize_string("\033[31mRed\033[0m") == "\\u001b[31mRed\\u001b[0m" # Blue + assert ( + sanitize_string("\033[42mGreen\033[0m") == "\\u001b[42mGreen\\u001b[0m" + ) # Green + assert sanitize_string("\033[4mUnderline\033[0m") == "\\u001b[4mUnderline\\u001b[0m" + assert ( + sanitize_string("\033[91m\033[1mBold Red\033[0m") + == "\\u001b[91m\\u001b[1mBold Red\\u001b[0m" + ) + + +def test_dataclass_to_dict() -> None: + @dataclass + class Person: + name: str + age: int + + person = Person(name="John", age=25) + expected_dict = {"name": "John", "age": 25} + assert dataclass_to_dict(person) == expected_dict + + +def test_dataclass_to_dict_nested() -> None: + @dataclass + class Address: + city: str = "afghanistan" + zip: str = "01234" + + @dataclass + class Person: + name: str + age: int + address: Address = field(default_factory=Address) + + person1 = Person(name="John", age=25) + expected_dict1 = { + "name": "John", + "age": 25, + "address": {"city": "afghanistan", "zip": "01234"}, + } + # address must be constructed with default values if not passed + assert dataclass_to_dict(person1) == expected_dict1 + + person2 = Person(name="John", age=25, address=Address(zip="0", city="Anywhere")) + expected_dict2 = { + "name": "John", + "age": 25, + "address": {"zip": "0", "city": "Anywhere"}, + } + assert dataclass_to_dict(person2) == expected_dict2 + + +def test_dataclass_to_dict_defaults() -> None: + @dataclass + class Foo: + home: dict[str, str] = field(default_factory=dict) + work: list[str] = field(default_factory=list) + + @dataclass + class Person: + name: str = field(default="jon") + age: int = field(default=1) + foo: Foo = field(default_factory=Foo) + + default_person = Person() + expected_default = { + "name": "jon", + "age": 1, + "foo": {"home": {}, "work": []}, + } + # address must be constructed with default values if not passed + assert dataclass_to_dict(default_person) == expected_default + + real_person = Person(name="John", age=25, foo=Foo(home={"a": "b"}, work=["a", "b"])) + expected = { + "name": "John", + "age": 25, + "foo": {"home": {"a": "b"}, "work": ["a", "b"]}, + } + assert dataclass_to_dict(real_person) == expected