Serde: add unit tests for all serialization and deserialization logic

This commit is contained in:
Johannes Kirschbauer
2024-07-26 14:22:07 +02:00
parent 76e192bd49
commit d7b6fc16a4
6 changed files with 372 additions and 216 deletions

View File

@@ -1,92 +0,0 @@
import dataclasses
import logging
from dataclasses import fields, is_dataclass
from pathlib import Path
from types import UnionType
from typing import Any, get_args
import gi
# Select version 6.0 of the "WebKit" GObject-introspection namespace.
# NOTE(review): presumably this must run before WebKit is imported from gi.repository - confirm.
gi.require_version("WebKit", "6.0")
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def sanitize_string(s: str) -> str:
    """Escape backslashes, double quotes and newlines in *s*.

    Each of these three characters is replaced by its backslash-escaped
    two-character form; every other character is passed through unchanged.
    """
    # One pass over the string, so already-inserted backslashes are never re-escaped.
    escapes = str.maketrans({"\\": "\\\\", '"': '\\"', "\n": "\\n"})
    return s.translate(escapes)
def dataclass_to_dict(obj: Any) -> Any:
    """
    Recursively convert *obj* into plain dictionaries, lists and scalars.

    - dataclasses become dictionaries (via ``dataclasses.asdict``)
    - lists and tuples become lists
    - dictionaries are rebuilt with sanitized keys and converted values
    - ``Path`` objects become strings, strings are sanitized
    - anything else is returned unchanged

    Member functions are not part of the result.
    """
    if dataclasses.is_dataclass(obj):
        as_mapping = dataclasses.asdict(obj)
        return {
            sanitize_string(name): dataclass_to_dict(value)
            for name, value in as_mapping.items()
        }
    if isinstance(obj, (list, tuple)):
        return [dataclass_to_dict(element) for element in obj]
    if isinstance(obj, dict):
        return {
            sanitize_string(key): dataclass_to_dict(value)
            for key, value in obj.items()
        }
    if isinstance(obj, Path):
        return str(obj)
    if isinstance(obj, str):
        return sanitize_string(obj)
    return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
    """Unwrap a union type hint to its first member that is not ``NoneType``.

    Hints that are not unions are returned unchanged.
    """
    if not is_union_type(type_hint):
        return type_hint
    none_type = type(None)
    # First union member that is not NoneType
    return next(member for member in get_args(type_hint) if member is not none_type)
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
    """
    Dynamically instantiate a data class from a dictionary, handling nested data classes.

    Args:
        t: The dataclass type to instantiate.
        data: Mapping of field names to raw values, or None.

    Returns:
        An instance of *t*. None is returned when *data* is None, or when
        building the instance raises TypeError/ValueError (the failure is
        printed).
    """
    # Only a missing payload short-circuits. An empty dict is still valid
    # input for a dataclass whose fields all have defaults.
    if data is None:
        return None

    try:
        field_values = {}
        for field in fields(t):
            field_value = data.get(field.name)
            field_type = get_inner_type(field.type)

            if field_value is not None:
                if is_dataclass(field_type):
                    # Nested dataclass: build it recursively
                    field_value = from_dict(field_type, field_value)
                elif field_type is Path and isinstance(field_value, str):
                    # field_type is a type object, so it has to be compared,
                    # not passed to isinstance()
                    field_value = Path(field_value)

            has_default = (
                field.default is not dataclasses.MISSING
                or field.default_factory is not dataclasses.MISSING
            )
            if has_default and field_value is None:
                # Leave the field out so the dataclass default applies
                # instead of overriding it with None
                continue
            field_values[field.name] = field_value

        return t(**field_values)

    except (TypeError, ValueError) as e:
        print(f"Failed to instantiate {t.__name__}: {e}")
        return None

View File

@@ -1,141 +1,22 @@
import dataclasses
import json
from collections.abc import Callable
from dataclasses import dataclass, fields, is_dataclass
from dataclasses import dataclass
from functools import wraps
from inspect import Parameter, Signature, signature
from pathlib import Path
from types import UnionType
from typing import (
Annotated,
Any,
Generic,
Literal,
TypeVar,
get_args,
get_origin,
get_type_hints,
)
from .serde import dataclass_to_dict, from_dict, sanitize_string
# Public names of this module: the helpers imported from .serde are re-exported here.
__all__ = ["from_dict", "dataclass_to_dict", "sanitize_string"]
from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
    """Escape *s* the way the JSON encoder does, without the surrounding quotes."""
    # json.dumps handles every edge case; its output looks like '"string"'
    quoted = json.dumps(s)
    # Drop the first and the last character (the outer quotes)
    return quoted[1 : len(quoted) - 1]
