Files
clan-core/pkgs/clan-cli/openapi.py
Qubasa 0c54a0f6de clan-app: Fix delete tasks
clan-app: Fix delete tasks
2025-07-08 18:11:59 +07:00

221 lines
7.3 KiB
Python

import json
import os
from copy import deepcopy
from pathlib import Path
# !!! IMPORTANT !!!
# AVOID VERBS NOT IN THIS LIST
# IF YOU WANT TO ADD TO THIS LIST CREATE AN ISSUE/DISCUSS FIRST
#
# Verbs are restricted to make API usage intuitive and consistent.
#
# Discouraged verbs:
# do        Too vague.
# process   Sounds generic; lacks clarity.
# generate  Ambiguous: does it mutate state or not? Prefer 'run'.
# handle    Abstract and fuzzy.
# show      Overlaps with 'get' or 'list'.
# describe  Overlaps with 'get' or 'list'.
# can, is   Often used for helpers; use 'check' instead for structured responses.
# Allow-list of verbs an API operation name may start with.
# Membership is tested by is_verb(); the full set is echoed in the error
# message built by check_operation_name().
COMMON_VERBS: set[str] = {
    "get",  # fetch single item
    "list",  # fetch collection
    "create",  # instantiate resource
    "set",  # update or configure
    "delete",  # remove resource
    "open",  # initiate session, shell, file, etc.
    "check",  # validate, probe, or assert
    "run",  # start imperative task or action; machine-deploy etc.
}
def is_verb(word: str) -> bool:
    """Return True when *word* is one of the allow-listed operation verbs."""
    allowed = COMMON_VERBS
    return word in allowed
def singular(word: str) -> str:
    """Return a naive singular form of *word*.

    Suffix rules, tried in order:
      * ``...ies`` -> ``...y``
      * ``...ses`` -> drop the last two characters
      * ``...s`` (but not ``...ss``) -> drop the last character
    Any other word is returned unchanged. This is a purely textual
    heuristic; it does not consult a dictionary.
    """
    # (suffix, number of trailing characters to cut, text appended afterwards)
    for suffix, cut, tail in (("ies", 3, "y"), ("ses", 2, "")):
        if word.endswith(suffix):
            return word[: len(word) - cut] + tail

    plain_plural = word.endswith("s") and not word.endswith("ss")
    return word[:-1] if plain_plural else word
def normalize_op_name(op_name: str) -> list[str]:
    """Split an operation name into ``[verb, Noun, Noun, ...]``.

    The name is lower-cased and split on underscores. The first piece is kept
    as the verb; each remaining piece is treated as a (sub-)resource noun,
    passed through ``singular`` and capitalized. A name without any noun
    piece gets the single placeholder noun ``"Misc"``.
    """
    verb, *resources = op_name.lower().split("_")
    if not resources:
        # A bare verb is tolerated rather than rejected; file it under "misc".
        resources = ["misc"]
    return [verb] + [singular(noun).capitalize() for noun in resources]
def check_operation_name(op_name: str, normalized: list[str]) -> list[str]:
    """Validate the verb of a normalized operation name.

    Args:
        op_name: The original operation name; used only in the message text.
        normalized: ``[verb, *nouns]`` as produced by ``normalize_op_name``.
            Only the first element (the verb) is inspected.

    Returns:
        A list of problem messages; empty when the verb is allowed.
    """
    verb = normalized[0]
    if is_verb(verb):
        return []

    # Sort the verbs before joining: iterating the set directly yields an
    # order that can change between interpreter runs (string hashing is
    # randomized), which made the message text unstable.
    allowed = ", ".join(sorted(COMMON_VERBS))
    return [
        f"Verb '{verb}' of API operation {op_name} is not allowed.\n"
        f"Use one of: {allowed}\n"
    ]
def operation_to_tag(op_name: str) -> str:
    """Build the tag for an operation: its normalized nouns joined by " / "."""
    _verb, *nouns = normalize_op_name(op_name)
    return " / ".join(nouns)
def fix_nullables(schema: dict) -> dict:
    """Rewrite ``oneOf``-with-null unions into the ``nullable: true`` form.

    Walks *schema* recursively (dicts and lists; any other value is returned
    as-is) and returns a new structure. For every dict whose ``oneOf`` is a
    list, alternatives with ``"type": "null"`` are removed, then:

    * exactly one alternative left: the ``oneOf`` is replaced by that
      alternative, merged with the other keys of the enclosing dict (the
      enclosing dict's keys win). ``nullable: true`` is added only if a
      null alternative was actually removed.
    * otherwise: ``oneOf`` is kept with the null alternatives filtered out.

    The input is not modified.

    NOTE(review): when two or more non-null alternatives remain, the null
    alternative is dropped without recording nullability anywhere - confirm
    that this is intended.
    """
    if isinstance(schema, list):
        return [fix_nullables(item) for item in schema]
    if not isinstance(schema, dict):
        return schema

    variants = schema.get("oneOf")
    if not isinstance(variants, list):
        # No union at this level; just recurse into the children.
        return {key: fix_nullables(value) for key, value in schema.items()}

    non_nulls = [v for v in variants if v.get("type") != "null"]
    had_null = len(non_nulls) != len(variants)

    if len(non_nulls) == 1:
        collapsed = deepcopy(non_nulls[0])
        if had_null:
            # Only a union that really contained a null alternative may be
            # marked nullable; a plain single-element oneOf must not be.
            collapsed["nullable"] = True
        # Sibling keys of the oneOf (description, title, ...) carry over.
        collapsed.update((k, v) for k, v in schema.items() if k != "oneOf")
        return fix_nullables(collapsed)

    # Build the result without assigning into the caller's dict.
    return {
        key: fix_nullables(non_nulls if key == "oneOf" else value)
        for key, value in schema.items()
    }
