Deserializer: replace pydantic

This commit is contained in:
Johannes Kirschbauer
2024-07-30 12:15:24 +02:00
parent e1e83883c0
commit 5d8fa57f23
3 changed files with 138 additions and 27 deletions

View File

@@ -29,17 +29,19 @@ Dependencies:
Note: This module assumes the presence of other modules and classes such as `ClanError` and `ErrorDetails` from the `clan_cli.errors` module. Note: This module assumes the presence of other modules and classes such as `ClanError` and `ErrorDetails` from the `clan_cli.errors` module.
""" """
import dataclasses
import json import json
from dataclasses import dataclass, fields, is_dataclass from dataclasses import dataclass, fields, is_dataclass
from pathlib import Path from pathlib import Path
from types import UnionType
from typing import ( from typing import (
Any, Any,
TypeVar, TypeVar,
Union,
get_args,
get_origin,
) )
from pydantic import TypeAdapter, ValidationError
from pydantic_core import ErrorDetails
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -83,24 +85,136 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
T = TypeVar("T", bound=dataclass) # type: ignore T = TypeVar("T", bound=dataclass) # type: ignore
def from_dict(t: type[T], data: Any) -> T: def is_union_type(type_hint: type | UnionType) -> bool:
""" return (
Dynamically instantiate a data class from a dictionary, handling nested data classes. type(type_hint) is UnionType
We use dataclasses. But the deserialization logic of pydantic takes a lot of complexity. or isinstance(type_hint, UnionType)
""" or get_origin(type_hint) is Union
adapter = TypeAdapter(t) )
try:
return adapter.validate_python(
data,
)
except ValidationError as e:
fst_error: ErrorDetails = e.errors()[0]
if not fst_error:
raise ClanError(msg=str(e))
msg = fst_error.get("msg")
loc = fst_error.get("loc") def is_type_in_union(union_type: type | UnionType, target_type: type) -> bool:
field_path = "Unknown" if get_origin(union_type) is UnionType:
if loc: return any(issubclass(arg, target_type) for arg in get_args(union_type))
field_path = str(loc) return False
raise ClanError(msg=msg, location=f"{t!s}: {field_path}", description=str(e))
def unwrap_none_type(type_hint: type | UnionType) -> type:
"""
Takes a type union and returns the first non-None type.
None | str
=>
str
"""
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint # type: ignore
JsonValue = str | float | dict[str, Any] | list[Any] | None
def construct_field(t: type, field_value: JsonValue, loc: list[str] = []) -> Any:
"""
Construct a field value from a type hint and a field value.
"""
# If the field is another dataclass
# Field_value must be a dictionary
if is_dataclass(t) and isinstance(field_value, dict):
return from_dict(t, field_value)
# If the field expects a path
# Field_value must be a string
elif issubclass(t, Path) or is_type_in_union(t, Path):
if not isinstance(field_value, str):
raise ClanError(
f"Expected string, cannot construct pathlib.Path() from: {field_value} ",
location=f"{loc}",
)
return Path(field_value)
# Trivial values
elif t is str:
if not isinstance(field_value, str):
raise ClanError(f"Expected string, got {field_value}", location=f"{loc}")
return field_value
elif t is int and not isinstance(field_value, str):
return int(field_value) # type: ignore
elif t is float and not isinstance(field_value, str):
return float(field_value) # type: ignore
# Union types construct the first non-None type
elif is_union_type(t):
# Unwrap the union type
t = unwrap_none_type(t)
# Construct the field value
return construct_field(t, field_value)
# Nested types
# list
# dict
elif get_origin(t) is list:
if not isinstance(field_value, list):
raise ClanError(f"Expected list, got {field_value}", location=f"{loc}")
return [construct_field(get_args(t)[0], item) for item in field_value]
elif get_origin(t) is dict and isinstance(field_value, dict):
return {
key: construct_field(get_args(t)[1], value)
for key, value in field_value.items()
}
# Unhandled
else:
raise ClanError(f"Unhandled field type {t} with value {field_value}")
def from_dict(t: type[T], data: dict[str, Any], path: list[str] = []) -> T:
"""
type t MUST be a dataclass
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if not is_dataclass(t):
raise ClanError(f"{t.__name__} is not a dataclass")
# Attempt to create an instance of the data_class#
field_values: dict[str, Any] = {}
required: list[str] = []
for field in fields(t):
if field.name.startswith("_"):
continue
# The first type in a Union
# str <- None | str | Path
field_type: type[Any] = unwrap_none_type(field.type) # type: ignore
data_field_name = field.metadata.get("alias", field.name)
# Check if the field is required
# breakpoint()
if (
field.default is dataclasses.MISSING
and field.default_factory is dataclasses.MISSING
):
required.append(field.name)
# Populate the field_values dictionary with the field value
# if present in the data
if data_field_name in data:
field_value = data.get(data_field_name)
field_values[field.name] = construct_field(field_type, field_value)
# Check that all required field are present.
for field_name in required:
if field_name not in field_values:
formatted_path = " ".join(path)
raise ClanError(
f"Required field missing: '{field_name}' in {t} {formatted_path}, got Value: {data}"
)
return t(**field_values) # type: ignore

View File

@@ -17,8 +17,6 @@
setuptools, setuptools,
stdenv, stdenv,
pydantic,
# custom args # custom args
clan-core-path, clan-core-path,
nixpkgs, nixpkgs,
@@ -30,7 +28,6 @@
let let
pythonDependencies = [ pythonDependencies = [
argcomplete # Enables shell completions argcomplete # Enables shell completions
pydantic # Dataclass deserialisation / validation / schemas
]; ];
# load nixpkgs runtime dependencies from a json file # load nixpkgs runtime dependencies from a json file

View File

@@ -45,11 +45,11 @@ def test_nested() -> None:
class Person: class Person:
name: str name: str
# deeply nested dataclasses # deeply nested dataclasses
home: Path | str | None
age: Age age: Age
age_list: list[Age] age_list: list[Age]
age_dict: dict[str, Age] age_dict: dict[str, Age]
# Optional field # Optional field
home: Path | None
person_dict = { person_dict = {
"name": "John", "name": "John",