Merge pull request 'Add check_valid_clan and open_clan_folder api requests' (#4295) from Qubasa/clan-core:impl_open_clan into main
Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4295
This commit is contained in:
@@ -23,9 +23,6 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"path": "../clan-cli/clan_lib"
|
"path": "../clan-cli/clan_lib"
|
||||||
},
|
|
||||||
{
|
|
||||||
"path": "ui-2d"
|
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"settings": {
|
"settings": {
|
||||||
|
|||||||
@@ -1,10 +1,13 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from clan_lib.api import ApiResponse
|
from clan_lib.api import ApiResponse
|
||||||
|
from clan_lib.api.tasks import WebThread
|
||||||
|
from clan_lib.async_run import set_should_cancel
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .middleware import Middleware
|
from .middleware import Middleware
|
||||||
@@ -32,6 +35,7 @@ class ApiBridge(ABC):
|
|||||||
"""Generic interface for API bridges that can handle method calls from different sources."""
|
"""Generic interface for API bridges that can handle method calls from different sources."""
|
||||||
|
|
||||||
middleware_chain: tuple["Middleware", ...]
|
middleware_chain: tuple["Middleware", ...]
|
||||||
|
threads: dict[str, WebThread] = field(default_factory=dict)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def send_api_response(self, response: BackendResponse) -> None:
|
def send_api_response(self, response: BackendResponse) -> None:
|
||||||
@@ -87,3 +91,50 @@ class ApiBridge(ABC):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.send_api_response(response)
|
self.send_api_response(response)
|
||||||
|
|
||||||
|
def process_request_in_thread(
|
||||||
|
self,
|
||||||
|
request: BackendRequest,
|
||||||
|
*,
|
||||||
|
thread_name: str = "ApiBridgeThread",
|
||||||
|
wait_for_completion: bool = False,
|
||||||
|
timeout: float = 60.0,
|
||||||
|
) -> None:
|
||||||
|
"""Process an API request in a separate thread with cancellation support.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
request: The API request to process
|
||||||
|
thread_name: Name for the thread (for debugging)
|
||||||
|
wait_for_completion: Whether to wait for the thread to complete
|
||||||
|
timeout: Timeout in seconds when waiting for completion
|
||||||
|
"""
|
||||||
|
op_key = request.op_key or "unknown"
|
||||||
|
|
||||||
|
def thread_task(stop_event: threading.Event) -> None:
|
||||||
|
set_should_cancel(lambda: stop_event.is_set())
|
||||||
|
try:
|
||||||
|
log.debug(
|
||||||
|
f"Processing {request.method_name} with args {request.args} "
|
||||||
|
f"and header {request.header} in thread {thread_name}"
|
||||||
|
)
|
||||||
|
self.process_request(request)
|
||||||
|
finally:
|
||||||
|
self.threads.pop(op_key, None)
|
||||||
|
|
||||||
|
stop_event = threading.Event()
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=thread_task, args=(stop_event,), name=thread_name
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
||||||
|
|
||||||
|
if wait_for_completion:
|
||||||
|
# Wait for the thread to complete (this blocks until response is sent)
|
||||||
|
thread.join(timeout=timeout)
|
||||||
|
|
||||||
|
# Handle timeout
|
||||||
|
if thread.is_alive():
|
||||||
|
stop_event.set() # Cancel the thread
|
||||||
|
self.send_api_error_response(
|
||||||
|
op_key, "Request timeout", ["api_bridge", request.method_name]
|
||||||
|
)
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ gi.require_version("Gtk", "4.0")
|
|||||||
|
|
||||||
from clan_lib.api import ApiError, ErrorDataClass, SuccessDataClass
|
from clan_lib.api import ApiError, ErrorDataClass, SuccessDataClass
|
||||||
from clan_lib.api.directory import FileRequest
|
from clan_lib.api.directory import FileRequest
|
||||||
|
from clan_lib.clan.check import check_clan_valid
|
||||||
|
from clan_lib.flake import Flake
|
||||||
from gi.repository import Gio, GLib, Gtk
|
from gi.repository import Gio, GLib, Gtk
|
||||||
|
|
||||||
gi.require_version("Gtk", "4.0")
|
gi.require_version("Gtk", "4.0")
|
||||||
@@ -22,13 +24,58 @@ def remove_none(_list: list) -> list:
|
|||||||
RESULT: dict[str, SuccessDataClass[list[str] | None] | ErrorDataClass] = {}
|
RESULT: dict[str, SuccessDataClass[list[str] | None] | ErrorDataClass] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def open_clan_folder(*, op_key: str) -> SuccessDataClass[Flake] | ErrorDataClass:
|
||||||
|
"""
|
||||||
|
Opens the clan folder using the GTK file dialog.
|
||||||
|
Returns the path to the clan folder or an error if it fails.
|
||||||
|
"""
|
||||||
|
file_request = FileRequest(
|
||||||
|
mode="select_folder",
|
||||||
|
title="Select Clan Folder",
|
||||||
|
initial_folder=str(Path.home()),
|
||||||
|
)
|
||||||
|
response = open_file(file_request, op_key=op_key)
|
||||||
|
|
||||||
|
if isinstance(response, ErrorDataClass):
|
||||||
|
return response
|
||||||
|
|
||||||
|
if not response.data or len(response.data) == 0:
|
||||||
|
return ErrorDataClass(
|
||||||
|
op_key=op_key,
|
||||||
|
status="error",
|
||||||
|
errors=[
|
||||||
|
ApiError(
|
||||||
|
message="No folder selected",
|
||||||
|
description="You must select a folder to open.",
|
||||||
|
location=["open_clan_folder"],
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
clan_folder = Flake(response.data[0])
|
||||||
|
if not check_clan_valid(clan_folder):
|
||||||
|
return ErrorDataClass(
|
||||||
|
op_key=op_key,
|
||||||
|
status="error",
|
||||||
|
errors=[
|
||||||
|
ApiError(
|
||||||
|
message="Invalid clan folder",
|
||||||
|
description=f"The selected folder '{clan_folder}' is not a valid clan folder.",
|
||||||
|
location=["open_clan_folder"],
|
||||||
|
)
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return SuccessDataClass(op_key=op_key, data=clan_folder, status="success")
|
||||||
|
|
||||||
|
|
||||||
def open_file(
|
def open_file(
|
||||||
file_request: FileRequest, *, op_key: str
|
file_request: FileRequest, *, op_key: str
|
||||||
) -> SuccessDataClass[list[str] | None] | ErrorDataClass:
|
) -> SuccessDataClass[list[str] | None] | ErrorDataClass:
|
||||||
GLib.idle_add(gtk_open_file, file_request, op_key)
|
GLib.idle_add(gtk_open_file, file_request, op_key)
|
||||||
|
|
||||||
while RESULT.get(op_key) is None:
|
while RESULT.get(op_key) is None:
|
||||||
time.sleep(0.2)
|
time.sleep(0.1)
|
||||||
response = RESULT[op_key]
|
response = RESULT[op_key]
|
||||||
del RESULT[op_key]
|
del RESULT[op_key]
|
||||||
return response
|
return response
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from clan_lib.dirs import user_data_dir
|
|||||||
from clan_lib.log_manager import LogGroupConfig, LogManager
|
from clan_lib.log_manager import LogGroupConfig, LogManager
|
||||||
from clan_lib.log_manager import api as log_manager_api
|
from clan_lib.log_manager import api as log_manager_api
|
||||||
|
|
||||||
from clan_app.api.file_gtk import open_file
|
from clan_app.api.file_gtk import open_clan_folder, open_file
|
||||||
from clan_app.api.middleware import (
|
from clan_app.api.middleware import (
|
||||||
ArgumentParsingMiddleware,
|
ArgumentParsingMiddleware,
|
||||||
LoggingMiddleware,
|
LoggingMiddleware,
|
||||||
@@ -56,7 +56,10 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
|||||||
|
|
||||||
# Populate the API global with all functions
|
# Populate the API global with all functions
|
||||||
load_in_all_api_functions()
|
load_in_all_api_functions()
|
||||||
API.overwrite_fn(open_file)
|
|
||||||
|
# Create a shared threads dictionary for both HTTP and Webview modes
|
||||||
|
shared_threads: dict[str, tasks.WebThread] = {}
|
||||||
|
tasks.BAKEND_THREADS = shared_threads
|
||||||
|
|
||||||
# Start HTTP API server if requested
|
# Start HTTP API server if requested
|
||||||
http_server = None
|
http_server = None
|
||||||
@@ -72,6 +75,7 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
|||||||
swagger_dist=Path(swagger_dist) if swagger_dist else None,
|
swagger_dist=Path(swagger_dist) if swagger_dist else None,
|
||||||
host=app_opts.http_host,
|
host=app_opts.http_host,
|
||||||
port=app_opts.http_port,
|
port=app_opts.http_port,
|
||||||
|
shared_threads=shared_threads,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Add middleware to HTTP server
|
# Add middleware to HTTP server
|
||||||
@@ -103,20 +107,20 @@ def app_run(app_opts: ClanAppOptions) -> int:
|
|||||||
# Create webview if not running in HTTP-only mode
|
# Create webview if not running in HTTP-only mode
|
||||||
if not app_opts.http_api:
|
if not app_opts.http_api:
|
||||||
webview = Webview(
|
webview = Webview(
|
||||||
debug=app_opts.debug, title="Clan App", size=Size(1280, 1024, SizeHint.NONE)
|
debug=app_opts.debug,
|
||||||
|
title="Clan App",
|
||||||
|
size=Size(1280, 1024, SizeHint.NONE),
|
||||||
|
shared_threads=shared_threads,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
API.overwrite_fn(open_file)
|
||||||
|
API.overwrite_fn(open_clan_folder)
|
||||||
|
|
||||||
# Add middleware to the webview
|
# Add middleware to the webview
|
||||||
webview.add_middleware(ArgumentParsingMiddleware(api=API))
|
webview.add_middleware(ArgumentParsingMiddleware(api=API))
|
||||||
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
|
webview.add_middleware(LoggingMiddleware(log_manager=log_manager))
|
||||||
webview.add_middleware(MethodExecutionMiddleware(api=API))
|
webview.add_middleware(MethodExecutionMiddleware(api=API))
|
||||||
|
|
||||||
# Create the bridge
|
|
||||||
webview.create_bridge()
|
|
||||||
|
|
||||||
# Init BAKEND_THREADS global in tasks module
|
|
||||||
tasks.BAKEND_THREADS = webview.threads
|
|
||||||
|
|
||||||
webview.bind_jsonschema_api(API, log_manager=log_manager)
|
webview.bind_jsonschema_api(API, log_manager=log_manager)
|
||||||
webview.navigate(content_uri)
|
webview.navigate(content_uri)
|
||||||
webview.run()
|
webview.run()
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
|
||||||
import uuid
|
import uuid
|
||||||
from http.server import BaseHTTPRequestHandler
|
from http.server import BaseHTTPRequestHandler
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -9,7 +8,6 @@ from urllib.parse import urlparse
|
|||||||
|
|
||||||
from clan_lib.api import MethodRegistry, SuccessDataClass, dataclass_to_dict
|
from clan_lib.api import MethodRegistry, SuccessDataClass, dataclass_to_dict
|
||||||
from clan_lib.api.tasks import WebThread
|
from clan_lib.api.tasks import WebThread
|
||||||
from clan_lib.async_run import set_should_cancel
|
|
||||||
|
|
||||||
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
||||||
|
|
||||||
@@ -35,11 +33,12 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
|
|||||||
*,
|
*,
|
||||||
openapi_file: Path | None = None,
|
openapi_file: Path | None = None,
|
||||||
swagger_dist: Path | None = None,
|
swagger_dist: Path | None = None,
|
||||||
|
shared_threads: dict[str, WebThread] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
# Initialize API bridge fields
|
# Initialize API bridge fields
|
||||||
self.api = api
|
self.api = api
|
||||||
self.middleware_chain = middleware_chain
|
self.middleware_chain = middleware_chain
|
||||||
self.threads: dict[str, WebThread] = {}
|
self.threads = shared_threads if shared_threads is not None else {}
|
||||||
|
|
||||||
# Initialize OpenAPI/Swagger fields
|
# Initialize OpenAPI/Swagger fields
|
||||||
self.openapi_file = openapi_file
|
self.openapi_file = openapi_file
|
||||||
@@ -329,31 +328,13 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
|
|||||||
self, api_request: BackendRequest, method_name: str
|
self, api_request: BackendRequest, method_name: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Process the API request in a separate thread."""
|
"""Process the API request in a separate thread."""
|
||||||
op_key = api_request.op_key or "unknown"
|
# Use the inherited thread processing method
|
||||||
|
self.process_request_in_thread(
|
||||||
def thread_task(stop_event: threading.Event) -> None:
|
api_request,
|
||||||
set_should_cancel(lambda: stop_event.is_set())
|
thread_name="HttpThread",
|
||||||
try:
|
wait_for_completion=True,
|
||||||
self.process_request(api_request)
|
timeout=60.0,
|
||||||
finally:
|
|
||||||
self.threads.pop(op_key, None)
|
|
||||||
|
|
||||||
stop_event = threading.Event()
|
|
||||||
thread = threading.Thread(
|
|
||||||
target=thread_task, args=(stop_event,), name="HttpThread"
|
|
||||||
)
|
)
|
||||||
thread.start()
|
|
||||||
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
|
||||||
|
|
||||||
# Wait for the thread to complete (this blocks until response is sent)
|
|
||||||
thread.join(timeout=60.0)
|
|
||||||
|
|
||||||
# Handle timeout
|
|
||||||
if thread.is_alive():
|
|
||||||
stop_event.set() # Cancel the thread
|
|
||||||
self.send_api_error_response(
|
|
||||||
op_key, "Request timeout", ["http_bridge", method_name]
|
|
||||||
)
|
|
||||||
|
|
||||||
def log_message(self, format: str, *args: Any) -> None: # noqa: A002
|
def log_message(self, format: str, *args: Any) -> None: # noqa: A002
|
||||||
"""Override default logging to use our logger."""
|
"""Override default logging to use our logger."""
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
|||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from clan_lib.api import MethodRegistry
|
from clan_lib.api import MethodRegistry
|
||||||
|
from clan_lib.api.tasks import WebThread
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from clan_app.api.middleware import Middleware
|
from clan_app.api.middleware import Middleware
|
||||||
@@ -24,6 +25,7 @@ class HttpApiServer:
|
|||||||
port: int = 8080,
|
port: int = 8080,
|
||||||
openapi_file: Path | None = None,
|
openapi_file: Path | None = None,
|
||||||
swagger_dist: Path | None = None,
|
swagger_dist: Path | None = None,
|
||||||
|
shared_threads: dict[str, WebThread] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.api = api
|
self.api = api
|
||||||
self.openapi = openapi_file
|
self.openapi = openapi_file
|
||||||
@@ -34,6 +36,7 @@ class HttpApiServer:
|
|||||||
self._server_thread: threading.Thread | None = None
|
self._server_thread: threading.Thread | None = None
|
||||||
# Bridge is now the request handler itself, no separate instance needed
|
# Bridge is now the request handler itself, no separate instance needed
|
||||||
self._middleware: list[Middleware] = []
|
self._middleware: list[Middleware] = []
|
||||||
|
self.shared_threads = shared_threads or {}
|
||||||
|
|
||||||
def add_middleware(self, middleware: "Middleware") -> None:
|
def add_middleware(self, middleware: "Middleware") -> None:
|
||||||
"""Add middleware to the middleware chain."""
|
"""Add middleware to the middleware chain."""
|
||||||
@@ -58,6 +61,7 @@ class HttpApiServer:
|
|||||||
middleware_chain = tuple(self._middleware)
|
middleware_chain = tuple(self._middleware)
|
||||||
openapi_file = self.openapi
|
openapi_file = self.openapi
|
||||||
swagger_dist = self.swagger_dist
|
swagger_dist = self.swagger_dist
|
||||||
|
shared_threads = self.shared_threads
|
||||||
|
|
||||||
class RequestHandler(HttpBridge):
|
class RequestHandler(HttpBridge):
|
||||||
def __init__(self, request: Any, client_address: Any, server: Any) -> None:
|
def __init__(self, request: Any, client_address: Any, server: Any) -> None:
|
||||||
@@ -69,6 +73,7 @@ class HttpApiServer:
|
|||||||
server=server,
|
server=server,
|
||||||
openapi_file=openapi_file,
|
openapi_file=openapi_file,
|
||||||
swagger_dist=swagger_dist,
|
swagger_dist=swagger_dist,
|
||||||
|
shared_threads=shared_threads,
|
||||||
)
|
)
|
||||||
|
|
||||||
return RequestHandler
|
return RequestHandler
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ class Webview:
|
|||||||
debug: bool = False
|
debug: bool = False
|
||||||
size: Size | None = None
|
size: Size | None = None
|
||||||
window: int | None = None
|
window: int | None = None
|
||||||
|
shared_threads: dict[str, WebThread] | None = None
|
||||||
|
|
||||||
# initialized later
|
# initialized later
|
||||||
_bridge: "WebviewBridge | None" = None
|
_bridge: "WebviewBridge | None" = None
|
||||||
@@ -116,7 +117,17 @@ class Webview:
|
|||||||
"""Create and initialize the WebviewBridge with current middleware."""
|
"""Create and initialize the WebviewBridge with current middleware."""
|
||||||
from .webview_bridge import WebviewBridge
|
from .webview_bridge import WebviewBridge
|
||||||
|
|
||||||
bridge = WebviewBridge(webview=self, middleware_chain=tuple(self._middleware))
|
# Use shared_threads if provided, otherwise let WebviewBridge use its default
|
||||||
|
if self.shared_threads is not None:
|
||||||
|
bridge = WebviewBridge(
|
||||||
|
webview=self,
|
||||||
|
middleware_chain=tuple(self._middleware),
|
||||||
|
threads=self.shared_threads,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
bridge = WebviewBridge(
|
||||||
|
webview=self, middleware_chain=tuple(self._middleware), threads={}
|
||||||
|
)
|
||||||
self._bridge = bridge
|
self._bridge = bridge
|
||||||
return bridge
|
return bridge
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,10 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import threading
|
from dataclasses import dataclass
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from typing import TYPE_CHECKING
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from clan_lib.api import dataclass_to_dict
|
from clan_lib.api import dataclass_to_dict
|
||||||
from clan_lib.api.tasks import WebThread
|
from clan_lib.api.tasks import WebThread
|
||||||
from clan_lib.async_run import set_should_cancel
|
|
||||||
|
|
||||||
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
from clan_app.api.api_bridge import ApiBridge, BackendRequest, BackendResponse
|
||||||
|
|
||||||
@@ -23,7 +21,7 @@ class WebviewBridge(ApiBridge):
|
|||||||
"""Webview-specific implementation of the API bridge."""
|
"""Webview-specific implementation of the API bridge."""
|
||||||
|
|
||||||
webview: "Webview"
|
webview: "Webview"
|
||||||
threads: dict[str, WebThread] = field(default_factory=dict)
|
threads: dict[str, WebThread] # Inherited from ApiBridge
|
||||||
|
|
||||||
def send_api_response(self, response: BackendResponse) -> None:
|
def send_api_response(self, response: BackendResponse) -> None:
|
||||||
"""Send response back to the webview client."""
|
"""Send response back to the webview client."""
|
||||||
@@ -84,21 +82,9 @@ class WebviewBridge(ApiBridge):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Process in a separate thread
|
# Process in a separate thread using the inherited method
|
||||||
def thread_task(stop_event: threading.Event) -> None:
|
self.process_request_in_thread(
|
||||||
set_should_cancel(lambda: stop_event.is_set())
|
api_request,
|
||||||
|
thread_name="WebviewThread",
|
||||||
try:
|
wait_for_completion=False,
|
||||||
log.debug(
|
|
||||||
f"Calling {method_name}({json.dumps(api_request.args, indent=4)}) with header {json.dumps(api_request.header, indent=4)} and op_key {op_key}"
|
|
||||||
)
|
|
||||||
self.process_request(api_request)
|
|
||||||
finally:
|
|
||||||
self.threads.pop(op_key, None)
|
|
||||||
|
|
||||||
stop_event = threading.Event()
|
|
||||||
thread = threading.Thread(
|
|
||||||
target=thread_task, args=(stop_event,), name="WebviewThread"
|
|
||||||
)
|
)
|
||||||
thread.start()
|
|
||||||
self.threads[op_key] = WebThread(thread=thread, stop_event=stop_event)
|
|
||||||
|
|||||||
@@ -71,8 +71,8 @@ def substitute(
|
|||||||
|
|
||||||
with file.open() as f:
|
with file.open() as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
|
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
|
||||||
if clan_core_replacement:
|
if clan_core_replacement:
|
||||||
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
|
|
||||||
line = line.replace("__CLAN_CORE__", clan_core_replacement)
|
line = line.replace("__CLAN_CORE__", clan_core_replacement)
|
||||||
line = line.replace(
|
line = line.replace(
|
||||||
"git+https://git.clan.lol/clan/clan-core", clan_core_replacement
|
"git+https://git.clan.lol/clan/clan-core", clan_core_replacement
|
||||||
@@ -385,6 +385,7 @@ def test_flake(
|
|||||||
flake_template="test_flake",
|
flake_template="test_flake",
|
||||||
monkeypatch=monkeypatch,
|
monkeypatch=monkeypatch,
|
||||||
)
|
)
|
||||||
|
|
||||||
# check that git diff on ./sops is empty
|
# check that git diff on ./sops is empty
|
||||||
if (temporary_home / "test_flake" / "sops").exists():
|
if (temporary_home / "test_flake" / "sops").exists():
|
||||||
git_proc = sp.run(
|
git_proc = sp.run(
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ class MethodRegistry:
|
|||||||
|
|
||||||
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
|
def register_abstract(self, fn: Callable[..., T]) -> Callable[..., T]:
|
||||||
@wraps(fn)
|
@wraps(fn)
|
||||||
def wrapper(*args: Any, op_key: str, **kwargs: Any) -> ApiResponse[T]:
|
def wrapper(*args: Any, **kwargs: Any) -> ApiResponse[T]:
|
||||||
msg = f"""{fn.__name__} - The platform didn't implement this function.
|
msg = f"""{fn.__name__} - The platform didn't implement this function.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ from dataclasses import dataclass, field
|
|||||||
from typing import Any, Literal
|
from typing import Any, Literal
|
||||||
|
|
||||||
from clan_lib.cmd import RunOpts, run
|
from clan_lib.cmd import RunOpts, run
|
||||||
|
from clan_lib.flake import Flake
|
||||||
from clan_lib.nix import nix_shell
|
from clan_lib.nix import nix_shell
|
||||||
|
|
||||||
from . import API
|
from . import API
|
||||||
@@ -38,6 +39,16 @@ def open_file(file_request: FileRequest) -> list[str] | None:
|
|||||||
raise NotImplementedError(msg)
|
raise NotImplementedError(msg)
|
||||||
|
|
||||||
|
|
||||||
|
@API.register_abstract
|
||||||
|
def open_clan_folder() -> Flake:
|
||||||
|
"""
|
||||||
|
Abstract api method to open the clan folder.
|
||||||
|
It must return the path to the clan folder.
|
||||||
|
"""
|
||||||
|
msg = "open_clan_folder() is not implemented"
|
||||||
|
raise NotImplementedError(msg)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class BlkInfo:
|
class BlkInfo:
|
||||||
name: str
|
name: str
|
||||||
|
|||||||
37
pkgs/clan-cli/clan_lib/clan/check.py
Normal file
37
pkgs/clan-cli/clan_lib/clan/check.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from clan_lib.api import API
|
||||||
|
from clan_lib.errors import ClanError
|
||||||
|
from clan_lib.flake import Flake
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@API.register
|
||||||
|
def check_clan_valid(flake: Flake) -> bool:
|
||||||
|
"""Check if a clan is valid by verifying if it has the clanInternals attribute.
|
||||||
|
Args:
|
||||||
|
flake: The Flake instance representing the clan.
|
||||||
|
Returns:
|
||||||
|
bool: True if the clan exists, False otherwise.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
flake.prefetch()
|
||||||
|
except ClanError as e:
|
||||||
|
msg = f"Flake {flake} is not valid: {e}"
|
||||||
|
log.info(msg)
|
||||||
|
return False
|
||||||
|
|
||||||
|
if flake.is_local and not flake.path.exists():
|
||||||
|
msg = f"Path {flake} does not exist"
|
||||||
|
log.info(msg)
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
flake.select("clanInternals.inventoryClass.directory")
|
||||||
|
except ClanError as e:
|
||||||
|
msg = f"Flake {flake} is not a valid clan directory: {e}"
|
||||||
|
log.info(msg)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
24
pkgs/clan-cli/clan_lib/clan/check_test.py
Normal file
24
pkgs/clan-cli/clan_lib/clan/check_test.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from clan_cli.tests.fixtures_flakes import FlakeForTest
|
||||||
|
|
||||||
|
from clan_lib.clan.check import check_clan_valid
|
||||||
|
from clan_lib.flake import Flake
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.with_core
|
||||||
|
def test_check_clan_valid(
|
||||||
|
temporary_home: Path, test_flake_with_core: FlakeForTest, test_flake: FlakeForTest
|
||||||
|
) -> None:
|
||||||
|
# Test with a valid clan
|
||||||
|
flake = Flake(str(test_flake_with_core.path))
|
||||||
|
assert check_clan_valid(flake) is True
|
||||||
|
|
||||||
|
# Test with an invalid clan
|
||||||
|
flake = Flake(str(test_flake.path))
|
||||||
|
assert check_clan_valid(flake) is False
|
||||||
|
|
||||||
|
# Test with a non-existent clan
|
||||||
|
flake = Flake(str(temporary_home))
|
||||||
|
assert check_clan_valid(flake) is False
|
||||||
@@ -1,9 +1,13 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
from clan_lib.api import API
|
from clan_lib.api import API
|
||||||
from clan_lib.errors import ClanError
|
from clan_lib.errors import ClanError
|
||||||
from clan_lib.flake import Flake
|
from clan_lib.flake import Flake
|
||||||
from clan_lib.nix_models.clan import InventoryMeta
|
from clan_lib.nix_models.clan import InventoryMeta
|
||||||
from clan_lib.persist.inventory_store import InventoryStore
|
from clan_lib.persist.inventory_store import InventoryStore
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@API.register
|
@API.register
|
||||||
def get_clan_details(flake: Flake) -> InventoryMeta:
|
def get_clan_details(flake: Flake) -> InventoryMeta:
|
||||||
|
|||||||
Reference in New Issue
Block a user