drop old settings code
This commit is contained in:
@@ -1,306 +0,0 @@
|
||||
# !/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, get_origin
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.dirs import machine_settings_file
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.git import commit_file
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
script_dir = Path(__file__).parent
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# nixos option type description to python type
|
||||
def map_type(nix_type: str) -> Any:
|
||||
if nix_type == "boolean":
|
||||
return bool
|
||||
if nix_type in [
|
||||
"integer",
|
||||
"signed integer",
|
||||
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
||||
]:
|
||||
return int
|
||||
if nix_type.startswith("string"):
|
||||
return str
|
||||
if nix_type.startswith("null or "):
|
||||
subtype = nix_type.removeprefix("null or ")
|
||||
return map_type(subtype) | None
|
||||
if nix_type.startswith("attribute set of"):
|
||||
subtype = nix_type.removeprefix("attribute set of ")
|
||||
return dict[str, map_type(subtype)] # type: ignore
|
||||
if nix_type.startswith("list of"):
|
||||
subtype = nix_type.removeprefix("list of ")
|
||||
return list[map_type(subtype)] # type: ignore
|
||||
msg = f"Unknown type {nix_type}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
# merge two dicts recursively
|
||||
def merge(a: dict, b: dict, path: list[str] | None = None) -> dict:
|
||||
a = a.copy()
|
||||
if path is None:
|
||||
path = []
|
||||
for key in b:
|
||||
if key in a:
|
||||
if isinstance(a[key], dict) and isinstance(b[key], dict):
|
||||
merge(a[key], b[key], [*path, str(key)])
|
||||
elif isinstance(a[key], list) and isinstance(b[key], list):
|
||||
a[key].extend(b[key])
|
||||
elif a[key] != b[key]:
|
||||
a[key] = b[key]
|
||||
else:
|
||||
a[key] = b[key]
|
||||
return a
|
||||
|
||||
|
||||
# A container inheriting from list, but overriding __contains__ to return True
|
||||
# for all values.
|
||||
# This is used to allow any value for the "choices" field of argparse
|
||||
class AllContainer(list):
|
||||
def __contains__(self, item: Any) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
# value is always a list, as the arg parser cannot know the type upfront
|
||||
# and therefore always allows multiple arguments.
|
||||
def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
try:
|
||||
# handle bools
|
||||
if isinstance(input_type, bool):
|
||||
if value[0] in ["true", "True", "yes", "y", "1"]:
|
||||
return True
|
||||
if value[0] in ["false", "False", "no", "n", "0"]:
|
||||
return False
|
||||
msg = f"Invalid value {value} for boolean"
|
||||
raise ClanError(msg)
|
||||
# handle lists
|
||||
if get_origin(input_type) is list:
|
||||
subtype = input_type.__args__[0]
|
||||
return [cast([x], subtype, opt_description) for x in value]
|
||||
# handle dicts
|
||||
if get_origin(input_type) is dict:
|
||||
if not isinstance(value, dict):
|
||||
msg = f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||
raise ClanError(msg)
|
||||
subtype = input_type.__args__[1]
|
||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||
if str(input_type) == "str | None":
|
||||
if value[0] in ["null", "None"]:
|
||||
return None
|
||||
return value[0]
|
||||
if len(value) > 1:
|
||||
msg = f"Too many values for {opt_description}"
|
||||
raise ClanError(msg)
|
||||
return input_type(value[0])
|
||||
except ValueError as e:
|
||||
msg = f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
|
||||
def options_for_machine(
|
||||
flake_dir: Path, machine_name: str, show_trace: bool = False
|
||||
) -> dict:
|
||||
clan_dir = flake_dir
|
||||
flags = []
|
||||
if show_trace:
|
||||
flags.append("--show-trace")
|
||||
flags.append(
|
||||
f"{clan_dir}#nixosConfigurations.{machine_name}.config.clan.core.optionsNix"
|
||||
)
|
||||
cmd = nix_eval(flags=flags)
|
||||
proc = run(
|
||||
cmd,
|
||||
error_msg=f"Failed to read options for machine {machine_name}",
|
||||
)
|
||||
|
||||
return json.loads(proc.stdout)
|
||||
|
||||
|
||||
def read_machine_option_value(
|
||||
flake_dir: Path, machine_name: str, option: str, show_trace: bool = False
|
||||
) -> str:
|
||||
clan_dir = flake_dir
|
||||
# use nix eval to read from .#nixosConfigurations.default.config.{option}
|
||||
# this will give us the evaluated config with the options attribute
|
||||
cmd = nix_eval(
|
||||
flags=[
|
||||
"--show-trace",
|
||||
f"{clan_dir}#nixosConfigurations.{machine_name}.config.{option}",
|
||||
],
|
||||
)
|
||||
proc = run(cmd, error_msg=f"Failed to read option {option}")
|
||||
|
||||
value = json.loads(proc.stdout)
|
||||
# print the value so that the output can be copied and fed as an input.
|
||||
# for example a list should be displayed as space separated values surrounded by quotes.
|
||||
if isinstance(value, list):
|
||||
out = " ".join([json.dumps(x) for x in value])
|
||||
elif isinstance(value, dict):
|
||||
out = json.dumps(value, indent=2)
|
||||
else:
|
||||
out = json.dumps(value, indent=2)
|
||||
return out
|
||||
|
||||
|
||||
def get_option(args: argparse.Namespace) -> None:
|
||||
print(
|
||||
read_machine_option_value(
|
||||
args.flake, args.machine, args.option, args.show_trace
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# Currently writing is disabled
|
||||
def get_or_set_option(args: argparse.Namespace) -> None:
|
||||
if args.value == []:
|
||||
print(
|
||||
read_machine_option_value(
|
||||
args.flake, args.machine, args.option, args.show_trace
|
||||
)
|
||||
)
|
||||
else:
|
||||
# load options
|
||||
if args.options_file is None:
|
||||
options = options_for_machine(
|
||||
args.flake, machine_name=args.machine, show_trace=args.show_trace
|
||||
)
|
||||
else:
|
||||
with args.options_file.open() as f:
|
||||
options = json.load(f)
|
||||
# compute settings json file location
|
||||
if args.settings_file is None:
|
||||
settings_file = machine_settings_file(args.flake.path, args.machine)
|
||||
else:
|
||||
settings_file = args.settings_file
|
||||
# set the option with the given value
|
||||
set_option(
|
||||
flake_dir=args.flake.path,
|
||||
option=args.option,
|
||||
value=args.value,
|
||||
options=options,
|
||||
settings_file=settings_file,
|
||||
option_description=args.option,
|
||||
show_trace=args.show_trace,
|
||||
)
|
||||
if not args.quiet:
|
||||
new_value = read_machine_option_value(args.flake, args.machine, args.option)
|
||||
print(f"New Value for {args.option}:")
|
||||
print(new_value)
|
||||
|
||||
|
||||
def find_option(
|
||||
option: str, value: Any, options: dict, option_description: str | None = None
|
||||
) -> tuple[str, Any]:
|
||||
"""
|
||||
The option path specified by the user doesn't have to match exactly to an
|
||||
entry in the options.json file. Examples
|
||||
|
||||
Example 1:
|
||||
$ clan config services.openssh.settings.SomeSetting 42
|
||||
This is a freeform option that does not appear in the options.json
|
||||
The actual option is `services.openssh.settings`
|
||||
And the value must be wrapped: {"SomeSettings": 42}
|
||||
|
||||
Example 2:
|
||||
$ clan config users.users.my-user.name my-name
|
||||
The actual option is `users.users.<name>.name`
|
||||
"""
|
||||
|
||||
# option description is used for error messages
|
||||
if option_description is None:
|
||||
option_description = option
|
||||
|
||||
option_path = option.split(".")
|
||||
|
||||
# fuzzy search the option paths, so when
|
||||
# specified option path: "foo.bar.baz.bum"
|
||||
# available option path: "foo.<name>.baz.<name>"
|
||||
# we can still find the option
|
||||
first = option_path[0]
|
||||
regex = rf"({first}|<name>)"
|
||||
for elem in option_path[1:]:
|
||||
regex += rf"\.({elem}|<name>)"
|
||||
for opt in options:
|
||||
if re.match(regex, opt):
|
||||
return opt, value
|
||||
|
||||
# if the regex search did not find the option, start stripping the last
|
||||
# element of the option path and find matching parent option
|
||||
# (see examples above for why this is needed)
|
||||
if len(option_path) == 1:
|
||||
msg = f"Option {option_description} not found"
|
||||
raise ClanError(msg)
|
||||
option_path_parent = option_path[:-1]
|
||||
attr_prefix = option_path[-1]
|
||||
return find_option(
|
||||
option=".".join(option_path_parent),
|
||||
value={attr_prefix: value},
|
||||
options=options,
|
||||
option_description=option_description,
|
||||
)
|
||||
|
||||
|
||||
def set_option(
|
||||
flake_dir: Path,
|
||||
option: str,
|
||||
value: Any,
|
||||
options: dict,
|
||||
settings_file: Path,
|
||||
option_description: str = "",
|
||||
show_trace: bool = False,
|
||||
) -> None:
|
||||
option_path_orig = option.split(".")
|
||||
|
||||
# returns for example:
|
||||
# option: "users.users.<name>.name"
|
||||
# value: "my-name"
|
||||
option, value = find_option(
|
||||
option=option,
|
||||
value=value,
|
||||
options=options,
|
||||
option_description=option_description,
|
||||
)
|
||||
option_path = option.split(".")
|
||||
|
||||
option_path_store = option_path_orig[: len(option_path)]
|
||||
|
||||
target_type = map_type(options[option]["type"])
|
||||
casted = cast(value, target_type, option)
|
||||
|
||||
# construct a nested dict from the option path and set the value
|
||||
result: dict[str, Any] = {}
|
||||
current = result
|
||||
for part in option_path_store[:-1]:
|
||||
current[part] = {}
|
||||
current = current[part]
|
||||
current[option_path_store[-1]] = value
|
||||
|
||||
current[option_path_store[-1]] = casted
|
||||
|
||||
# check if there is an existing config file
|
||||
if settings_file.exists():
|
||||
with settings_file.open() as f:
|
||||
current_config = json.load(f)
|
||||
else:
|
||||
current_config = {}
|
||||
|
||||
# merge and save the new config file
|
||||
new_config = merge(current_config, result)
|
||||
settings_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
with settings_file.open("w") as f:
|
||||
json.dump(new_config, f, indent=2)
|
||||
print(file=f) # add newline at the end of the file to make git happy
|
||||
|
||||
if settings_file.resolve().is_relative_to(flake_dir):
|
||||
commit_file(
|
||||
settings_file,
|
||||
repo_dir=flake_dir,
|
||||
commit_message=f"Set option {option_description}",
|
||||
)
|
||||
@@ -1,109 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
|
||||
from clan_cli.cmd import Log, run
|
||||
from clan_cli.dirs import machine_settings_file, nixpkgs_source, specific_machine_dir
|
||||
from clan_cli.errors import ClanError, ClanHttpError
|
||||
from clan_cli.git import commit_file
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
|
||||
def verify_machine_config(
|
||||
flake_dir: Path,
|
||||
machine_name: str,
|
||||
config: dict | None = None,
|
||||
) -> str | None:
|
||||
"""
|
||||
Verify that the machine evaluates successfully
|
||||
Returns None, in case of success, or a String containing the error_message
|
||||
"""
|
||||
if config is None:
|
||||
config = config_for_machine(flake_dir, machine_name)
|
||||
flake = flake_dir
|
||||
with NamedTemporaryFile(mode="w", dir=flake) as clan_machine_settings_file:
|
||||
json.dump(config, clan_machine_settings_file, indent=2)
|
||||
clan_machine_settings_file.seek(0)
|
||||
env = os.environ.copy()
|
||||
env["CLAN_MACHINE_SETTINGS_FILE"] = clan_machine_settings_file.name
|
||||
cmd = nix_eval(
|
||||
flags=[
|
||||
"--show-trace",
|
||||
"--impure", # needed to access CLAN_MACHINE_SETTINGS_FILE
|
||||
"--expr",
|
||||
f"""
|
||||
let
|
||||
# hardcoding system for now, not sure where to get it from
|
||||
system = "x86_64-linux";
|
||||
flake = builtins.getFlake (toString {flake});
|
||||
clan-core = flake.inputs.clan-core;
|
||||
nixpkgsSrc = flake.inputs.nixpkgs or {nixpkgs_source()};
|
||||
lib = import (nixpkgsSrc + /lib);
|
||||
pkgs = import nixpkgsSrc {{ inherit system; }};
|
||||
config = lib.importJSON (builtins.getEnv "CLAN_MACHINE_SETTINGS_FILE");
|
||||
fakeMachine = pkgs.nixos {{
|
||||
imports =
|
||||
[
|
||||
clan-core.nixosModules.clanCore
|
||||
# potentially the config might affect submodule options,
|
||||
# therefore we need to import it
|
||||
config
|
||||
{{clan.core.clanDir = {flake};}}
|
||||
]
|
||||
# add all clan modules specified via clanImports
|
||||
++ (map (name: clan-core.clanModules.${{name}}) config.clanImports or []);
|
||||
}};
|
||||
in
|
||||
fakeMachine.config.system.build.vm.outPath
|
||||
""",
|
||||
],
|
||||
)
|
||||
|
||||
proc = run(
|
||||
cmd,
|
||||
cwd=flake,
|
||||
env=env,
|
||||
log=Log.BOTH,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
return proc.stderr
|
||||
return None
|
||||
|
||||
|
||||
def config_for_machine(flake_dir: Path, machine_name: str) -> dict:
|
||||
# read the config from a json file located at {flake}/machines/{machine_name}/settings.json
|
||||
if not specific_machine_dir(flake_dir, machine_name).exists():
|
||||
raise ClanHttpError(
|
||||
msg=f"Machine {machine_name} not found. Create the machine first`",
|
||||
status_code=404,
|
||||
)
|
||||
settings_path = machine_settings_file(flake_dir, machine_name)
|
||||
if not settings_path.exists():
|
||||
return {}
|
||||
with settings_path.open() as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def set_config_for_machine(flake_dir: Path, machine_name: str, config: dict) -> None:
|
||||
hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
|
||||
if not re.match(hostname_regex, machine_name):
|
||||
msg = "Machine name must be a valid hostname"
|
||||
raise ClanError(msg)
|
||||
if "networking" in config and "hostName" in config["networking"]:
|
||||
if machine_name != config["networking"]["hostName"]:
|
||||
raise ClanHttpError(
|
||||
msg="Machine name does not match the 'networking.hostName' setting in the config",
|
||||
status_code=400,
|
||||
)
|
||||
config["networking"]["hostName"] = machine_name
|
||||
# create machine folder if it doesn't exist
|
||||
# write the config to a json file located at {flake}/machines/{machine_name}/settings.json
|
||||
settings_path = machine_settings_file(flake_dir, machine_name)
|
||||
settings_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with settings_path.open("w") as f:
|
||||
json.dump(config, f)
|
||||
|
||||
if flake_dir is not None:
|
||||
commit_file(settings_path, flake_dir)
|
||||
@@ -1,111 +0,0 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from clan_cli.cmd import run
|
||||
from clan_cli.errors import ClanError
|
||||
from clan_cli.nix import nix_eval
|
||||
|
||||
script_dir = Path(__file__).parent
|
||||
|
||||
|
||||
type_map: dict[str, type] = {
|
||||
"array": list,
|
||||
"boolean": bool,
|
||||
"integer": int,
|
||||
"number": float,
|
||||
"string": str,
|
||||
}
|
||||
|
||||
|
||||
def schema_from_module_file(
|
||||
file: str | Path = f"{script_dir}/jsonschema/example-schema.json",
|
||||
) -> dict[str, Any]:
|
||||
absolute_path = Path(file).absolute()
|
||||
# define a nix expression that loads the given module file using lib.evalModules
|
||||
nix_expr = f"""
|
||||
let
|
||||
lib = import <nixpkgs/lib>;
|
||||
slib = import {script_dir}/jsonschema {{inherit lib;}};
|
||||
in
|
||||
slib.parseModule {absolute_path}
|
||||
"""
|
||||
# run the nix expression and parse the output as json
|
||||
cmd = nix_eval(["--expr", nix_expr])
|
||||
proc = run(cmd)
|
||||
return json.loads(proc.stdout)
|
||||
|
||||
|
||||
def subtype_from_schema(schema: dict[str, Any]) -> type:
|
||||
if schema["type"] == "object":
|
||||
if "additionalProperties" in schema:
|
||||
sub_type = subtype_from_schema(schema["additionalProperties"])
|
||||
return dict[str, sub_type] # type: ignore
|
||||
if "properties" in schema:
|
||||
msg = "Nested dicts are not supported"
|
||||
raise ClanError(msg)
|
||||
msg = "Unknown object type"
|
||||
raise ClanError(msg)
|
||||
if schema["type"] == "array":
|
||||
if "items" not in schema:
|
||||
msg = "Untyped arrays are not supported"
|
||||
raise ClanError(msg)
|
||||
sub_type = subtype_from_schema(schema["items"])
|
||||
return list[sub_type] # type: ignore
|
||||
return type_map[schema["type"]]
|
||||
|
||||
|
||||
def type_from_schema_path(
|
||||
schema: dict[str, Any],
|
||||
path: list[str],
|
||||
full_path: list[str] | None = None,
|
||||
) -> type:
|
||||
if full_path is None:
|
||||
full_path = path
|
||||
if len(path) == 0:
|
||||
return subtype_from_schema(schema)
|
||||
if schema["type"] == "object":
|
||||
if "properties" in schema:
|
||||
subtype = type_from_schema_path(schema["properties"][path[0]], path[1:])
|
||||
return subtype
|
||||
if "additionalProperties" in schema:
|
||||
subtype = type_from_schema_path(schema["additionalProperties"], path[1:])
|
||||
return subtype
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
result: dict[str, type] = {}
|
||||
for name, value in schema.get("properties", {}).items():
|
||||
assert isinstance(value, dict)
|
||||
type_ = value["type"]
|
||||
if type_ == "object":
|
||||
# handle additionalProperties
|
||||
if "additionalProperties" in value:
|
||||
sub_type = value["additionalProperties"].get("type")
|
||||
if sub_type not in type_map:
|
||||
msg = f"Unsupported object type {sub_type} (field {name})"
|
||||
raise ClanError(msg)
|
||||
result[f"{name}.<name>"] = type_map[sub_type]
|
||||
continue
|
||||
# handle properties
|
||||
sub_result = options_types_from_schema(value)
|
||||
for sub_name, sub_type in sub_result.items():
|
||||
result[f"{name}.{sub_name}"] = sub_type
|
||||
continue
|
||||
if type_ == "array":
|
||||
if "items" not in value:
|
||||
msg = f"Untyped arrays are not supported (field: {name})"
|
||||
raise ClanError(msg)
|
||||
sub_type = value["items"].get("type")
|
||||
if sub_type not in type_map:
|
||||
msg = f"Unsupported list type {sub_type} (field {name})"
|
||||
raise ClanError(msg)
|
||||
sub_type_: type = type_map[sub_type]
|
||||
result[name] = list[sub_type_] # type: ignore
|
||||
continue
|
||||
result[name] = type_map[type_]
|
||||
return result
|
||||
@@ -1,171 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from clan_cli import config
|
||||
from clan_cli.config import parsing
|
||||
from clan_cli.errors import ClanError
|
||||
|
||||
example_options = f"{Path(config.__file__).parent}/jsonschema/options.json"
|
||||
|
||||
|
||||
def test_walk_jsonschema_all_types() -> None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"array": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string",
|
||||
},
|
||||
},
|
||||
"boolean": {"type": "boolean"},
|
||||
"integer": {"type": "integer"},
|
||||
"number": {"type": "number"},
|
||||
"string": {"type": "string"},
|
||||
},
|
||||
}
|
||||
expected = {
|
||||
"array": list[str],
|
||||
"boolean": bool,
|
||||
"integer": int,
|
||||
"number": float,
|
||||
"string": str,
|
||||
}
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
def test_walk_jsonschema_nested() -> None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"first": {"type": "string"},
|
||||
"last": {"type": "string"},
|
||||
},
|
||||
},
|
||||
"age": {"type": "integer"},
|
||||
},
|
||||
}
|
||||
expected = {
|
||||
"age": int,
|
||||
"name.first": str,
|
||||
"name.last": str,
|
||||
}
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
# test walk_jsonschema with dynamic attributes (e.g. "additionalProperties")
|
||||
def test_walk_jsonschema_dynamic_attrs() -> None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"age": {"type": "integer"},
|
||||
"users": {
|
||||
"type": "object",
|
||||
"additionalProperties": {"type": "string"},
|
||||
},
|
||||
},
|
||||
}
|
||||
expected = {
|
||||
"age": int,
|
||||
"users.<name>": str, # <name> is a placeholder for any string
|
||||
}
|
||||
assert config.parsing.options_types_from_schema(schema) == expected
|
||||
|
||||
|
||||
def test_type_from_schema_path_simple() -> None:
|
||||
schema = {
|
||||
"type": "boolean",
|
||||
}
|
||||
assert parsing.type_from_schema_path(schema, []) is bool
|
||||
|
||||
|
||||
def test_type_from_schema_path_nested() -> None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"first": {"type": "string"},
|
||||
"last": {"type": "string"},
|
||||
},
|
||||
},
|
||||
"age": {"type": "integer"},
|
||||
},
|
||||
}
|
||||
assert parsing.type_from_schema_path(schema, ["age"]) is int
|
||||
assert parsing.type_from_schema_path(schema, ["name", "first"]) is str
|
||||
|
||||
|
||||
def test_type_from_schema_path_dynamic_attrs() -> None:
|
||||
schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"age": {"type": "integer"},
|
||||
"users": {
|
||||
"type": "object",
|
||||
"additionalProperties": {"type": "string"},
|
||||
},
|
||||
},
|
||||
}
|
||||
assert parsing.type_from_schema_path(schema, ["age"]) is int
|
||||
assert parsing.type_from_schema_path(schema, ["users", "foo"]) is str
|
||||
|
||||
|
||||
def test_map_type() -> None:
|
||||
with pytest.raises(ClanError):
|
||||
config.map_type("foo")
|
||||
assert config.map_type("string") is str
|
||||
assert config.map_type("integer") is int
|
||||
assert config.map_type("boolean") is bool
|
||||
assert config.map_type("attribute set of string") == dict[str, str]
|
||||
assert config.map_type("attribute set of integer") == dict[str, int]
|
||||
assert config.map_type("null or string") == str | None
|
||||
|
||||
|
||||
# test the cast function with simple types
|
||||
def test_cast() -> None:
|
||||
assert (
|
||||
config.cast(value=["true"], input_type=bool, opt_description="foo-option")
|
||||
is True
|
||||
)
|
||||
assert (
|
||||
config.cast(value=["null"], input_type=str | None, opt_description="foo-option")
|
||||
is None
|
||||
)
|
||||
assert (
|
||||
config.cast(value=["bar"], input_type=str | None, opt_description="foo-option")
|
||||
== "bar"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("option", "value", "options", "expected"),
|
||||
[
|
||||
("foo.bar", ["baz"], {"foo.bar": {"type": "str"}}, ("foo.bar", ["baz"])),
|
||||
("foo.bar", ["baz"], {"foo": {"type": "attrs"}}, ("foo", {"bar": ["baz"]})),
|
||||
(
|
||||
"users.users.my-user.name",
|
||||
["my-name"],
|
||||
{"users.users.<name>.name": {"type": "str"}},
|
||||
("users.users.<name>.name", ["my-name"]),
|
||||
),
|
||||
(
|
||||
"foo.bar.baz.bum",
|
||||
["val"],
|
||||
{"foo.<name>.baz": {"type": "attrs"}},
|
||||
("foo.<name>.baz", {"bum": ["val"]}),
|
||||
),
|
||||
(
|
||||
"userIds.DavHau",
|
||||
["42"],
|
||||
{"userIds": {"type": "attrs"}},
|
||||
("userIds", {"DavHau": ["42"]}),
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_find_option(option: str, value: list, options: dict, expected: tuple) -> None:
|
||||
assert config.find_option(option, value, options) == expected
|
||||
Reference in New Issue
Block a user