diff --git a/pkgs/clan-cli/clan_cli/api/__init__.py b/pkgs/clan-cli/clan_cli/api/__init__.py index 7d54871e7..937035fb2 100644 --- a/pkgs/clan-cli/clan_cli/api/__init__.py +++ b/pkgs/clan-cli/clan_cli/api/__init__.py @@ -1,10 +1,12 @@ from collections.abc import Callable from dataclasses import dataclass -from typing import Any, Generic, Literal, TypeVar +from functools import wraps +from typing import Annotated, Any, Generic, Literal, TypeVar, get_type_hints + +from clan_cli.errors import ClanError T = TypeVar("T") - ResponseDataType = TypeVar("ResponseDataType") @@ -16,22 +18,55 @@ class ApiError: @dataclass -class ApiResponse(Generic[ResponseDataType]): - status: Literal["success", "error"] - errors: list[ApiError] | None - data: ResponseDataType | None +class SuccessDataClass(Generic[ResponseDataType]): + status: Annotated[Literal["success"], "The status of the response."] + data: ResponseDataType + + +@dataclass +class ErrorDataClass: + status: Literal["error"] + errors: list[ApiError] + + +ApiResponse = SuccessDataClass[ResponseDataType] | ErrorDataClass class _MethodRegistry: def __init__(self) -> None: + self._orig: dict[str, Callable[[Any], Any]] = {} self._registry: dict[str, Callable[[Any], Any]] = {} def register(self, fn: Callable[..., T]) -> Callable[..., T]: - self._registry[fn.__name__] = fn + self._orig[fn.__name__] = fn + + @wraps(fn) + def wrapper(*args: Any, **kwargs: Any) -> ApiResponse[T]: + try: + data: T = fn(*args, **kwargs) + return SuccessDataClass(status="success", data=data) + except ClanError as e: + return ErrorDataClass( + status="error", + errors=[ + ApiError( + message=e.msg, + description=e.description, + location=[fn.__name__, e.location], + ) + ], + ) + + # @wraps preserves all metadata of fn + # we need to update the annotation, because our wrapper changes the return type + # This overrides the new return type annotation with the generic typeVar filled in + orig_return_type = get_type_hints(fn).get("return") + wrapper.__annotations__["return"] = ApiResponse[orig_return_type] # type: ignore + + self._registry[fn.__name__] = wrapper return fn def to_json_schema(self) -> dict[str, Any]: - # Import only when needed from typing import get_type_hints from clan_cli.api.util import type_to_dict diff --git a/pkgs/clan-cli/clan_cli/api/util.py b/pkgs/clan-cli/clan_cli/api/util.py index 277a273e1..57e02a240 100644 --- a/pkgs/clan-cli/clan_cli/api/util.py +++ b/pkgs/clan-cli/clan_cli/api/util.py @@ -1,20 +1,92 @@ +import copy import dataclasses import pathlib from types import NoneType, UnionType -from typing import Any, Union +from typing import ( + Annotated, + Any, + Literal, + TypeVar, + Union, + get_args, + get_origin, +) -def type_to_dict(t: Any, scope: str = "") -> dict: +class JSchemaTypeError(Exception): + pass + + +# Inspect the fields of the parameterized type +def inspect_dataclass_fields(t: type) -> dict[TypeVar, type]: + """ + Returns a map of type variables to actual types for a parameterized type. + """ + origin = get_origin(t) + type_args = get_args(t) + if origin is None: + return {} + + type_params = origin.__parameters__ + # Create a map from type parameters to actual type arguments + type_map = dict(zip(type_params, type_args)) + + return type_map + + +def apply_annotations(schema: dict[str, Any], annotations: list[Any]) -> dict[str, Any]: + """ + Add metadata from typing.annotations to the json Schema. + The annotations can be a dict, a tuple, or a string and is directly applied to the schema as shown below. + No further validation is done, the caller is responsible for following json-schema. + + Examples + + ```python + # String annotation + Annotated[int, "This is an int"] -> {"type": "integer", "description": "This is an int"} + + # Dict annotation + Annotated[int, {"minimum": 0, "maximum": 10}] -> {"type": "integer", "minimum": 0, "maximum": 10} + + # Tuple annotation + Annotated[int, ("minimum", 0)] -> {"type": "integer", "minimum": 0} + ``` + """ + for annotation in annotations: + if isinstance(annotation, dict): + # Assuming annotation is a dict that can directly apply to the schema + schema.update(annotation) + elif isinstance(annotation, tuple) and len(annotation) == 2: + # Assuming a tuple where first element is a keyword (like 'minLength') and the second is the value + schema[annotation[0]] = annotation[1] + elif isinstance(annotation, str): + # String annotations can be used for description + schema.update({"description": f"{annotation}"}) + return schema + + +def type_to_dict(t: Any, scope: str = "", type_map: dict[TypeVar, type] = {}) -> dict: if t is None: return {"type": "null"} if dataclasses.is_dataclass(t): fields = dataclasses.fields(t) properties = { - f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}") + f.name: type_to_dict(f.type, f"{scope} {t.__name__}.{f.name}", type_map) for f in fields } - required = [pn for pn, pv in properties.items() if "null" not in pv["type"]] + + required = [] + for pn, pv in properties.items(): + if pv.get("type") is not None: + if "null" not in pv["type"]: + required.append(pn) + + elif pv.get("oneOf") is not None: + if "null" not in [i["type"] for i in pv.get("oneOf", [])]: + required.append(pn) + return { "type": "object", "properties": properties, @@ -22,24 +94,54 @@ def type_to_dict(t: Any, scope: str = "") -> dict: # Dataclasses can only have the specified properties "additionalProperties": False, } + elif type(t) is UnionType: return { - "type": [type_to_dict(arg, scope)["type"] for arg in t.__args__], + "oneOf": [type_to_dict(arg, scope, type_map) for arg in t.__args__], } + if isinstance(t, TypeVar): + # if t is a TypeVar, look up the type in the type_map + # And return the resolved type instead of the TypeVar + resolved = type_map.get(t) + if not resolved: + raise JSchemaTypeError( + f"{scope} - TypeVar {t} not found in type_map, map: {type_map}" + ) + return type_to_dict(type_map.get(t), scope, type_map) + elif hasattr(t, "__origin__"): # Check if it's a generic type - origin = getattr(t, "__origin__", None) + origin = get_origin(t) + args = get_args(t) if origin is None: # Non-generic user-defined or built-in type # TODO: handle custom types - raise BaseException("Unhandled Type: ", origin) + raise JSchemaTypeError("Unhandled Type: ", origin) + + elif origin is Literal: + # Handle Literal values for enums in JSON Schema + return { + "type": "string", + "enum": list(args), # assumes all args are strings + } + + elif origin is Annotated: + base_type, *metadata = get_args(t) + schema = type_to_dict(base_type, scope) # Generate schema for the base type + return apply_annotations(schema, metadata) elif origin is Union: - return {"type": [type_to_dict(arg, scope)["type"] for arg in t.__args__]} + union_types = [type_to_dict(arg, scope, type_map) for arg in t.__args__] + return { + "oneOf": union_types, + } - elif issubclass(origin, list): - return {"type": "array", "items": type_to_dict(t.__args__[0], scope)} + elif origin in {list, set, frozenset}: + return { + "type": "array", + "items": type_to_dict(t.__args__[0], scope, type_map), + } elif issubclass(origin, dict): value_type = t.__args__[1] @@ -48,10 +150,19 @@ def type_to_dict(t: Any, scope: str = "") -> dict: else: return { "type": "object", - "additionalProperties": type_to_dict(value_type, scope), + "additionalProperties": type_to_dict(value_type, scope, type_map), } + # Generic dataclass with type parameters + elif dataclasses.is_dataclass(origin): + # This behavior should mimic the scoping of typeVars in dataclasses + # Once type_to_dict() encounters a TypeVar, it will look up the type in the type_map + # When type_to_dict() returns the map goes out of scope. + # This behaves like a stack, where the type_map is pushed and popped as we traverse the dataclass fields + new_map = copy.deepcopy(type_map) + new_map.update(inspect_dataclass_fields(t)) + return type_to_dict(origin, scope, new_map) - raise BaseException(f"Error api type not yet supported {t!s}") + raise JSchemaTypeError(f"Error api type not yet supported {t!s}") elif isinstance(t, type): if t is str: @@ -65,7 +176,7 @@ def type_to_dict(t: Any, scope: str = "") -> dict: if t is object: return {"type": "object"} if t is Any: - raise BaseException( + raise JSchemaTypeError( f"Usage of the Any type is not supported for API functions. In: {scope}" ) @@ -79,6 +190,6 @@ def type_to_dict(t: Any, scope: str = "") -> dict: if t is NoneType: return {"type": "null"} - raise BaseException(f"Error primitive type not supported {t!s}") + raise JSchemaTypeError(f"Error primitive type not supported {t!s}") else: - raise BaseException(f"Error type not supported {t!s}") + raise JSchemaTypeError(f"Error type not supported {t!s}") diff --git a/pkgs/clan-cli/clan_cli/errors.py b/pkgs/clan-cli/clan_cli/errors.py index 0f01801ad..84c3957f5 100644 --- a/pkgs/clan-cli/clan_cli/errors.py +++ b/pkgs/clan-cli/clan_cli/errors.py @@ -1,4 +1,5 @@ import shutil +from dataclasses import dataclass from math import floor from pathlib import Path @@ -15,25 +16,17 @@ def text_heading(heading: str) -> str: return f"{'=' * filler} {heading} {'=' * filler}" +@dataclass class CmdOut: - def __init__( - self, - stdout: str, - stderr: str, - cwd: Path, - command: str, - returncode: int, - msg: str | None, - ) -> None: - super().__init__() - self.stdout = stdout - self.stderr = stderr - self.cwd = cwd - self.command = command - self.returncode = returncode - self.msg = msg + stdout: str + stderr: str + cwd: Path + command: str + returncode: int + msg: str | None - self.error_str = f""" + def __str__(self) -> str: + error_str = f""" {text_heading(heading="Command")} {self.command} {text_heading(heading="Stderr")} @@ -45,15 +38,30 @@ Message: {self.msg} Working Directory: '{self.cwd}' Return Code: {self.returncode} """ - - def __str__(self) -> str: - return self.error_str + return error_str class ClanError(Exception): """Base class for exceptions in this module.""" - pass + description: str | None + location: str + + def __init__( + self, + msg: str | None = None, + *, + description: str | None = None, + location: str | None = None, + ) -> None: + self.description = description + self.location = location or "Unknown location" + self.msg = msg or "" + if self.description: + exception_msg = f"{self.location}: {self.msg} - {self.description}" + else: + exception_msg = f"{self.location}: {self.msg}" + super().__init__(exception_msg) class ClanHttpError(ClanError): diff --git a/pkgs/clan-cli/clan_cli/flakes/create.py b/pkgs/clan-cli/clan_cli/flakes/create.py index d8aba488a..e698a5ef7 100644 --- a/pkgs/clan-cli/clan_cli/flakes/create.py +++ b/pkgs/clan-cli/clan_cli/flakes/create.py @@ -1,66 +1,90 @@ # !/usr/bin/env python3 import argparse +from dataclasses import dataclass from pathlib import Path +from clan_cli.api import API + from ..cmd import CmdOut, run from ..errors import ClanError from ..nix import nix_command, nix_shell -DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core" +DEFAULT_TEMPLATE_URL: str = "git+https://git.clan.lol/clan/clan-core" -def create_flake(directory: Path, url: str) -> dict[str, CmdOut]: +@dataclass +class CreateClanResponse: + git_init: CmdOut + git_add: CmdOut + git_config: CmdOut + flake_update: CmdOut + + +@API.register +def create_clan(directory: Path, template_url: str) -> CreateClanResponse: if not directory.exists(): directory.mkdir() else: - raise ClanError(f"Flake at '{directory}' already exists") - response = {} + raise ClanError( + location=f"{directory.resolve()}", + msg="Cannot create clan", + description="Directory already exists", + ) + + cmd_responses = {} command = nix_command( [ "flake", "init", "-t", - url, + template_url, ] ) out = run(command, cwd=directory) command = nix_shell(["nixpkgs#git"], ["git", "init"]) out = run(command, cwd=directory) - response["git init"] = out + cmd_responses["git init"] = out command = nix_shell(["nixpkgs#git"], ["git", "add", "."]) out = run(command, cwd=directory) - response["git add"] = out + cmd_responses["git add"] = out command = nix_shell(["nixpkgs#git"], ["git", "config", "user.name", "clan-tool"]) out = run(command, cwd=directory) - response["git config"] = out + cmd_responses["git config"] = out command = nix_shell( ["nixpkgs#git"], ["git", "config", "user.email", "clan@example.com"] ) out = run(command, cwd=directory) - response["git config"] = out + cmd_responses["git config"] = out command = ["nix", "flake", "update"] out = run(command, cwd=directory) - response["flake update"] = out + cmd_responses["flake update"] = out + response = CreateClanResponse( + git_init=cmd_responses["git init"], + git_add=cmd_responses["git add"], + git_config=cmd_responses["git config"], + flake_update=cmd_responses["flake update"], + ) return response -def create_flake_command(args: argparse.Namespace) -> None: - create_flake(args.path, args.url) - - -# takes a (sub)parser and configures it def register_create_parser(parser: argparse.ArgumentParser) -> None: parser.add_argument( "--url", type=str, - help="url for the flake", - default=DEFAULT_URL, + help="url to the clan template", + default=DEFAULT_TEMPLATE_URL, ) - parser.add_argument("path", type=Path, help="Path to the flake", default=Path(".")) + parser.add_argument( + "path", type=Path, help="Path to the clan directory", default=Path(".") + ) + + def create_flake_command(args: argparse.Namespace) -> None: + create_clan(args.path, args.url) + parser.set_defaults(func=create_flake_command) diff --git a/pkgs/webview-ui/app/src/Config.tsx b/pkgs/webview-ui/app/src/Config.tsx index e737c1338..cc1ac0ce8 100644 --- a/pkgs/webview-ui/app/src/Config.tsx +++ b/pkgs/webview-ui/app/src/Config.tsx @@ -8,9 +8,8 @@ import { import { OperationResponse, pyApi } from "./message"; export const makeCountContext = () => { - const [machines, setMachines] = createSignal< - OperationResponse<"list_machines"> - >([]); + const [machines, setMachines] = + createSignal>(); const [loading, setLoading] = createSignal(false); pyApi.list_machines.receive((machines) => { @@ -41,7 +40,7 @@ export const CountContext = createContext([ loading: () => false, // eslint-disable-next-line - machines: () => ([]), + machines: () => undefined, }, { // eslint-disable-next-line diff --git a/pkgs/webview-ui/app/src/message.ts b/pkgs/webview-ui/app/src/message.ts index e376d09ac..15f5616ff 100644 --- a/pkgs/webview-ui/app/src/message.ts +++ b/pkgs/webview-ui/app/src/message.ts @@ -72,6 +72,12 @@ const deserialize = // Create the API object const pyApi: PyApi = {} as PyApi; + +pyApi.create_clan.receive((r) => { + if (r.status === "success") { + r.status; + } +}); operationNames.forEach((opName) => { const name = opName as OperationNames; // @ts-expect-error - TODO: Fix this. Typescript is not recognizing the receive function correctly diff --git a/pkgs/webview-ui/app/src/routes/machines/view.tsx b/pkgs/webview-ui/app/src/routes/machines/view.tsx index dff0e5dc3..bb2acab5f 100644 --- a/pkgs/webview-ui/app/src/routes/machines/view.tsx +++ b/pkgs/webview-ui/app/src/routes/machines/view.tsx @@ -1,13 +1,30 @@ -import { For, Match, Switch, createEffect, type Component } from "solid-js"; +import { + For, + Match, + Switch, + createEffect, + createSignal, + type Component, +} from "solid-js"; import { useCountContext } from "../../Config"; import { route } from "@/src/App"; export const MachineListView: Component = () => { const [{ machines, loading }, { getMachines }] = useCountContext(); + const [data, setData] = createSignal([]); createEffect(() => { if (route() === "machines") getMachines(); }); + + createEffect(() => { + const response = machines(); + if (response?.status === "success") { + console.log(response.data); + setData(response.data); + } + }); + return (
@@ -32,12 +49,12 @@ export const MachineListView: Component = () => {
- + No machines found
    - + {(entry) => (