def dataclass_to_dict(obj: Any) -> Any:
    """
    Recursively convert *obj* into plain dictionaries, lists and scalars.

    - dataclasses become dictionaries keyed by the field's "original_name"
      metadata entry, falling back to the field name
    - lists and tuples become lists
    - dictionaries are rebuilt with sanitized keys and converted values
    - ``Path`` objects and strings become sanitized strings
    - anything else is returned unchanged

    Member functions are not part of the result.
    """
    if is_dataclass(obj):
        converted = {}
        for field in fields(obj):  # type: ignore
            # Use either the original name or the field name
            key = sanitize_string(field.metadata.get("original_name", field.name))
            converted[key] = dataclass_to_dict(getattr(obj, field.name))
        return converted
    if isinstance(obj, (list, tuple)):
        return [dataclass_to_dict(element) for element in obj]
    if isinstance(obj, dict):
        return {
            sanitize_string(key): dataclass_to_dict(value)
            for key, value in obj.items()
        }
    if isinstance(obj, Path):
        return sanitize_string(str(obj))
    if isinstance(obj, str):
        return sanitize_string(obj)
    return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
    """Unwrap a union type hint to its first member that is not ``NoneType``.

    Hints that are not unions are returned unchanged.
    """
    if not is_union_type(type_hint):
        return type_hint
    none_type = type(None)
    # First union member that is not NoneType
    return next(member for member in get_args(type_hint) if member is not none_type)
def get_second_type(type_hint: type[dict]) -> type:
    """
    Return the value type of a parameterized dictionary type hint.

    Raises:
        ValueError: if the hint does not carry exactly two type arguments.
    """
    type_args = get_args(type_hint)
    if len(type_args) != 2:
        raise ValueError(f"Invalid type hint for dict: {type_hint}")
    # (key type, value type) - only the value type is of interest
    _key_type, value_type = type_args
    return value_type
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
    """
    Dynamically instantiate a data class from a dictionary, handling nested data classes.

    Values are looked up under the field's "original_name" metadata entry,
    falling back to the field name.

    Args:
        t: The dataclass type to instantiate.
        data: Mapping of (original) field names to raw values, or None.

    Returns:
        An instance of *t*. None is returned when *data* is None, or when
        building the instance raises TypeError/ValueError (the failure is
        printed).
    """
    if data is None:
        return None

    try:
        field_values = {}
        for field in fields(t):
            original_name = field.metadata.get("original_name", field.name)
            field_value = data.get(original_name)
            field_type = get_inner_type(field.type)  # type: ignore

            if original_name in data:
                if is_dataclass(field_type):
                    # Nested dataclass: build it recursively
                    field_value = from_dict(field_type, field_value)
                elif field_type is Path and isinstance(field_value, str):
                    # field_type is a type object, so it has to be compared,
                    # not passed to isinstance()
                    field_value = Path(field_value)
                elif get_origin(field_type) is dict and isinstance(field_value, dict):
                    # dict[K, V]: convert the values only when V is a dataclass;
                    # plain value types are kept as they are
                    inner_type = get_inner_type(get_second_type(field_type))
                    if is_dataclass(inner_type):
                        field_value = {
                            k: from_dict(inner_type, v) for k, v in field_value.items()
                        }
                elif get_origin(field_type) is list and isinstance(field_value, list):
                    # list[V]: same rule as for dictionaries
                    type_args = get_args(field_type)
                    if type_args:
                        inner_type = get_inner_type(type_args[0])
                        if is_dataclass(inner_type):
                            field_value = [
                                from_dict(inner_type, v) for v in field_value
                            ]

            has_default = (
                field.default is not dataclasses.MISSING
                or field.default_factory is not dataclasses.MISSING
            )
            # Fields with a default value
            #   a: int = 1
            #   b: list = field(default_factory=list)
            # are only set when the key is present in the input, so that the
            # default applies otherwise. Fields without a default are always set.
            if has_default and original_name not in data:
                continue
            field_values[field.name] = field_value

        return t(**field_values)

    except (TypeError, ValueError) as e:
        print(f"Failed to instantiate {t.__name__}: {e} {data}")
        return None
