Serde: add unit tests for all serialization and deserialization logic

This commit is contained in:
Johannes Kirschbauer
2024-07-26 14:22:07 +02:00
parent 2b6a5f8385
commit c648e647bf
6 changed files with 372 additions and 216 deletions

View File

@@ -1,92 +0,0 @@
import dataclasses
import logging
from dataclasses import fields, is_dataclass
from pathlib import Path
from types import UnionType
from typing import Any, get_args
import gi
gi.require_version("WebKit", "6.0")
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
    """Escape backslashes, double quotes and newlines in ``s``.

    Each backslash is doubled, each double quote gets a leading backslash,
    and each newline character becomes the two characters ``\\`` and ``n``.
    """
    # Order matters: backslashes are doubled first so that the backslashes
    # introduced by the later replacements are not escaped a second time.
    replacements = (
        ("\\", "\\\\"),
        ('"', '\\"'),
        ("\n", "\\n"),
    )
    escaped = s
    for raw, substitute in replacements:
        escaped = escaped.replace(raw, substitute)
    return escaped
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if dataclasses.is_dataclass(obj):
return {
sanitize_string(k): dataclass_to_dict(v)
for k, v in dataclasses.asdict(obj).items()
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return str(obj)
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not data:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
field_value = data.get(field.name)
field_type = get_inner_type(field.type)
if field_value is not None:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Field has a default value. We cannot set the value to None
if field_value is not None:
field_values[field.name] = field_value
else:
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e}")
return None

View File

@@ -1,141 +1,22 @@
import dataclasses
import json
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass, fields, is_dataclass from dataclasses import dataclass
from functools import wraps from functools import wraps
from inspect import Parameter, Signature, signature from inspect import Parameter, Signature, signature
from pathlib import Path
from types import UnionType
from typing import ( from typing import (
Annotated, Annotated,
Any, Any,
Generic, Generic,
Literal, Literal,
TypeVar, TypeVar,
get_args,
get_origin,
get_type_hints, get_type_hints,
) )
from .serde import dataclass_to_dict, from_dict, sanitize_string
__all__ = ["from_dict", "dataclass_to_dict", "sanitize_string"]
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
    """Return ``s`` escaped the way the JSON encoder escapes string contents.

    The value is encoded with ``json.dumps`` and the first and last
    characters of the result (the surrounding double quotes) are removed.
    """
    json_literal = json.dumps(s)
    # Strip the outer quotes: '"string"' -> 'string'
    return json_literal[1:-1]
def dataclass_to_dict(obj: Any) -> Any:
    """Recursively convert ``obj`` into plain dicts, lists and scalars.

    - Dataclasses become dicts. Each key is the field's ``original_name``
      metadata entry when present, otherwise the field name.
    - Lists and tuples become lists.
    - Dict keys are sanitized and dict values converted recursively.
    - ``Path`` objects and strings are returned as sanitized strings.
    - Anything else is returned unchanged.

    Member functions are not part of the result.
    """
    if is_dataclass(obj):
        converted = {}
        for fld in fields(obj):  # type: ignore
            # Use either the original name or name
            key = fld.metadata.get("original_name", fld.name)
            converted[sanitize_string(key)] = dataclass_to_dict(
                getattr(obj, fld.name)
            )
        return converted

    if isinstance(obj, list | tuple):
        return [dataclass_to_dict(element) for element in obj]

    if isinstance(obj, dict):
        return {
            sanitize_string(key): dataclass_to_dict(value)
            for key, value in obj.items()
        }

    if isinstance(obj, Path):
        return sanitize_string(str(obj))

    if isinstance(obj, str):
        return sanitize_string(obj)

    return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def get_second_type(type_hint: type[dict]) -> type:
