From 348e5037897cffc68f1ab0fe6390bef5513a0f1a Mon Sep 17 00:00:00 2001 From: DavHau Date: Tue, 15 Aug 2023 13:24:25 +0200 Subject: [PATCH] clan-config: get rid of jsonschema dependency --- pkgs/clan-cli/clan_cli/config/__init__.py | 103 ++++---------------- pkgs/clan-cli/clan_cli/config/parsing.py | 110 ++++++++++++++++++++++ pkgs/clan-cli/default.nix | 3 +- pkgs/clan-cli/tests/test_config.py | 47 ++++++++- 4 files changed, 173 insertions(+), 90 deletions(-) create mode 100644 pkgs/clan-cli/clan_cli/config/parsing.py diff --git a/pkgs/clan-cli/clan_cli/config/__init__.py b/pkgs/clan-cli/clan_cli/config/__init__.py index 3e9c36ddf..aef2e45be 100644 --- a/pkgs/clan-cli/clan_cli/config/__init__.py +++ b/pkgs/clan-cli/clan_cli/config/__init__.py @@ -1,27 +1,17 @@ # !/usr/bin/env python3 import argparse import json -import subprocess import sys from pathlib import Path -from typing import Any, Optional, Type, Union - -import jsonschema +from typing import Any, Optional, Type from clan_cli.errors import ClanError +from . import parsing + script_dir = Path(__file__).parent -type_map: dict[str, type] = { - "array": list, - "boolean": bool, - "integer": int, - "number": float, - "string": str, -} - - class Kwargs: def __init__(self) -> None: self.type: Optional[Type] = None @@ -40,59 +30,6 @@ class AllContainer(list): return True -def schema_from_module_file( - file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json", -) -> dict[str, Any]: - absolute_path = Path(file).absolute() - # define a nix expression that loads the given module file using lib.evalModules - nix_expr = f""" - let - lib = import ; - slib = import {script_dir}/jsonschema {{inherit lib;}}; - in - slib.parseModule {absolute_path} - """ - # run the nix expression and parse the output as json - return json.loads( - subprocess.check_output( - ["nix", "eval", "--impure", "--json", "--expr", nix_expr] - ) - ) - - -def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]: - result: dict[str, Type] = {} - for name, value in schema.get("properties", {}).items(): - assert isinstance(value, dict) - type_ = value["type"] - if type_ == "object": - # handle additionalProperties - if "additionalProperties" in value: - sub_type = value["additionalProperties"].get("type") - if sub_type not in type_map: - raise ClanError( - f"Unsupported object type {sub_type} (field {name})" - ) - result[f"{name}."] = type_map[sub_type] - continue - # handle properties - sub_result = options_types_from_schema(value) - for sub_name, sub_type in sub_result.items(): - result[f"{name}.{sub_name}"] = sub_type - continue - elif type_ == "array": - if "items" not in value: - raise ClanError(f"Untyped arrays are not supported (field: {name})") - sub_type = value["items"].get("type") - if sub_type not in type_map: - raise ClanError(f"Unsupported list type {sub_type} (field {name})") - sub_type_: type = type_map[sub_type] - result[name] = list[sub_type_] # type: ignore - continue - result[name] = type_map[type_] - return result - - def process_args(args: argparse.Namespace, schema: dict) -> None: option = args.option value_arg = args.value @@ -107,23 +44,19 @@ def process_args(args: argparse.Namespace, schema: dict) -> None: current[option_path[-1]] = value_arg # validate the result against the schema and cast the value to the expected type - try: - jsonschema.validate(result, schema) - except jsonschema.ValidationError as e: - schema_type = type_map[e.schema["type"]] - # we use nargs="+", so we need to unwrap non-list values - if isinstance(e.instance, list) and schema_type != list: - instance_unwrapped = e.instance[0] - else: - instance_unwrapped = e.instance - # try casting the value to the expected type - try: - value_casted = schema_type(instance_unwrapped) - except TypeError: - raise ClanError( - f"Invalid value for {'.'.join(e.relative_path)}: {instance_unwrapped} (expected type: {schema_type})" - ) from e - current[option_path[-1]] = value_casted + schema_type = parsing.type_from_schema_path(schema, option_path) + + # we use nargs="+", so we need to unwrap non-list values + if isinstance(schema_type(), list): + subtype = schema_type.__args__[0] + casted = [subtype(x) for x in value_arg] + elif isinstance(schema_type(), dict): + subtype = schema_type.__args__[1] + raise ClanError("Dicts are not supported") + else: + casted = schema_type(value_arg[0]) + + current[option_path[-1]] = casted # print the result as json print(json.dumps(result, indent=2)) @@ -134,7 +67,7 @@ def register_parser( file: Path = Path(f"{script_dir}/jsonschema/example-schema.json"), ) -> None: if file.name.endswith(".nix"): - schema = schema_from_module_file(file) + schema = parsing.schema_from_module_file(file) else: schema = json.loads(file.read_text()) return _register_parser(parser, schema) @@ -155,7 +88,7 @@ def _register_parser( parser = argparse.ArgumentParser(description=schema.get("description")) # get all possible options from the schema - options = options_types_from_schema(schema) + options = parsing.options_types_from_schema(schema) # inject callback function to process the input later parser.set_defaults(func=lambda args: process_args(args, schema=schema)) diff --git a/pkgs/clan-cli/clan_cli/config/parsing.py b/pkgs/clan-cli/clan_cli/config/parsing.py new file mode 100644 index 000000000..92b476b99 --- /dev/null +++ b/pkgs/clan-cli/clan_cli/config/parsing.py @@ -0,0 +1,110 @@ +import json +import subprocess +from pathlib import Path +from typing import Any, Optional, Type, Union + +from clan_cli.errors import ClanError + +script_dir = Path(__file__).parent + + +type_map: dict[str, type] = { + "array": list, + "boolean": bool, + "integer": int, + "number": float, + "string": str, +} + + +def schema_from_module_file( + file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json", +) -> dict[str, Any]: + absolute_path = Path(file).absolute() + # define a nix expression that loads the given module file using lib.evalModules + nix_expr = f""" + let + lib = import ; + slib = import {script_dir}/jsonschema {{inherit lib;}}; + in + slib.parseModule {absolute_path} + """ + # run the nix expression and parse the output as json + return json.loads( + subprocess.check_output( + ["nix", "eval", "--impure", "--json", "--expr", nix_expr] + ) + ) + + +def subtype_from_schema(schema: dict[str, Any]) -> Type: + if schema["type"] == "object": + if "additionalProperties" in schema: + sub_type = subtype_from_schema(schema["additionalProperties"]) + return dict[str, sub_type] # type: ignore + elif "properties" in schema: + raise ClanError("Nested dicts are not supported") + else: + raise ClanError("Unknown object type") + elif schema["type"] == "array": + if "items" not in schema: + raise ClanError("Untyped arrays are not supported") + sub_type = subtype_from_schema(schema["items"]) + return list[sub_type] # type: ignore + else: + return type_map[schema["type"]] + + +def type_from_schema_path( + schema: dict[str, Any], + path: list[str], + full_path: Optional[list[str]] = None, +) -> Type: + if full_path is None: + full_path = path + if len(path) == 0: + return subtype_from_schema(schema) + elif schema["type"] == "object": + if "properties" in schema: + subtype = type_from_schema_path(schema["properties"][path[0]], path[1:]) + return subtype + elif "additionalProperties" in schema: + subtype = type_from_schema_path(schema["additionalProperties"], path[1:]) + return subtype + else: + raise ClanError(f"Unknown type for path {path}") + else: + raise ClanError(f"Unknown type for path {path}") + + +def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]: + result: dict[str, Type] = {} + for name, value in schema.get("properties", {}).items(): + assert isinstance(value, dict) + type_ = value["type"] + if type_ == "object": + # handle additionalProperties + if "additionalProperties" in value: + sub_type = value["additionalProperties"].get("type") + if sub_type not in type_map: + raise ClanError( + f"Unsupported object type {sub_type} (field {name})" + ) + result[f"{name}."] = type_map[sub_type] + continue + # handle properties + sub_result = options_types_from_schema(value) + for sub_name, sub_type in sub_result.items(): + result[f"{name}.{sub_name}"] = sub_type + continue + elif type_ == "array": + if "items" not in value: + raise ClanError(f"Untyped arrays are not supported (field: {name})") + sub_type = value["items"].get("type") + if sub_type not in type_map: + raise ClanError(f"Unsupported list type {sub_type} (field {name})") + sub_type_: type = type_map[sub_type] + result[name] = list[sub_type_] # type: ignore + continue + result[name] = type_map[type_] + return result diff --git a/pkgs/clan-cli/default.nix b/pkgs/clan-cli/default.nix index 5a6cda126..cf3101df2 100644 --- a/pkgs/clan-cli/default.nix +++ b/pkgs/clan-cli/default.nix @@ -3,7 +3,6 @@ , black , bubblewrap , installShellFiles -, jsonschema , mypy , nix , openssh @@ -22,7 +21,7 @@ , rsync }: let - dependencies = [ argcomplete jsonschema ]; + dependencies = [ argcomplete ]; testDependencies = [ pytest diff --git a/pkgs/clan-cli/tests/test_config.py b/pkgs/clan-cli/tests/test_config.py index c18bf0143..82eb8246a 100644 --- a/pkgs/clan-cli/tests/test_config.py +++ b/pkgs/clan-cli/tests/test_config.py @@ -7,6 +7,7 @@ from typing import Any import pytest from clan_cli import config +from clan_cli.config import parsing example_schema = f"{Path(config.__file__).parent}/jsonschema/example-schema.json" @@ -65,7 +66,7 @@ def test_walk_jsonschema_all_types() -> None: "number": float, "string": str, } - assert config.options_types_from_schema(schema) == expected + assert config.parsing.options_types_from_schema(schema) == expected def test_walk_jsonschema_nested() -> None: @@ -87,7 +88,7 @@ def test_walk_jsonschema_nested() -> None: "name.first": str, "name.last": str, } - assert config.options_types_from_schema(schema) == expected + assert config.parsing.options_types_from_schema(schema) == expected # test walk_jsonschema with dynamic attributes (e.g. "additionalProperties") @@ -106,4 +107,44 @@ def test_walk_jsonschema_dynamic_attrs() -> None: "age": int, "users.": str, # is a placeholder for any string } - assert config.options_types_from_schema(schema) == expected + assert config.parsing.options_types_from_schema(schema) == expected + + +def test_type_from_schema_path_simple() -> None: + schema = dict( + type="boolean", + ) + assert parsing.type_from_schema_path(schema, []) == bool + + +def test_type_from_schema_path_nested() -> None: + schema = dict( + type="object", + properties=dict( + name=dict( + type="object", + properties=dict( + first=dict(type="string"), + last=dict(type="string"), + ), + ), + age=dict(type="integer"), + ), + ) + assert parsing.type_from_schema_path(schema, ["age"]) == int + assert parsing.type_from_schema_path(schema, ["name", "first"]) == str + + +def test_type_from_schema_path_dynamic_attrs() -> None: + schema = dict( + type="object", + properties=dict( + age=dict(type="integer"), + users=dict( + type="object", + additionalProperties=dict(type="string"), + ), + ), + ) + assert parsing.type_from_schema_path(schema, ["age"]) == int + assert parsing.type_from_schema_path(schema, ["users", "foo"]) == str