# Generic type variables.
# NOTE(review): the code using them lies outside this hunk - confirm where they are consumed.
T = TypeVar("T")
ResponseDataType = TypeVar("ResponseDataType")

View File

@@ -0,0 +1,101 @@
"""
This module provides utility functions for serialization and deserialization of data classes.
Functions:
- sanitize_string(s: str) -> str: Ensures a string is properly escaped for json serializing.
- dataclass_to_dict(obj: Any) -> Any: Converts a data class and its nested data classes, lists, tuples, and dictionaries to dictionaries.
- from_dict(t: type[T], data: Any) -> T: Dynamically instantiates a data class from a dictionary, constructing nested data classes, validates all required fields exist and have the expected type.
Classes:
- TypeAdapter: A Pydantic type adapter for data classes.
Exceptions:
- ValidationError: Raised when there is a validation error during deserialization.
- ClanError: Raised when there is an error during serialization or deserialization.
Dependencies:
- dataclasses: Provides the @dataclass decorator and related functions for creating data classes.
- json: Provides functions for working with JSON data.
- collections.abc: Provides abstract base classes for collections.
- functools: Provides functions for working with higher-order functions and decorators.
- inspect: Provides functions for inspecting live objects.
- operator: Provides functions for working with operators.
- pathlib: Provides classes for working with filesystem paths.
- types: Provides functions for working with types.
- typing: Provides support for type hints.
- pydantic: A library for data validation and settings management.
- pydantic_core: Core functionality for Pydantic.
Note: This module relies on `ClanError` from the `clan_cli.errors` module and on `ErrorDetails` from `pydantic_core`.
"""
import json
from dataclasses import dataclass, fields, is_dataclass
from pathlib import Path
from typing import (
Any,
TypeVar,
)
from pydantic import TypeAdapter, ValidationError
from pydantic_core import ErrorDetails
from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
    """Escape *s* the way the JSON encoder does, without the surrounding quotes."""
    # json.dumps handles every edge case; its output looks like '"string"'
    quoted = json.dumps(s)
    # Drop the first and the last character (the outer quotes)
    return quoted[1 : len(quoted) - 1]
def dataclass_to_dict(obj: Any) -> Any:
    """
    Recursively convert *obj* into plain dictionaries, lists and scalars.

    - dataclasses become dictionaries keyed by the field's "original_name"
      metadata entry, falling back to the field name; fields whose name
      starts with an underscore are left out
    - lists and tuples become lists
    - dictionaries are rebuilt with sanitized keys and converted values
    - ``Path`` objects and strings become sanitized strings
    - anything else is returned unchanged

    Member functions are not part of the result.
    """
    if is_dataclass(obj):
        converted = {}
        for field in fields(obj):
            if field.name.startswith("_"):  # type: ignore
                continue
            # Use either the original name or the field name
            key = sanitize_string(field.metadata.get("original_name", field.name))
            converted[key] = dataclass_to_dict(getattr(obj, field.name))
        return converted
    if isinstance(obj, (list, tuple)):
        return [dataclass_to_dict(element) for element in obj]
    if isinstance(obj, dict):
        return {
            sanitize_string(key): dataclass_to_dict(value)
            for key, value in obj.items()
        }
    if isinstance(obj, Path):
        return sanitize_string(str(obj))
    if isinstance(obj, str):
        return sanitize_string(obj)
    return obj
# Type variable for the type that from_dict is asked to build and return.
# NOTE(review): `dataclass` is the decorator function, not a class, so the bound
# is not a real type for a checker - hence the `type: ignore`.
T = TypeVar("T", bound=dataclass)  # type: ignore
def from_dict(t: type[T], data: Any) -> T:
    """
    Dynamically instantiate a data class from a dictionary, handling nested data classes.

    The types stay plain dataclasses; validation and construction are
    delegated to a pydantic TypeAdapter, which takes a lot of complexity
    off this module.

    Args:
        t: The type to build.
        data: The raw (already parsed) input to validate against *t*.

    Returns:
        The validated instance of *t*.

    Raises:
        ClanError: if pydantic rejects *data*. The first reported error
            supplies the message and location; the full validation error
            is kept as the description and as the exception cause.
    """
    adapter = TypeAdapter(t)
    try:
        return adapter.validate_python(data)
    except ValidationError as e:
        # Check for emptiness before indexing, otherwise the guard can never fire
        errors = e.errors()
        if not errors:
            raise ClanError(msg=str(e)) from e

        fst_error: ErrorDetails = errors[0]
        msg = fst_error.get("msg")
        loc = fst_error.get("loc")
        field_path = str(loc) if loc else "Unknown"

        raise ClanError(
            msg=msg, location=f"{t!s}: {field_path}", description=str(e)
        ) from e

