enable ASYNC, DTZ, YTT and EM lints
This commit is contained in:
@@ -40,7 +40,8 @@ def map_type(nix_type: str) -> Any:
|
||||
subtype = nix_type.removeprefix("list of ")
|
||||
return list[map_type(subtype)] # type: ignore
|
||||
else:
|
||||
raise ClanError(f"Unknown type {nix_type}")
|
||||
msg = f"Unknown type {nix_type}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
# merge two dicts recursively
|
||||
@@ -79,7 +80,8 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
elif value[0] in ["false", "False", "no", "n", "0"]:
|
||||
return False
|
||||
else:
|
||||
raise ClanError(f"Invalid value {value} for boolean")
|
||||
msg = f"Invalid value {value} for boolean"
|
||||
raise ClanError(msg)
|
||||
# handle lists
|
||||
elif get_origin(input_type) is list:
|
||||
subtype = input_type.__args__[0]
|
||||
@@ -87,9 +89,8 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
# handle dicts
|
||||
elif get_origin(input_type) is dict:
|
||||
if not isinstance(value, dict):
|
||||
raise ClanError(
|
||||
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||
)
|
||||
msg = f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||
raise ClanError(msg)
|
||||
subtype = input_type.__args__[1]
|
||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||
elif str(input_type) == "str | None":
|
||||
@@ -98,12 +99,12 @@ def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||
return value[0]
|
||||
else:
|
||||
if len(value) > 1:
|
||||
raise ClanError(f"Too many values for {opt_description}")
|
||||
msg = f"Too many values for {opt_description}"
|
||||
raise ClanError(msg)
|
||||
return input_type(value[0])
|
||||
except ValueError as e:
|
||||
raise ClanError(
|
||||
f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||
) from e
|
||||
msg = f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||
raise ClanError(msg) from e
|
||||
|
||||
|
||||
def options_for_machine(
|
||||
@@ -237,7 +238,8 @@ def find_option(
|
||||
# element of the option path and find matching parent option
|
||||
# (see examples above for why this is needed)
|
||||
if len(option_path) == 1:
|
||||
raise ClanError(f"Option {option_description} not found")
|
||||
msg = f"Option {option_description} not found"
|
||||
raise ClanError(msg)
|
||||
option_path_parent = option_path[:-1]
|
||||
attr_prefix = option_path[-1]
|
||||
return find_option(
|
||||
|
||||
@@ -89,7 +89,8 @@ def config_for_machine(flake_dir: Path, machine_name: str) -> dict:
|
||||
def set_config_for_machine(flake_dir: Path, machine_name: str, config: dict) -> None:
|
||||
hostname_regex = r"^(?!-)[A-Za-z0-9-]{1,63}(?<!-)$"
|
||||
if not re.match(hostname_regex, machine_name):
|
||||
raise ClanError("Machine name must be a valid hostname")
|
||||
msg = "Machine name must be a valid hostname"
|
||||
raise ClanError(msg)
|
||||
if "networking" in config and "hostName" in config["networking"]:
|
||||
if machine_name != config["networking"]["hostName"]:
|
||||
raise ClanHttpError(
|
||||
|
||||
@@ -42,12 +42,15 @@ def subtype_from_schema(schema: dict[str, Any]) -> type:
|
||||
sub_type = subtype_from_schema(schema["additionalProperties"])
|
||||
return dict[str, sub_type] # type: ignore
|
||||
elif "properties" in schema:
|
||||
raise ClanError("Nested dicts are not supported")
|
||||
msg = "Nested dicts are not supported"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
raise ClanError("Unknown object type")
|
||||
msg = "Unknown object type"
|
||||
raise ClanError(msg)
|
||||
elif schema["type"] == "array":
|
||||
if "items" not in schema:
|
||||
raise ClanError("Untyped arrays are not supported")
|
||||
msg = "Untyped arrays are not supported"
|
||||
raise ClanError(msg)
|
||||
sub_type = subtype_from_schema(schema["items"])
|
||||
return list[sub_type] # type: ignore
|
||||
else:
|
||||
@@ -71,9 +74,11 @@ def type_from_schema_path(
|
||||
subtype = type_from_schema_path(schema["additionalProperties"], path[1:])
|
||||
return subtype
|
||||
else:
|
||||
raise ClanError(f"Unknown type for path {path}")
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
else:
|
||||
raise ClanError(f"Unknown type for path {path}")
|
||||
msg = f"Unknown type for path {path}"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
@@ -86,9 +91,8 @@ def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
if "additionalProperties" in value:
|
||||
sub_type = value["additionalProperties"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(
|
||||
f"Unsupported object type {sub_type} (field {name})"
|
||||
)
|
||||
msg = f"Unsupported object type {sub_type} (field {name})"
|
||||
raise ClanError(msg)
|
||||
result[f"{name}.<name>"] = type_map[sub_type]
|
||||
continue
|
||||
# handle properties
|
||||
@@ -98,10 +102,12 @@ def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
|
||||
continue
|
||||
elif type_ == "array":
|
||||
if "items" not in value:
|
||||
raise ClanError(f"Untyped arrays are not supported (field: {name})")
|
||||
msg = f"Untyped arrays are not supported (field: {name})"
|
||||
raise ClanError(msg)
|
||||
sub_type = value["items"].get("type")
|
||||
if sub_type not in type_map:
|
||||
raise ClanError(f"Unsupported list type {sub_type} (field {name})")
|
||||
msg = f"Unsupported list type {sub_type} (field {name})"
|
||||
raise ClanError(msg)
|
||||
sub_type_: type = type_map[sub_type]
|
||||
result[name] = list[sub_type_] # type: ignore
|
||||
continue
|
||||
|
||||
@@ -110,5 +110,6 @@ def machine_schema(
|
||||
env=env,
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
raise ClanError(f"Failed to read schema:\n{proc.stderr}")
|
||||
msg = f"Failed to read schema:\n{proc.stderr}"
|
||||
raise ClanError(msg)
|
||||
return json.loads(proc.stdout)
|
||||
|
||||
Reference in New Issue
Block a user