"""
Get the value type of a dictionary type hint
"""
args = get_args(type_hint)
if len(args) == 2:
# Return the second argument, which should be the value type (Machine)
return args[1]
raise ValueError(f"Invalid type hint for dict: {type_hint}")
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if data is None:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
original_name = field.metadata.get("original_name", field.name)
field_value = data.get(original_name)
field_type = get_inner_type(field.type) # type: ignore
if original_name in data:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
elif get_origin(field_type) is dict and isinstance(field_value, dict):
# The field is a dictionary with a specific type
inner_type = get_second_type(field_type)
field_value = {
k: from_dict(inner_type, v) for k, v in field_value.items()
}
elif get_origin is list and isinstance(field_value, list):
# The field is a list with a specific type
inner_type = get_args(field_type)[0]
field_value = [from_dict(inner_type, v) for v in field_value]
# Set the value
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Fields with default value
# a: Int = 1
# b: list = Field(default_factory=list)
if original_name in data or field_value is not None:
field_values[field.name] = field_value
else:
# Fields without default value
# a: Int
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e} {data}")
return None
T = TypeVar("T") T = TypeVar("T")
ResponseDataType = TypeVar("ResponseDataType") ResponseDataType = TypeVar("ResponseDataType")

View File

@@ -0,0 +1,101 @@
"""
This module provides utility functions for serialization and deserialization of data classes.
Functions:
- sanitize_string(s: str) -> str: Ensures a string is properly escaped for json serializing.
- dataclass_to_dict(obj: Any) -> Any: Converts a data class and its nested data classes, lists, tuples, and dictionaries to dictionaries.
- from_dict(t: type[T], data: Any) -> T: Dynamically instantiates a data class from a dictionary, constructing nested data classes, validates all required fields exist and have the expected type.
Classes:
- TypeAdapter: A Pydantic type adapter for data classes.
Exceptions:
- ValidationError: Raised when there is a validation error during deserialization.
- ClanError: Raised when there is an error during serialization or deserialization.
Dependencies:
- dataclasses: Provides the @dataclass decorator and related functions for creating data classes.
- json: Provides functions for working with JSON data.
- collections.abc: Provides abstract base classes for collections.
- functools: Provides functions for working with higher-order functions and decorators.
- inspect: Provides functions for inspecting live objects.
- operator: Provides functions for working with operators.
- pathlib: Provides classes for working with filesystem paths.
- types: Provides functions for working with types.
- typing: Provides support for type hints.
- pydantic: A library for data validation and settings management.
- pydantic_core: Core functionality for Pydantic.
Note: This module relies on `ClanError` from the `clan_cli.errors` module and on `ErrorDetails` from `pydantic_core`.
"""
import json
from dataclasses import dataclass, fields, is_dataclass
from pathlib import Path
from typing import (
Any,
TypeVar,
)
from pydantic import TypeAdapter, ValidationError
from pydantic_core import ErrorDetails
from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
# Using the native string sanitizer to handle all edge cases
# Remove the outer quotes '"string"'
return json.dumps(s)[1:-1]
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if is_dataclass(obj):
return {
# Use either the original name or name
sanitize_string(
field.metadata.get("original_name", field.name)
): dataclass_to_dict(getattr(obj, field.name))
for field in fields(obj)
if not field.name.startswith("_") # type: ignore
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return sanitize_string(str(obj))
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
# Ties the return type of from_dict to the class passed as ``t``.
# NOTE(review): ``dataclass`` is the decorator imported from ``dataclasses``,
# not a class, which is presumably why the bound needs the ignore below.
T = TypeVar("T", bound=dataclass)  # type: ignore


def from_dict(t: type[T], data: Any) -> T:
    """Instantiate the dataclass ``t`` from ``data``, including nested dataclasses.

    The classes themselves are plain dataclasses; validation and construction
    are delegated to a pydantic ``TypeAdapter``.

    Args:
        t: The dataclass type to build.
        data: The raw value to validate, typically a dictionary.

    Returns:
        The validated instance of ``t``.

    Raises:
        ClanError: If validation fails. The message and location are taken
            from the first reported error, the description holds the full
            validation error text, and the original ``ValidationError`` is
            attached as the cause.
    """
    adapter = TypeAdapter(t)
    try:
        return adapter.validate_python(data)
    except ValidationError as e:
        errors = e.errors()
        # Check for emptiness before indexing, so a missing first entry
        # cannot hide the validation error behind an IndexError.
        if not errors or not errors[0]:
            raise ClanError(msg=str(e)) from e

        fst_error: ErrorDetails = errors[0]
        msg = fst_error.get("msg")
        loc = fst_error.get("loc")
        field_path = str(loc) if loc else "Unknown"

        raise ClanError(
            msg=msg, location=f"{t!s}: {field_path}", description=str(e)
        ) from e

View File

@@ -17,6 +17,8 @@
setuptools, setuptools,
stdenv, stdenv,
pydantic,
# custom args # custom args
clan-core-path, clan-core-path,
nixpkgs, nixpkgs,
@@ -28,6 +30,7 @@
let let
pythonDependencies = [ pythonDependencies = [
argcomplete # Enables shell completions argcomplete # Enables shell completions
pydantic
]; ];
# load nixpkgs runtime dependencies from a json file # load nixpkgs runtime dependencies from a json file

View File

@@ -0,0 +1,157 @@
from dataclasses import dataclass, field
from pathlib import Path
import pytest
# Functions to test
from clan_cli.api import (
dataclass_to_dict,
from_dict,
)
from clan_cli.errors import ClanError
from clan_cli.inventory import (
Inventory,
Machine,
MachineDeploy,
Meta,
Service,
ServiceBorgbackup,
ServiceBorgbackupRole,
ServiceBorgbackupRoleClient,
ServiceBorgbackupRoleServer,
ServiceMeta,
)
def test_simple() -> None:
    """A flat dataclass is built from a dict with a matching key."""

    @dataclass
    class Person:
        name: str

    raw = {"name": "John"}

    assert from_dict(Person, raw) == Person(name="John")
def test_nested() -> None:
    """Nested dataclasses are built inside fields, lists, dicts and optionals."""

    @dataclass
    class Age:
        value: str

    @dataclass
    class Person:
        name: str
        # Dataclass nested directly, inside a list and inside a dict
        age: Age
        age_list: list[Age]
        age_dict: dict[str, Age]
        # Optional field
        home: Path | None

    raw = {
        "name": "John",
        "age": {"value": "99"},
        "age_list": [{"value": "66"}, {"value": "77"}],
        "age_dict": {"now": {"value": "55"}, "max": {"value": "100"}},
        "home": "/home",
    }

    want = Person(
        name="John",
        age=Age("99"),
        age_list=[Age("66"), Age("77")],
        age_dict={"now": Age("55"), "max": Age("100")},
        home=Path("/home"),
    )

    assert from_dict(Person, raw) == want
def test_simple_field_missing() -> None:
    """A missing required field is reported as a ClanError."""

    @dataclass
    class Person:
        name: str

    empty = {}

    with pytest.raises(ClanError):
        from_dict(Person, empty)
def test_deserialize_extensive_inventory() -> None:
    """A full inventory dict is turned into the matching Inventory object tree."""
    raw_inventory = {
        "meta": {"name": "superclan", "description": "nice clan"},
        "services": {
            "borgbackup": {
                "instance1": {
                    "meta": {
                        "name": "borg1",
                    },
                    "roles": {
                        "client": {},
                        "server": {},
                    },
                }
            },
        },
        "machines": {"foo": {"name": "foo", "deploy": {}}},
    }

    # Build the expected tree from the inside out.
    borg_roles = ServiceBorgbackupRole(
        client=ServiceBorgbackupRoleClient(),
        server=ServiceBorgbackupRoleServer(),
    )
    borg_instance = ServiceBorgbackup(
        meta=ServiceMeta(name="borg1"),
        roles=borg_roles,
    )
    want = Inventory(
        meta=Meta(name="superclan", description="nice clan"),
        services=Service(borgbackup={"instance1": borg_instance}),
        machines={"foo": Machine(deploy=MachineDeploy(), name="foo")},
    )

    assert from_dict(Inventory, raw_inventory) == want
def test_alias_field() -> None:
    """A field is read from the key named by its ``alias`` metadata."""

    @dataclass
    class Person:
        name: str = field(metadata={"alias": "--user-name--"})

    raw = {"--user-name--": "John"}

    assert from_dict(Person, raw) == Person(name="John")
def test_path_field() -> None:
    """A string value for a ``Path`` field is converted to a ``Path``."""

    @dataclass
    class Person:
        name: Path

    raw = {"name": "John"}

    assert from_dict(Person, raw) == Person(name=Path("John"))
def test_private_public_fields() -> None:
    """Underscore-prefixed fields are left out of the serialized dict."""

    @dataclass
    class Person:
        name: Path
        _name: str | None = None

    raw = {"name": "John"}
    person = Person(name=Path("John"))

    # Deserializing fills only the public field ...
    assert from_dict(Person, raw) == person
    # ... and serializing does not emit the private one.
    assert dataclass_to_dict(person) == raw

View File

@@ -0,0 +1,106 @@
from dataclasses import dataclass, field
# Functions to test
from clan_cli.api import (
dataclass_to_dict,
sanitize_string,
)
#
def test_sanitize_string() -> None:
    """Control characters, quotes and backslashes are escaped JSON-style."""
    cases = [
        # Simple strings
        ("Hello World", "Hello World"),
        ("Hello\nWorld", "Hello\\nWorld"),
        ("Hello\tWorld", "Hello\\tWorld"),
        ("Hello\rWorld", "Hello\\rWorld"),
        ("Hello\fWorld", "Hello\\fWorld"),
        ("Hello\vWorld", "Hello\\u000bWorld"),
        ("Hello\bWorld", "Hello\\bWorld"),
        ("Hello\\World", "Hello\\\\World"),
        ('Hello"World', 'Hello\\"World'),
        ("Hello'World", "Hello'World"),
        ("Hello\0World", "Hello\\u0000World"),
        # Console escape characters
        ("\033[1mBold\033[0m", "\\u001b[1mBold\\u001b[0m"),  # Bold
        ("\033[31mRed\033[0m", "\\u001b[31mRed\\u001b[0m"),  # Red
        ("\033[42mGreen\033[0m", "\\u001b[42mGreen\\u001b[0m"),  # Green
        ("\033[4mUnderline\033[0m", "\\u001b[4mUnderline\\u001b[0m"),  # Underline
        (
            "\033[91m\033[1mBold Red\033[0m",
            "\\u001b[91m\\u001b[1mBold Red\\u001b[0m",
        ),  # Bold red
    ]

    for raw, escaped in cases:
        assert sanitize_string(raw) == escaped
def test_dataclass_to_dict() -> None:
    """A flat dataclass is serialized to a dict of its fields."""

    @dataclass
    class Person:
        name: str
        age: int

    john = Person(name="John", age=25)

    assert dataclass_to_dict(john) == {"name": "John", "age": 25}
def test_dataclass_to_dict_nested() -> None:
    """A nested dataclass is serialized as a nested dict."""

    @dataclass
    class Address:
        city: str = "afghanistan"
        zip: str = "01234"

    @dataclass
    class Person:
        name: str
        age: int
        address: Address = field(default_factory=Address)

    # Without an explicit address the default-constructed one is serialized.
    with_default_address = Person(name="John", age=25)
    assert dataclass_to_dict(with_default_address) == {
        "name": "John",
        "age": 25,
        "address": {"city": "afghanistan", "zip": "01234"},
    }

    # An explicitly passed address is serialized with its own values.
    with_custom_address = Person(
        name="John", age=25, address=Address(zip="0", city="Anywhere")
    )
    assert dataclass_to_dict(with_custom_address) == {
        "name": "John",
        "age": 25,
        "address": {"zip": "0", "city": "Anywhere"},
    }
def test_dataclass_to_dict_defaults() -> None:
    """Default values and default factories show up in the serialized dict."""

    @dataclass
    class Foo:
        home: dict[str, str] = field(default_factory=dict)
        work: list[str] = field(default_factory=list)

    @dataclass
    class Person:
        name: str = field(default="jon")
        age: int = field(default=1)
        foo: Foo = field(default_factory=Foo)

    # Nothing passed: every field, including the nested ``foo``, takes its default.
    assert dataclass_to_dict(Person()) == {
        "name": "jon",
        "age": 1,
        "foo": {"home": {}, "work": []},
    }

    # Everything passed explicitly: the given values are serialized.
    populated = Person(
        name="John", age=25, foo=Foo(home={"a": "b"}, work=["a", "b"])
    )
    assert dataclass_to_dict(populated) == {
        "name": "John",
        "age": 25,
        "foo": {"home": {"a": "b"}, "work": ["a", "b"]},
    }