View File

@@ -17,6 +17,8 @@
setuptools,
stdenv,
pydantic,
# custom args
clan-core-path,
nixpkgs,
@@ -28,6 +30,7 @@
let
pythonDependencies = [
argcomplete # Enables shell completions
pydantic
];
# load nixpkgs runtime dependencies from a json file

View File

@@ -0,0 +1,157 @@
from dataclasses import dataclass, field
from pathlib import Path
import pytest
# Functions to test
from clan_cli.api import (
dataclass_to_dict,
from_dict,
)
from clan_cli.errors import ClanError
from clan_cli.inventory import (
Inventory,
Machine,
MachineDeploy,
Meta,
Service,
ServiceBorgbackup,
ServiceBorgbackupRole,
ServiceBorgbackupRoleClient,
ServiceBorgbackupRoleServer,
ServiceMeta,
)
def test_simple() -> None:
    """A flat dataclass is built from a dictionary with a matching key."""

    @dataclass
    class Person:
        name: str

    raw = {"name": "John"}
    wanted = Person(name="John")

    result = from_dict(Person, raw)
    assert result == wanted
def test_nested() -> None:
    """Nested dataclasses are built directly, inside lists and inside dicts."""

    @dataclass
    class Age:
        value: str

    @dataclass
    class Person:
        name: str
        # deeply nested dataclasses
        age: Age
        age_list: list[Age]
        age_dict: dict[str, Age]
        # Optional field
        home: Path | None

    raw = {
        "name": "John",
        "age": {"value": "99"},
        "age_list": [{"value": "66"}, {"value": "77"}],
        "age_dict": {"now": {"value": "55"}, "max": {"value": "100"}},
        "home": "/home",
    }
    wanted = Person(
        name="John",
        age=Age("99"),
        age_list=[Age("66"), Age("77")],
        age_dict={"now": Age("55"), "max": Age("100")},
        home=Path("/home"),
    )

    result = from_dict(Person, raw)
    assert result == wanted
def test_simple_field_missing() -> None:
    """Input that lacks a required field is rejected with a ClanError."""

    @dataclass
    class Person:
        name: str

    empty_input: dict[str, str] = {}

    with pytest.raises(ClanError):
        from_dict(Person, empty_input)
def test_deserialize_extensive_inventory() -> None:
    """A complete inventory document is turned into the nested inventory dataclasses."""
    # Raw input, built from the innermost service instance outwards
    raw_borg_instance = {
        "meta": {
            "name": "borg1",
        },
        "roles": {
            "client": {},
            "server": {},
        },
    }
    raw_inventory = {
        "meta": {"name": "superclan", "description": "nice clan"},
        "services": {
            "borgbackup": {"instance1": raw_borg_instance},
        },
        "machines": {"foo": {"name": "foo", "deploy": {}}},
    }

    # The equivalent object tree
    borg_instance = ServiceBorgbackup(
        meta=ServiceMeta(name="borg1"),
        roles=ServiceBorgbackupRole(
            client=ServiceBorgbackupRoleClient(),
            server=ServiceBorgbackupRoleServer(),
        ),
    )
    wanted = Inventory(
        meta=Meta(name="superclan", description="nice clan"),
        services=Service(borgbackup={"instance1": borg_instance}),
        machines={"foo": Machine(deploy=MachineDeploy(), name="foo")},
    )

    assert from_dict(Inventory, raw_inventory) == wanted
def test_alias_field() -> None:
    """A field is filled from the key named in its "alias" metadata entry."""

    @dataclass
    class Person:
        name: str = field(metadata={"alias": "--user-name--"})

    raw = {"--user-name--": "John"}
    wanted = Person(name="John")

    result = from_dict(Person, raw)
    assert result == wanted
def test_path_field() -> None:
    """A string value for a ``Path`` field comes back as a ``Path``."""

    @dataclass
    class Person:
        name: Path

    raw = {"name": "John"}
    wanted = Person(name=Path("John"))

    result = from_dict(Person, raw)
    assert result == wanted
