Merge pull request 'clan-config: get rid of jsonschema dependency' (#143) from DavHau-clan-config into main
This commit is contained in:
@@ -1,27 +1,17 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Type, Union
|
||||
|
||||
import jsonschema
|
||||
from typing import Any, Optional, Type
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
from . import parsing
|
||||
|
||||
script_dir = Path(__file__).parent
|
||||
|
||||
|
||||
type_map: dict[str, type] = {
|
||||
"array": list,
|
||||
"boolean": bool,
|
||||
"integer": int,
|
||||
"number": float,
|
||||
"string": str,
|
||||
}
|
||||
|
||||
|
||||
class Kwargs:
|
||||
def __init__(self) -> None:
|
||||
self.type: Optional[Type] = None
|
||||
@@ -40,59 +30,6 @@ class AllContainer(list):
|
||||
return True
|
||||
|
||||
|
||||
def schema_from_module_file(
|
||||
file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json",
|
||||
) -> dict[str, Any]:
|
||||
absolute_path = Path(file).absolute()
|
||||
# define a nix expression that loads the given module file using lib.evalModules
|
||||
nix_expr = f"""
|
||||
let
|
||||
lib = import <nixpkgs/lib>;
|
||||
slib = import {script_dir}/jsonschema {{inherit lib;}};
|
||||
in
|
||||
slib.parseModule {absolute_path}
|
||||
"""
|
||||
# run the nix expression and parse the output as json
|
||||
return json.loads(
|
||||
subprocess.check_output(
|
||||
["nix", "eval", "--impure", "--json", "--expr", nix_expr]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]:
|
||||
result: dict[str, Type] = {}
|
||||
for name, value in schema.get("properties", {}).items():
|
||||
assert isinstance(value, dict)
|
||||
type_ = value["type"]
|
||||
if type_ == "object":
|
||||
# handle additionalProperties
|
||||
if "additionalProperties" in value:
|
||||
sub_type = value["additionalProperties"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(
|
||||
f"Unsupported object type {sub_type} (field {name})"
|
||||
)
|
||||
result[f"{name}.<name>"] = type_map[sub_type]
|
||||
continue
|
||||
# handle properties
|
||||
sub_result = options_types_from_schema(value)
|
||||
for sub_name, sub_type in sub_result.items():
|
||||
result[f"{name}.{sub_name}"] = sub_type
|
||||
continue
|
||||
elif type_ == "array":
|
||||
if "items" not in value:
|
||||
raise ClanError(f"Untyped arrays are not supported (field: {name})")
|
||||
sub_type = value["items"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(f"Unsupported list type {sub_type} (field {name})")
|
||||
sub_type_: type = type_map[sub_type]
|
||||
result[name] = list[sub_type_] # type: ignore
|
||||
continue
|
||||
result[name] = type_map[type_]
|
||||
return result
|
||||
|
||||
|
||||
def process_args(args: argparse.Namespace, schema: dict) -> None:
|
||||
option = args.option
|
||||
value_arg = args.value
|
||||
@@ -107,23 +44,19 @@ def process_args(args: argparse.Namespace, schema: dict) -> None:
|
||||
current[option_path[-1]] = value_arg
|
||||
|
||||
# validate the result against the schema and cast the value to the expected type
|
||||
try:
|
||||
jsonschema.validate(result, schema)
|
||||
except jsonschema.ValidationError as e:
|
||||
schema_type = type_map[e.schema["type"]]
|
||||
# we use nargs="+", so we need to unwrap non-list values
|
||||
if isinstance(e.instance, list) and schema_type != list:
|
||||
instance_unwrapped = e.instance[0]
|
||||
else:
|
||||
instance_unwrapped = e.instance
|
||||
# try casting the value to the expected type
|
||||
try:
|
||||
value_casted = schema_type(instance_unwrapped)
|
||||
except TypeError:
|
||||
raise ClanError(
|
||||
f"Invalid value for {'.'.join(e.relative_path)}: {instance_unwrapped} (expected type: {schema_type})"
|
||||
) from e
|
||||
current[option_path[-1]] = value_casted
|
||||
schema_type = parsing.type_from_schema_path(schema, option_path)
|
||||
|
||||
# we use nargs="+", so we need to unwrap non-list values
|
||||
if isinstance(schema_type(), list):
|
||||
subtype = schema_type.__args__[0]
|
||||
casted = [subtype(x) for x in value_arg]
|
||||
elif isinstance(schema_type(), dict):
|
||||
subtype = schema_type.__args__[1]
|
||||
raise ClanError("Dicts are not supported")
|
||||
else:
|
||||
casted = schema_type(value_arg[0])
|
||||
|
||||
current[option_path[-1]] = casted
|
||||
|
||||
# print the result as json
|
||||
print(json.dumps(result, indent=2))
|
||||
@@ -134,7 +67,7 @@ def register_parser(
|
||||
file: Path = Path(f"{script_dir}/jsonschema/example-schema.json"),
|
||||
) -> None:
|
||||
if file.name.endswith(".nix"):
|
||||
schema = schema_from_module_file(file)
|
||||
schema = parsing.schema_from_module_file(file)
|
||||
else:
|
||||
schema = json.loads(file.read_text())
|
||||
return _register_parser(parser, schema)
|
||||
@@ -155,7 +88,7 @@ def _register_parser(
|
||||
parser = argparse.ArgumentParser(description=schema.get("description"))
|
||||
|
||||
# get all possible options from the schema
|
||||
options = options_types_from_schema(schema)
|
||||
options = parsing.options_types_from_schema(schema)
|
||||
|
||||
# inject callback function to process the input later
|
||||
parser.set_defaults(func=lambda args: process_args(args, schema=schema))
|
||||
|
||||
110
pkgs/clan-cli/clan_cli/config/parsing.py
Normal file
110
pkgs/clan-cli/clan_cli/config/parsing.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import json
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Type, Union
|
||||
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
script_dir = Path(__file__).parent
|
||||
|
||||
|
||||
type_map: dict[str, type] = {
|
||||
"array": list,
|
||||
"boolean": bool,
|
||||
"integer": int,
|
||||
"number": float,
|
||||
"string": str,
|
||||
}
|
||||
|
||||
|
||||
def schema_from_module_file(
|
||||
file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json",
|
||||
) -> dict[str, Any]:
|
||||
absolute_path = Path(file).absolute()
|
||||
# define a nix expression that loads the given module file using lib.evalModules
|
||||
nix_expr = f"""
|
||||
let
|
||||
lib = import <nixpkgs/lib>;
|
||||
slib = import {script_dir}/jsonschema {{inherit lib;}};
|
||||
in
|
||||
slib.parseModule {absolute_path}
|
||||
"""
|
||||
# run the nix expression and parse the output as json
|
||||
return json.loads(
|
||||
subprocess.check_output(
|
||||
["nix", "eval", "--impure", "--json", "--expr", nix_expr]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def subtype_from_schema(schema: dict[str, Any]) -> Type:
|
||||
if schema["type"] == "object":
|
||||
if "additionalProperties" in schema:
|
||||
sub_type = subtype_from_schema(schema["additionalProperties"])
|
||||
return dict[str, sub_type] # type: ignore
|
||||
elif "properties" in schema:
|
||||
raise ClanError("Nested dicts are not supported")
|
||||
else:
|
||||
raise ClanError("Unknown object type")
|
||||
elif schema["type"] == "array":
|
||||
if "items" not in schema:
|
||||
raise ClanError("Untyped arrays are not supported")
|
||||
sub_type = subtype_from_schema(schema["items"])
|
||||
return list[sub_type] # type: ignore
|
||||
else:
|
||||
return type_map[schema["type"]]
|
||||
|
||||
|
||||
def type_from_schema_path(
|
||||
schema: dict[str, Any],
|
||||
path: list[str],
|
||||
full_path: Optional[list[str]] = None,
|
||||
) -> Type:
|
||||
if full_path is None:
|
||||
full_path = path
|
||||
if len(path) == 0:
|
||||
return subtype_from_schema(schema)
|
||||
elif schema["type"] == "object":
|
||||
if "properties" in schema:
|
||||
subtype = type_from_schema_path(schema["properties"][path[0]], path[1:])
|
||||
return subtype
|
||||
elif "additionalProperties" in schema:
|
||||
subtype = type_from_schema_path(schema["additionalProperties"], path[1:])
|
||||
return subtype
|
||||
else:
|
||||
raise ClanError(f"Unknown type for path {path}")
|
||||
else:
|
||||
raise ClanError(f"Unknown type for path {path}")
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]:
|
||||
result: dict[str, Type] = {}
|
||||
for name, value in schema.get("properties", {}).items():
|
||||
assert isinstance(value, dict)
|
||||
type_ = value["type"]
|
||||
if type_ == "object":
|
||||
# handle additionalProperties
|
||||
if "additionalProperties" in value:
|
||||
sub_type = value["additionalProperties"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(
|
||||
f"Unsupported object type {sub_type} (field {name})"
|
||||
)
|
||||
result[f"{name}.<name>"] = type_map[sub_type]
|
||||
continue
|
||||
# handle properties
|
||||
sub_result = options_types_from_schema(value)
|
||||
for sub_name, sub_type in sub_result.items():
|
||||
result[f"{name}.{sub_name}"] = sub_type
|
||||
continue
|
||||
elif type_ == "array":
|
||||
if "items" not in value:
|
||||
raise ClanError(f"Untyped arrays are not supported (field: {name})")
|
||||
sub_type = value["items"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(f"Unsupported list type {sub_type} (field {name})")
|
||||
sub_type_: type = type_map[sub_type]
|
||||
result[name] = list[sub_type_] # type: ignore
|
||||
continue
|
||||
result[name] = type_map[type_]
|
||||
return result
|
||||
@@ -3,7 +3,6 @@
|
||||
, black
|
||||
, bubblewrap
|
||||
, installShellFiles
|
||||
, jsonschema
|
||||
, mypy
|
||||
, nix
|
||||
, openssh
|
||||
@@ -22,7 +21,7 @@
|
||||
, rsync
|
||||
}:
|
||||
let
|
||||
dependencies = [ argcomplete jsonschema ];
|
||||
dependencies = [ argcomplete ];
|
||||
|
||||
testDependencies = [
|
||||
pytest
|
||||
|
||||
@@ -7,6 +7,7 @@ from typing import Any
|
||||
import pytest
|
||||
|
||||
from clan_cli import config
|
||||
from clan_cli.config import parsing
|
||||
|
||||
example_schema = f"{Path(config.__file__).parent}/jsonschema/example-schema.json"
|
||||
|
||||
@@ -65,7 +66,7 @@ def test_walk_jsonschema_all_types() -> None:
|
||||
"number": float,
|
||||
"string": str,
|
||||
}
|
||||
assert config.options_types_from_schema(schema) == expected
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
def test_walk_jsonschema_nested() -> None:
|
||||
@@ -87,7 +88,7 @@ def test_walk_jsonschema_nested() -> None:
|
||||
"name.first": str,
|
||||
"name.last": str,
|
||||
}
|
||||
assert config.options_types_from_schema(schema) == expected
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
|
||||
@@ -106,4 +107,44 @@ def test_walk_jsonschema_dynamic_attrs() -> None:
|
||||
"age": int,
|
||||
"users.<name>": str, # <name> is a placeholder for any string
|
||||
}
|
||||
assert config.options_types_from_schema(schema) == expected
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
def test_type_from_schema_path_simple() -> None:
|
||||
schema = dict(
|
||||
type="boolean",
|
||||
)
|
||||
assert parsing.type_from_schema_path(schema, []) == bool
|
||||
|
||||
|
||||
def test_type_from_schema_path_nested() -> None:
|
||||
schema = dict(
|
||||
type="object",
|
||||
properties=dict(
|
||||
name=dict(
|
||||
type="object",
|
||||
properties=dict(
|
||||
first=dict(type="string"),
|
||||
last=dict(type="string"),
|
||||
),
|
||||
),
|
||||
age=dict(type="integer"),
|
||||
),
|
||||
)
|
||||
assert parsing.type_from_schema_path(schema, ["age"]) == int
|
||||
assert parsing.type_from_schema_path(schema, ["name", "first"]) == str
|
||||
|
||||
|
||||
def test_type_from_schema_path_dynamic_attrs() -> None:
|
||||
schema = dict(
|
||||
type="object",
|
||||
properties=dict(
|
||||
age=dict(type="integer"),
|
||||
users=dict(
|
||||
type="object",
|
||||
additionalProperties=dict(type="string"),
|
||||
),
|
||||
),
|
||||
)
|
||||
assert parsing.type_from_schema_path(schema, ["age"]) == int
|
||||
assert parsing.type_from_schema_path(schema, ["users", "foo"]) == str
|
||||
|
||||
Reference in New Issue
Block a user