Merge remote-tracking branch 'origin/main' into rework-installation

This commit is contained in:
Jörg Thalheim
2024-07-30 11:52:36 +02:00
48 changed files with 1385 additions and 562 deletions

View File

@@ -7,10 +7,10 @@ from types import ModuleType
# These imports are unused, but necessary for @API.register to run once.
from clan_cli.api import directory, mdns_discovery, modules
from clan_cli.arg_actions import AppendOptionAction
from clan_cli.clan import show
from clan_cli.clan import show, update
# API endpoints that are not used in the cli.
__all__ = ["directory", "mdns_discovery", "modules"]
__all__ = ["directory", "mdns_discovery", "modules", "update"]
from . import (
backups,

View File

@@ -1,141 +1,22 @@
import dataclasses
import json
from collections.abc import Callable
from dataclasses import dataclass, fields, is_dataclass
from dataclasses import dataclass
from functools import wraps
from inspect import Parameter, Signature, signature
from pathlib import Path
from types import UnionType
from typing import (
Annotated,
Any,
Generic,
Literal,
TypeVar,
get_args,
get_origin,
get_type_hints,
)
from .serde import dataclass_to_dict, from_dict, sanitize_string
__all__ = ["from_dict", "dataclass_to_dict", "sanitize_string"]
from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
    """Escape *s* exactly as JSON would, without the surrounding quotes."""
    # Delegate every escaping edge case (quotes, backslashes, control chars,
    # unicode) to the native JSON encoder ...
    quoted = json.dumps(s)
    # ... then drop the leading and trailing '"' that json.dumps adds.
    return quoted[1:-1]
def dataclass_to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if is_dataclass(obj):
return {
# Use either the original name or name
sanitize_string(
field.metadata.get("original_name", field.name)
): dataclass_to_dict(getattr(obj, field.name))
for field in fields(obj) # type: ignore
}
elif isinstance(obj, list | tuple):
return [dataclass_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): dataclass_to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return sanitize_string(str(obj))
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
def is_union_type(type_hint: type) -> bool:
return type(type_hint) is UnionType
def get_inner_type(type_hint: type) -> type:
if is_union_type(type_hint):
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint
def get_second_type(type_hint: type[dict]) -> type:
"""
Get the value type of a dictionary type hint
"""
args = get_args(type_hint)
if len(args) == 2:
# Return the second argument, which should be the value type (Machine)
return args[1]
raise ValueError(f"Invalid type hint for dict: {type_hint}")
def from_dict(t: type, data: dict[str, Any] | None) -> Any:
"""
Dynamically instantiate a data class from a dictionary, handling nested data classes.
"""
if data is None:
return None
try:
# Attempt to create an instance of the data_class
field_values = {}
for field in fields(t):
original_name = field.metadata.get("original_name", field.name)
field_value = data.get(original_name)
field_type = get_inner_type(field.type) # type: ignore
if original_name in data:
# If the field is another dataclass, recursively instantiate it
if is_dataclass(field_type):
field_value = from_dict(field_type, field_value)
elif isinstance(field_type, Path | str) and isinstance(
field_value, str
):
field_value = (
Path(field_value) if field_type == Path else field_value
)
elif get_origin(field_type) is dict and isinstance(field_value, dict):
# The field is a dictionary with a specific type
inner_type = get_second_type(field_type)
field_value = {
k: from_dict(inner_type, v) for k, v in field_value.items()
}
elif get_origin is list and isinstance(field_value, list):
# The field is a list with a specific type
inner_type = get_args(field_type)[0]
field_value = [from_dict(inner_type, v) for v in field_value]
# Set the value
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
# Fields with default value
# a: Int = 1
# b: list = Field(default_factory=list)
if original_name in data or field_value is not None:
field_values[field.name] = field_value
else:
# Fields without default value
# a: Int
field_values[field.name] = field_value
return t(**field_values)
except (TypeError, ValueError) as e:
print(f"Failed to instantiate {t.__name__}: {e} {data}")
return None
# Module-level type variables for generic annotations used later in this
# module (the generic API response types are defined past this view).
T = TypeVar("T")
ResponseDataType = TypeVar("ResponseDataType")

View File

@@ -0,0 +1,106 @@
"""
This module provides utility functions for serialization and deserialization of data classes.
Functions:
- sanitize_string(s: str) -> str: Ensures a string is properly escaped for json serializing.
- dataclass_to_dict(obj: Any) -> Any: Converts a data class and its nested data classes, lists, tuples, and dictionaries to dictionaries.
- from_dict(t: type[T], data: Any) -> T: Dynamically instantiates a data class from a dictionary, constructing nested data classes, validates all required fields exist and have the expected type.
Classes:
- TypeAdapter: A Pydantic type adapter for data classes.
Exceptions:
- ValidationError: Raised when there is a validation error during deserialization.
- ClanError: Raised when there is an error during serialization or deserialization.
Dependencies:
- dataclasses: Provides the @dataclass decorator and related functions for creating data classes.
- json: Provides functions for working with JSON data.
- collections.abc: Provides abstract base classes for collections.
- functools: Provides functions for working with higher-order functions and decorators.
- inspect: Provides functions for inspecting live objects.
- operator: Provides functions for working with operators.
- pathlib: Provides classes for working with filesystem paths.
- types: Provides functions for working with types.
- typing: Provides support for type hints.
- pydantic: A library for data validation and settings management.
- pydantic_core: Core functionality for Pydantic.
Note: This module assumes the presence of other modules and classes such as `ClanError` and `ErrorDetails` from the `clan_cli.errors` module.
"""
import json
from dataclasses import dataclass, fields, is_dataclass
from pathlib import Path
from typing import (
Any,
TypeVar,
)
from pydantic import TypeAdapter, ValidationError
from pydantic_core import ErrorDetails
from clan_cli.errors import ClanError
def sanitize_string(s: str) -> str:
    """Ensure a string is properly escaped for JSON serializing."""
    # json.dumps handles every escaping edge case; it always wraps its
    # output in double quotes, which we strip off again here.
    return json.dumps(s).removeprefix('"').removesuffix('"')
def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
def _to_dict(obj: Any) -> Any:
"""
Utility function to convert dataclasses to dictionaries
It converts all nested dataclasses, lists, tuples, and dictionaries to dictionaries
It does NOT convert member functions.
"""
if is_dataclass(obj):
return {
# Use either the original name or name
sanitize_string(
field.metadata.get("alias", field.name) if use_alias else field.name
): _to_dict(getattr(obj, field.name))
for field in fields(obj)
if not field.name.startswith("_") # type: ignore
}
elif isinstance(obj, list | tuple):
return [_to_dict(item) for item in obj]
elif isinstance(obj, dict):
return {sanitize_string(k): _to_dict(v) for k, v in obj.items()}
elif isinstance(obj, Path):
return sanitize_string(str(obj))
elif isinstance(obj, str):
return sanitize_string(obj)
else:
return obj
return _to_dict(obj)
T = TypeVar("T", bound=dataclass)  # type: ignore


def from_dict(t: type[T], data: Any) -> T:
    """
    Dynamically instantiate a data class from a dictionary, handling nested
    data classes.

    We use dataclasses, but the deserialization/validation logic is
    delegated to pydantic's TypeAdapter, which takes a lot of complexity.

    Raises:
        ClanError: when `data` does not validate against `t`; the first
        validation error's message and field location are included.
    """
    adapter = TypeAdapter(t)
    try:
        return adapter.validate_python(
            data,
        )
    except ValidationError as e:
        # BUGFIX: guard the error list *before* indexing into it — the old
        # code did `e.errors()[0]` first, so an empty error list raised
        # IndexError instead of reaching the intended ClanError fallback.
        errors = e.errors()
        if not errors:
            raise ClanError(msg=str(e)) from e
        fst_error: ErrorDetails = errors[0]
        msg = fst_error.get("msg")
        loc = fst_error.get("loc")
        # Render the offending field path (a tuple like ('field', 0, 'name'))
        # when pydantic provides one.
        field_path = str(loc) if loc else "Unknown"
        raise ClanError(
            msg=msg, location=f"{t!s}: {field_path}", description=str(e)
        ) from e

View File

@@ -74,7 +74,9 @@ def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) ->
if dataclasses.is_dataclass(t):
fields = dataclasses.fields(t)
properties = {
f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}", type_map)
f.metadata.get("alias", f.name): type_to_dict(
f.type, f"{scope} {t.__name__}.{f.name}", type_map
)
for f in fields
if not f.name.startswith("_")
}