def fix_error_refs(schema: dict) -> None:
    """Point every ``#/$defs/error`` reference at ``#/components/schemas/error``.

    Recurses through nested dicts and lists and rewrites matching ``$ref``
    values in place. Nothing is returned; values that are neither dict nor
    list are ignored.
    """
    if isinstance(schema, list):
        for element in schema:
            fix_error_refs(element)
        return
    if not isinstance(schema, dict):
        return

    for name in schema:
        child = schema[name]
        if name == "$ref" and child == "#/$defs/error":
            # Re-assigning an existing key keeps the dict size stable, so
            # doing it while iterating is safe.
            schema[name] = "#/components/schemas/error"
        else:
            fix_error_refs(child)
# === Helper to make reusable schema names ===
def make_schema_name(func_name: str, part: str) -> str:
    """Return the component-schema name ``<func_name>_<part>``."""
    return "{}_{}".format(func_name, part)
def main() -> None:
    """Convert the function-schema JSON at ``$INPUT_PATH`` into ``openapi.json``.

    Reads the JSON Schema file named by the ``INPUT_PATH`` environment
    variable, checks every function entry under its ``properties`` (allowed
    verb, non-empty description) and writes an OpenAPI 3.0.3 document with
    one POST path per function to ``openapi.json`` in the current directory.

    Raises:
        KeyError: If ``INPUT_PATH`` is unset or the schema lacks an expected key.
        SystemExit: With status 1 when any function fails the checks.
    """
    input_path = Path(os.environ["INPUT_PATH"])

    # === Load input JSON Schema ===
    # Explicit UTF-8 so the result does not depend on the locale encoding.
    with input_path.open(encoding="utf-8") as f:
        schema = json.load(f)

    defs = schema.get("$defs", {})
    functions = schema["properties"]

    # === Check all functions ===
    errors: list[str] = []
    for func_name, func_schema in functions.items():
        errors.extend(check_operation_name(func_name, normalize_op_name(func_name)))
        if not func_schema.get("description"):
            errors.append(
                f"{func_name} doesn't have a description. Python docstring is required for an API function."
            )

    if errors:
        for message in errors:
            print(f"❌ Error: {message}")
        # Exit through SystemExit instead of os.abort(): abort kills the
        # process immediately, so buffered output (the messages printed just
        # above) could be lost when stdout is not a terminal.
        raise SystemExit(1)

    # === Convert each function ===
    paths: dict[str, dict] = {}
    schemas: dict[str, dict] = {}
    for func_name, func_schema in functions.items():
        props = func_schema["properties"]
        args_schema = fix_nullables(deepcopy(props["arguments"]))
        return_schema = fix_nullables(deepcopy(props["return"]))
        fix_error_refs(return_schema)

        # Register schemas under components
        args_name = make_schema_name(func_name, "args")
        return_name = make_schema_name(func_name, "return")
        schemas[args_name] = args_schema
        schemas[return_name] = return_schema

        # Create a POST endpoint for the function
        paths[f"/{func_name}"] = {
            "post": {
                "summary": func_name,
                "operationId": func_name,
                "description": func_schema.get("description"),
                "tags": [operation_to_tag(func_name)],
                "requestBody": {
                    "required": True,
                    "content": {
                        "application/json": {
                            "schema": {"$ref": f"#/components/schemas/{args_name}"}
                        }
                    },
                },
                "responses": {
                    "200": {
                        "description": "Successful response",
                        "content": {
                            "application/json": {
                                "schema": {
                                    "$ref": f"#/components/schemas/{return_name}"
                                }
                            }
                        },
                    }
                },
            }
        }

    # === Add global definitions from $defs ===
    for def_name, def_schema in defs.items():
        fixed_schema = fix_nullables(deepcopy(def_schema))
        fix_error_refs(fixed_schema)
        schemas[def_name] = fixed_schema

    # === Assemble the OpenAPI 3.0 spec ===
    openapi = {
        "openapi": "3.0.3",
        "info": {
            "title": "Function-Based Python API",
            "version": "1.0.0",
            "description": (
                "!!! INTERNAL USE ONLY !!! We don't provide a world usable API yet.\n"
                "This prototype maps python function calls to POST Requests "
                "because we are planning towards RESTfull API in the future."
            ),
        },
        "paths": paths,
        "components": {"schemas": schemas},
    }

    # === Write to output JSON ===
    with Path("openapi.json").open("w", encoding="utf-8") as f:
        json.dump(openapi, f, indent=2)
    print("✅ OpenAPI 3.0 JSON written to openapi.json")
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()