def test_private_public_fields() -> None:
    """Underscore-prefixed fields are optional on input and absent from the output."""

    @dataclass
    class Person:
        name: Path
        _name: str | None = None

    raw = {"name": "John"}
    wanted = Person(name=Path("John"))

    # Deserializing works without the private field ...
    assert from_dict(Person, raw) == wanted
    # ... and serializing does not emit it
    assert dataclass_to_dict(wanted) == raw

View File

@@ -0,0 +1,106 @@
from dataclasses import dataclass, field
# Functions to test
from clan_cli.api import (
dataclass_to_dict,
sanitize_string,
)
#
def test_sanitize_string() -> None:
    """sanitize_string escapes control characters, quotes and backslashes."""
    cases = [
        # Simple strings
        ("Hello World", "Hello World"),
        ("Hello\nWorld", "Hello\\nWorld"),
        ("Hello\tWorld", "Hello\\tWorld"),
        ("Hello\rWorld", "Hello\\rWorld"),
        ("Hello\fWorld", "Hello\\fWorld"),
        ("Hello\vWorld", "Hello\\u000bWorld"),
        ("Hello\bWorld", "Hello\\bWorld"),
        ("Hello\\World", "Hello\\\\World"),
        ('Hello"World', 'Hello\\"World'),
        ("Hello'World", "Hello'World"),
        ("Hello\0World", "Hello\\u0000World"),
        # Console escape characters
        ("\033[1mBold\033[0m", "\\u001b[1mBold\\u001b[0m"),  # Bold
        ("\033[31mRed\033[0m", "\\u001b[31mRed\\u001b[0m"),  # Red
        ("\033[42mGreen\033[0m", "\\u001b[42mGreen\\u001b[0m"),  # Green
        ("\033[4mUnderline\033[0m", "\\u001b[4mUnderline\\u001b[0m"),  # Underline
        (
            "\033[91m\033[1mBold Red\033[0m",
            "\\u001b[91m\\u001b[1mBold Red\\u001b[0m",
        ),  # Bold red
    ]
    for raw, escaped in cases:
        assert sanitize_string(raw) == escaped
def test_dataclass_to_dict() -> None:
    """A flat dataclass is converted to a dictionary of its fields."""

    @dataclass
    class Person:
        name: str
        age: int

    john = Person(name="John", age=25)

    assert dataclass_to_dict(john) == {"name": "John", "age": 25}
def test_dataclass_to_dict_nested() -> None:
    """A nested dataclass is converted too, whether defaulted or passed explicitly."""

    @dataclass
    class Address:
        city: str = "afghanistan"
        zip: str = "01234"

    @dataclass
    class Person:
        name: str
        age: int
        address: Address = field(default_factory=Address)

    # address must be constructed with default values if not passed
    with_default_address = Person(name="John", age=25)
    assert dataclass_to_dict(with_default_address) == {
        "name": "John",
        "age": 25,
        "address": {"city": "afghanistan", "zip": "01234"},
    }

    # an explicitly passed address is converted with its own values
    with_explicit_address = Person(
        name="John", age=25, address=Address(zip="0", city="Anywhere")
    )
    assert dataclass_to_dict(with_explicit_address) == {
        "name": "John",
        "age": 25,
        "address": {"zip": "0", "city": "Anywhere"},
    }
def test_dataclass_to_dict_defaults() -> None:
    """Default values and default factories show up in the converted dictionary."""

    @dataclass
    class Foo:
        home: dict[str, str] = field(default_factory=dict)
        work: list[str] = field(default_factory=list)

    @dataclass
    class Person:
        name: str = field(default="jon")
        age: int = field(default=1)
        foo: Foo = field(default_factory=Foo)

    # every field, including the nested foo, falls back to its default
    all_defaults = Person()
    assert dataclass_to_dict(all_defaults) == {
        "name": "jon",
        "age": 1,
        "foo": {"home": {}, "work": []},
    }

    # explicitly passed values replace the defaults
    all_explicit = Person(
        name="John", age=25, foo=Foo(home={"a": "b"}, work=["a", "b"])
    )
    assert dataclass_to_dict(all_explicit) == {
        "name": "John",
        "age": 25,
        "foo": {"home": {"a": "b"}, "work": ["a", "b"]},
    }