View File

@@ -14,7 +14,7 @@ from ..vms.inspect import VmConfig, inspect_vm
@dataclass
class FlakeConfig:
flake_url: str | Path
flake_url: FlakeId
flake_attr: str
clan_name: str
@@ -89,7 +89,7 @@ def inspect_flake(flake_url: str | Path, machine_name: str) -> FlakeConfig:
meta = nix_metadata(flake_url)
return FlakeConfig(
vm=vm,
flake_url=flake_url,
flake_url=FlakeId(flake_url),
clan_name=clan_name,
flake_attr=machine_name,
nar_hash=meta["locked"]["narHash"],

View File

@@ -62,7 +62,7 @@ def list_history() -> list[HistoryEntry]:
def new_history_entry(url: str, machine: str) -> HistoryEntry:
flake = inspect_flake(url, machine)
flake.flake_url = str(flake.flake_url)
flake.flake_url = flake.flake_url
return HistoryEntry(
flake=flake,
last_used=datetime.datetime.now().isoformat(),

View File

@@ -16,7 +16,7 @@ def update_history() -> list[HistoryEntry]:
for entry in logs:
try:
meta = nix_metadata(entry.flake.flake_url)
meta = nix_metadata(str(entry.flake.flake_url))
except ClanCmdError as e:
print(f"Failed to update {entry.flake.flake_url}: {e}")
continue
@@ -31,7 +31,7 @@ def update_history() -> list[HistoryEntry]:
machine_name=entry.flake.flake_attr,
)
flake = inspect_flake(uri.get_url(), uri.machine_name)
flake.flake_url = str(flake.flake_url)
flake.flake_url = flake.flake_url
entry = HistoryEntry(
flake=flake, last_used=datetime.datetime.now().isoformat()
)

View File

@@ -153,7 +153,7 @@ class ServiceSingleDisk:
class Service:
borgbackup: dict[str, ServiceBorgbackup] = field(default_factory = dict)
packages: dict[str, ServicePackage] = field(default_factory = dict)
single_disk: dict[str, ServiceSingleDisk] = field(default_factory = dict, metadata = {"original_name": "single-disk"})
single_disk: dict[str, ServiceSingleDisk] = field(default_factory = dict, metadata = {"alias": "single-disk"})
@dataclass

View File

@@ -13,6 +13,7 @@ from ..facts.upload import upload_secrets
from ..machines.machines import Machine
from ..nix import nix_command, nix_metadata
from ..ssh import HostKeyCheck
from ..vars.generate import generate_vars
from .inventory import get_all_machines, get_selected_machines
from .machine_group import MachineGroup
@@ -93,6 +94,7 @@ def deploy_machine(machines: MachineGroup) -> None:
env["NIX_SSHOPTS"] = ssh_arg
generate_facts([machine], None, False)
generate_vars([machine], None, False)
upload_secrets(machine)
path = upload_sources(".", target)