Merge pull request 'ruff-7-misc' (#4939) from ruff-7-misc into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4939
This commit is contained in:
Mic92
2025-08-26 10:43:12 +00:00
29 changed files with 123 additions and 136 deletions

0
nixosModules/clanCore/zerotier/genmoon.py Normal file → Executable file
View File

View File

@@ -59,7 +59,7 @@ class ApiBridge(ABC):
f"{middleware.__class__.__name__} => {request.method_name}", f"{middleware.__class__.__name__} => {request.method_name}",
) )
middleware.process(context) middleware.process(context)
except Exception as e: except Exception as e: # noqa: BLE001
# If middleware fails, handle error # If middleware fails, handle error
self.send_api_error_response( self.send_api_error_response(
request.op_key or "unknown", request.op_key or "unknown",

View File

@@ -148,7 +148,7 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
self.send_header("Content-Type", content_type) self.send_header("Content-Type", content_type)
self.end_headers() self.end_headers()
self.wfile.write(file_data) self.wfile.write(file_data)
except Exception as e: except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
log.error(f"Error reading Swagger file: {e!s}") log.error(f"Error reading Swagger file: {e!s}")
self.send_error(500, "Internal Server Error") self.send_error(500, "Internal Server Error")
@@ -252,7 +252,7 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
gen_op_key = str(uuid.uuid4()) gen_op_key = str(uuid.uuid4())
try: try:
self._handle_api_request(method_name, request_data, gen_op_key) self._handle_api_request(method_name, request_data, gen_op_key)
except Exception as e: except RuntimeError as e:
log.exception(f"Error processing API request {method_name}") log.exception(f"Error processing API request {method_name}")
self.send_api_error_response( self.send_api_error_response(
gen_op_key, gen_op_key,
@@ -275,7 +275,7 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
["http_bridge", "POST", method_name], ["http_bridge", "POST", method_name],
) )
return None return None
except Exception as e: except (OSError, ValueError, UnicodeDecodeError) as e:
self.send_api_error_response( self.send_api_error_response(
"post", "post",
f"Error reading request: {e!s}", f"Error reading request: {e!s}",
@@ -305,7 +305,7 @@ class HttpBridge(ApiBridge, BaseHTTPRequestHandler):
op_key=op_key, op_key=op_key,
) )
except Exception as e: except (KeyError, TypeError, ValueError) as e:
self.send_api_error_response( self.send_api_error_response(
gen_op_key, gen_op_key,
str(e), str(e),

View File

@@ -4,13 +4,11 @@ import json
import logging import logging
import threading import threading
import time import time
from unittest.mock import Mock
from urllib.request import Request, urlopen from urllib.request import Request, urlopen
import pytest import pytest
from clan_lib.api import MethodRegistry, tasks from clan_lib.api import MethodRegistry, tasks
from clan_lib.async_run import is_async_cancelled from clan_lib.async_run import is_async_cancelled
from clan_lib.log_manager import LogManager
from clan_app.api.middleware import ( from clan_app.api.middleware import (
ArgumentParsingMiddleware, ArgumentParsingMiddleware,
@@ -53,31 +51,20 @@ def mock_api() -> MethodRegistry:
return api return api
@pytest.fixture
def mock_log_manager() -> Mock:
"""Create a mock log manager."""
log_manager = Mock(spec=LogManager)
log_manager.create_log_file.return_value.get_file_path.return_value = Mock()
log_manager.create_log_file.return_value.get_file_path.return_value.open.return_value = Mock()
return log_manager
@pytest.fixture @pytest.fixture
def http_bridge( def http_bridge(
mock_api: MethodRegistry, mock_api: MethodRegistry,
mock_log_manager: Mock,
) -> tuple[MethodRegistry, tuple]: ) -> tuple[MethodRegistry, tuple]:
"""Create HTTP bridge dependencies for testing.""" """Create HTTP bridge dependencies for testing."""
middleware_chain = ( middleware_chain = (
ArgumentParsingMiddleware(api=mock_api), ArgumentParsingMiddleware(api=mock_api),
# LoggingMiddleware(log_manager=mock_log_manager),
MethodExecutionMiddleware(api=mock_api), MethodExecutionMiddleware(api=mock_api),
) )
return mock_api, middleware_chain return mock_api, middleware_chain
@pytest.fixture @pytest.fixture
def http_server(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpApiServer: def http_server(mock_api: MethodRegistry) -> HttpApiServer:
"""Create HTTP server with mock dependencies.""" """Create HTTP server with mock dependencies."""
server = HttpApiServer( server = HttpApiServer(
api=mock_api, api=mock_api,
@@ -87,7 +74,6 @@ def http_server(mock_api: MethodRegistry, mock_log_manager: Mock) -> HttpApiServ
# Add middleware # Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
# server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api)) server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed # Bridge will be created automatically when accessed
@@ -114,7 +100,6 @@ class TestHttpBridge:
# The actual HTTP handling will be tested through the server integration tests # The actual HTTP handling will be tested through the server integration tests
assert len(middleware_chain) == 2 assert len(middleware_chain) == 2
assert isinstance(middleware_chain[0], ArgumentParsingMiddleware) assert isinstance(middleware_chain[0], ArgumentParsingMiddleware)
# assert isinstance(middleware_chain[1], LoggingMiddleware)
assert isinstance(middleware_chain[1], MethodExecutionMiddleware) assert isinstance(middleware_chain[1], MethodExecutionMiddleware)
@@ -259,7 +244,6 @@ class TestIntegration:
def test_full_request_flow( def test_full_request_flow(
self, self,
mock_api: MethodRegistry, mock_api: MethodRegistry,
mock_log_manager: Mock,
) -> None: ) -> None:
"""Test complete request flow from server to bridge to middleware.""" """Test complete request flow from server to bridge to middleware."""
server: HttpApiServer = HttpApiServer( server: HttpApiServer = HttpApiServer(
@@ -270,7 +254,6 @@ class TestIntegration:
# Add middleware # Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
# server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api)) server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Bridge will be created automatically when accessed # Bridge will be created automatically when accessed
@@ -306,7 +289,6 @@ class TestIntegration:
def test_blocking_task( def test_blocking_task(
self, self,
mock_api: MethodRegistry, mock_api: MethodRegistry,
mock_log_manager: Mock,
) -> None: ) -> None:
shared_threads: dict[str, tasks.WebThread] = {} shared_threads: dict[str, tasks.WebThread] = {}
tasks.BAKEND_THREADS = shared_threads tasks.BAKEND_THREADS = shared_threads
@@ -321,7 +303,6 @@ class TestIntegration:
# Add middleware # Add middleware
server.add_middleware(ArgumentParsingMiddleware(api=mock_api)) server.add_middleware(ArgumentParsingMiddleware(api=mock_api))
# server.add_middleware(LoggingMiddleware(log_manager=mock_log_manager))
server.add_middleware(MethodExecutionMiddleware(api=mock_api)) server.add_middleware(MethodExecutionMiddleware(api=mock_api))
# Start server # Start server

View File

@@ -81,7 +81,7 @@ class Webview:
msg = message_queue.get() # Blocks until available msg = message_queue.get() # Blocks until available
js_code = f"window.notifyBus({json.dumps(msg)});" js_code = f"window.notifyBus({json.dumps(msg)});"
self.eval(js_code) self.eval(js_code)
except Exception as e: except (json.JSONDecodeError, RuntimeError, AttributeError) as e:
print("Bridge notify error:", e) print("Bridge notify error:", e)
sleep(0.01) # avoid busy loop sleep(0.01) # avoid busy loop
@@ -211,7 +211,7 @@ class Webview:
try: try:
result = callback(*args) result = callback(*args)
success = True success = True
except Exception as e: except Exception as e: # noqa: BLE001
result = str(e) result = str(e)
success = False success = False
self.return_(seq.decode(), 0 if success else 1, json.dumps(result)) self.return_(seq.decode(), 0 if success else 1, json.dumps(result))

View File

@@ -13,24 +13,24 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
missing_secret_facts = [] missing_secret_facts = []
missing_public_facts = [] missing_public_facts = []
services = [service] if service else list(machine.facts_data.keys()) services = [service] if service else list(machine.facts_data.keys())
for service in services: for svc in services:
for secret_fact in machine.facts_data[service]["secret"]: for secret_fact in machine.facts_data[svc]["secret"]:
if isinstance(secret_fact, str): if isinstance(secret_fact, str):
secret_name = secret_fact secret_name = secret_fact
else: else:
secret_name = secret_fact["name"] secret_name = secret_fact["name"]
if not machine.secret_facts_store.exists(service, secret_name): if not machine.secret_facts_store.exists(svc, secret_name):
machine.info( machine.info(
f"Secret fact '{secret_fact}' for service '{service}' is missing.", f"Secret fact '{secret_fact}' for service '{svc}' is missing.",
) )
missing_secret_facts.append((service, secret_name)) missing_secret_facts.append((svc, secret_name))
for public_fact in machine.facts_data[service]["public"]: for public_fact in machine.facts_data[svc]["public"]:
if not machine.public_facts_store.exists(service, public_fact): if not machine.public_facts_store.exists(svc, public_fact):
machine.info( machine.info(
f"Public fact '{public_fact}' for service '{service}' is missing.", f"Public fact '{public_fact}' for service '{svc}' is missing.",
) )
missing_public_facts.append((service, public_fact)) missing_public_facts.append((svc, public_fact))
machine.debug(f"missing_secret_facts: {missing_secret_facts}") machine.debug(f"missing_secret_facts: {missing_secret_facts}")
machine.debug(f"missing_public_facts: {missing_public_facts}") machine.debug(f"missing_public_facts: {missing_public_facts}")

View File

@@ -178,10 +178,10 @@ def _generate_facts_for_machine(
else: else:
machine_service_facts = machine.facts_data machine_service_facts = machine.facts_data
for service in machine_service_facts: for svc in machine_service_facts:
machine_updated |= generate_service_facts( machine_updated |= generate_service_facts(
machine=machine, machine=machine,
service=service, service=svc,
regenerate=regenerate, regenerate=regenerate,
secret_facts_store=machine.secret_facts_store, secret_facts_store=machine.secret_facts_store,
public_facts_store=machine.public_facts_store, public_facts_store=machine.public_facts_store,

View File

@@ -1,6 +1,7 @@
import argparse import argparse
import logging import logging
from clan_lib.errors import ClanError
from clan_lib.flake import require_flake from clan_lib.flake import require_flake
from clan_lib.network.network import networks_from_flake from clan_lib.network.network import networks_from_flake
@@ -54,7 +55,7 @@ def list_command(args: argparse.Namespace) -> None:
try: try:
is_running = network.is_running() is_running = network.is_running()
running_status = "Yes" if is_running else "No" running_status = "Yes" if is_running else "No"
except Exception: except ClanError:
running_status = "Error" running_status = "Error"
print( print(

View File

@@ -83,7 +83,7 @@ class KeyType(enum.Enum):
except FileNotFoundError: except FileNotFoundError:
return return
except Exception as ex: except OSError as ex:
log.warning(f"Could not read age keys from {key_path}", exc_info=ex) log.warning(f"Could not read age keys from {key_path}", exc_info=ex)
if keys := os.environ.get("SOPS_AGE_KEY"): if keys := os.environ.get("SOPS_AGE_KEY"):

View File

@@ -52,12 +52,12 @@ def list_state_folders(machine: Machine, service: None | str = None) -> None:
description=f"The service: {service} needs to be configured for the machine.", description=f"The service: {service} needs to be configured for the machine.",
) )
for service in state: for svc in state:
if not service: if not svc:
continue continue
print(f"· service: {service}") print(f"· service: {svc}")
service_cfg = state.get(service) service_cfg = state.get(svc)
if not service_cfg: if not service_cfg:
continue # or handle missing config continue # or handle missing config

View File

@@ -1,5 +1,3 @@
#!/usr/bin/env python3
import contextlib import contextlib
import socket import socket
from collections.abc import Callable from collections.abc import Callable

View File

@@ -34,7 +34,7 @@ def start_virtiofsd(socket_path: Path) -> Iterator[None]:
str(store), str(store),
], ],
) )
log.debug("$ {}".format(" ".join(virtiofsd))) log.debug("$ %s", " ".join(virtiofsd))
with subprocess.Popen(virtiofsd) as proc: with subprocess.Popen(virtiofsd) as proc:
try: try:
while not socket_path.exists(): while not socket_path.exists():

View File

@@ -155,7 +155,7 @@ class AsyncThread[**P, R](threading.Thread):
set_should_cancel(lambda: self.stop_event.is_set()) set_should_cancel(lambda: self.stop_event.is_set())
# Arguments for ParamSpec "P@AsyncThread" are missing # Arguments for ParamSpec "P@AsyncThread" are missing
self.result = AsyncResult(_result=self.function(*self.args, **self.kwargs)) self.result = AsyncResult(_result=self.function(*self.args, **self.kwargs))
except Exception as ex: except Exception as ex: # noqa: BLE001
self.result = AsyncResult(_result=ex) self.result = AsyncResult(_result=ex)
finally: finally:
self.finished = True self.finished = True

View File

@@ -11,9 +11,9 @@ def create_backup(machine: Machine, provider: str | None = None) -> None:
msg = "No providers specified" msg = "No providers specified"
raise ClanError(msg) raise ClanError(msg)
with host.host_connection() as ssh: with host.host_connection() as ssh:
for provider in backup_scripts["providers"]: for prov in backup_scripts["providers"]:
proc = ssh.run( proc = ssh.run(
[backup_scripts["providers"][provider]["create"]], [backup_scripts["providers"][prov]["create"]],
) )
if proc.returncode != 0: if proc.returncode != 0:
msg = "failed to start backup" msg = "failed to start backup"

View File

@@ -846,7 +846,7 @@ class Flake:
return return
try: try:
self._cache.load_from_file(path) self._cache.load_from_file(path)
except Exception as e: except (OSError, json.JSONDecodeError, KeyError, ValueError) as e:
log.warning(f"Failed load eval cache: {e}. Continue without cache") log.warning(f"Failed load eval cache: {e}. Continue without cache")
def prefetch(self) -> None: def prefetch(self) -> None:

View File

@@ -14,6 +14,7 @@ def list_log_days() -> list[str]:
Raises: Raises:
ClanError: If LOG_MANAGER_INSTANCE is not initialized. ClanError: If LOG_MANAGER_INSTANCE is not initialized.
""" """
if LOG_MANAGER_INSTANCE is None: if LOG_MANAGER_INSTANCE is None:
msg = "LOG_MANAGER_INSTANCE is not initialized" msg = "LOG_MANAGER_INSTANCE is not initialized"
@@ -63,6 +64,7 @@ def get_log_file(
Raises: Raises:
ClanError: If the log file is not found or LOG_MANAGER_INSTANCE is not initialized. ClanError: If the log file is not found or LOG_MANAGER_INSTANCE is not initialized.
""" """
if LOG_MANAGER_INSTANCE is None: if LOG_MANAGER_INSTANCE is None:
msg = "LOG_MANAGER_INSTANCE is not initialized" msg = "LOG_MANAGER_INSTANCE is not initialized"

View File

@@ -234,7 +234,7 @@ def run_machine_update(
is_mobile = machine.select( is_mobile = machine.select(
"config.system.clan.deployment.nixosMobileWorkaround", "config.system.clan.deployment.nixosMobileWorkaround",
) )
except Exception: except ClanError:
is_mobile = False is_mobile = False
# if the machine is mobile, we retry to deploy with the mobile workaround method # if the machine is mobile, we retry to deploy with the mobile workaround method
if is_mobile: if is_mobile:

View File

@@ -161,12 +161,9 @@ def get_best_remote(machine: "Machine") -> Iterator["Remote"]:
if target_host: if target_host:
log.debug(f"Using targetHost from inventory for {machine.name}: {target_host}") log.debug(f"Using targetHost from inventory for {machine.name}: {target_host}")
# Create a direct network with just this machine # Create a direct network with just this machine
try: remote = Remote.from_ssh_uri(machine_name=machine.name, address=target_host)
remote = Remote.from_ssh_uri(machine_name=machine.name, address=target_host) yield remote
yield remote return
return
except Exception as e:
log.debug(f"Inventory targetHost not reachable for {machine.name}: {e}")
# Step 2: Try existing networks by priority # Step 2: Try existing networks by priority
try: try:
@@ -189,7 +186,7 @@ def get_best_remote(machine: "Machine") -> Iterator["Remote"]:
) )
yield network.remote(machine.name) yield network.remote(machine.name)
return return
except Exception as e: except ClanError as e:
log.debug(f"Failed to reach {machine.name} via {network_name}: {e}") log.debug(f"Failed to reach {machine.name} via {network_name}: {e}")
else: else:
try: try:
@@ -202,34 +199,26 @@ def get_best_remote(machine: "Machine") -> Iterator["Remote"]:
) )
yield connected_network.remote(machine.name) yield connected_network.remote(machine.name)
return return
except Exception as e: except ClanError as e:
log.debug( log.debug(
f"Failed to establish connection to {machine.name} via {network_name}: {e}", f"Failed to establish connection to {machine.name} via {network_name}: {e}",
) )
except Exception as e: except (ImportError, AttributeError, KeyError) as e:
log.debug(f"Failed to use networking modules to determine machines remote: {e}") log.debug(f"Failed to use networking modules to determine machines remote: {e}")
# Step 3: Try targetHost from machine nixos config # Step 3: Try targetHost from machine nixos config
try: target_host = machine.select('config.clan.core.networking."targetHost"')
target_host = machine.select('config.clan.core.networking."targetHost"') if target_host:
if target_host: log.debug(
log.debug( f"Using targetHost from machine config for {machine.name}: {target_host}",
f"Using targetHost from machine config for {machine.name}: {target_host}", )
) # Check if reachable
# Check if reachable remote = Remote.from_ssh_uri(
try: machine_name=machine.name,
remote = Remote.from_ssh_uri( address=target_host,
machine_name=machine.name, )
address=target_host, yield remote
) return
yield remote
return
except Exception as e:
log.debug(
f"Machine config targetHost not reachable for {machine.name}: {e}",
)
except Exception as e:
log.debug(f"Could not get targetHost from machine config: {e}")
# No connection method found # No connection method found
msg = f"Could not find any way to connect to machine '{machine.name}'. No targetHost configured and machine not reachable via any network." msg = f"Could not find any way to connect to machine '{machine.name}'. No targetHost configured and machine not reachable via any network."
@@ -249,12 +238,7 @@ def get_network_overview(networks: dict[str, Network]) -> dict:
else: else:
with module.connection(network) as conn: with module.connection(network) as conn:
for peer_name in conn.peers: for peer_name in conn.peers:
try: result[network_name]["peers"][peer_name] = conn.ping(
result[network_name]["peers"][peer_name] = conn.ping( peer_name,
peer_name, )
)
except ClanError:
log.warning(
f"getting host for machine: {peer_name} in network: {network_name} failed",
)
return result return result

View File

@@ -29,6 +29,7 @@ class QRCodeData:
@contextmanager @contextmanager
def get_best_remote(self) -> Iterator[Remote]: def get_best_remote(self) -> Iterator[Remote]:
errors = []
for address in self.addresses: for address in self.addresses:
try: try:
log.debug(f"Establishing connection via {address}") log.debug(f"Establishing connection via {address}")
@@ -39,8 +40,13 @@ class QRCodeData:
if ping_time is not None: if ping_time is not None:
log.info(f"reachable via {address} after connection") log.info(f"reachable via {address} after connection")
yield address.remote yield address.remote
except Exception as e: except ClanError as e:
log.debug(f"Failed to establish connection via {address}: {e}") errors.append((address, e))
if not errors:
msg = "No reachable remote found in QR code data: " + ", ".join(
f"{addr.remote} ({err})" for addr, err in errors
)
raise ClanError(msg)
def read_qr_json(qr_data: dict[str, Any], flake: Flake) -> QRCodeData: def read_qr_json(qr_data: dict[str, Any], flake: Flake) -> QRCodeData:

View File

@@ -43,18 +43,22 @@ def spawn_tor() -> Iterator[None]:
cmd_args = ["tor", "--HardwareAccel", "1"] cmd_args = ["tor", "--HardwareAccel", "1"]
packages = ["tor"] packages = ["tor"]
cmd = nix_shell(packages, cmd_args) cmd = nix_shell(packages, cmd_args)
process = Popen(cmd)
try: try:
while not is_tor_running(): process = Popen(cmd)
log.debug("Waiting for Tor to start...") try:
time.sleep(0.2) while not is_tor_running():
log.info("Tor is now running") log.debug("Waiting for Tor to start...")
yield time.sleep(0.2)
finally: log.info("Tor is now running")
log.info("Terminating Tor process...") yield
process.terminate() finally:
process.wait() log.info("Terminating Tor process...")
log.info("Tor process terminated") process.terminate()
process.wait()
log.info("Tor process terminated")
except OSError as e:
msg = f"Failed to spawn tor process with command: {cmd}"
raise ClanError(msg) from e
@dataclass(frozen=True) @dataclass(frozen=True)

View File

@@ -239,7 +239,7 @@ def test_run_exception(hosts: list[Remote], runtime: AsyncRuntime) -> None:
runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604 runtime.async_run(None, host.run_local, ["exit 1"], RunOpts(shell=True)) # noqa: S604
runtime.join_all() runtime.join_all()
runtime.check_all() runtime.check_all()
except Exception: # noqa: S110 except ClanError:
pass pass
else: else:
msg = "should have raised Exception" msg = "should have raised Exception"
@@ -255,7 +255,7 @@ def test_run_function_exception(hosts: list[Remote], runtime: AsyncRuntime) -> N
runtime.async_run(None, some_func, host) runtime.async_run(None, some_func, host)
runtime.join_all() runtime.join_all()
runtime.check_all() runtime.check_all()
except Exception: # noqa: S110 except ClanError:
pass pass
else: else:
msg = "should have raised Exception" msg = "should have raised Exception"

View File

@@ -1,5 +1,3 @@
#!/usr/bin/env python3
import logging import logging
import subprocess import subprocess
import sys import sys
@@ -90,7 +88,7 @@ class SudoAskpassProxy:
ssh_process.stdin.flush() ssh_process.stdin.flush()
else: else:
print(stripped_line) print(stripped_line)
except Exception as e: except (OSError, ClanError) as e:
logger.error(f"Error processing passwords requests output: {e}") logger.error(f"Error processing passwords requests output: {e}")
def run(self) -> str: def run(self) -> str:

0
pkgs/clan-vm-manager/clan_vm_manager/app.py Normal file → Executable file
View File

View File

@@ -84,7 +84,7 @@ def _init_proc(
print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr) print(linebreak + f" {func.__name__}:{pid} " + linebreak, file=sys.stderr)
try: try:
func(**kwargs) func(**kwargs)
except Exception as ex: except Exception as ex: # noqa: BLE001
traceback.print_exc() traceback.print_exc()
if on_except is not None: if on_except is not None:
on_except(ex, mp.current_process()) on_except(ex, mp.current_process())

View File

@@ -345,7 +345,7 @@ class VMObject(GObject.Object):
raise ClanError(msg) raise ClanError(msg)
with self.qmp_wrap.qmp_ctx() as qmp: with self.qmp_wrap.qmp_ctx() as qmp:
qmp.command("system_powerdown") qmp.command("system_powerdown")
except Exception as ex: except (ClanError, OSError, ConnectionError) as ex:
log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}") log.debug(f"QMP command 'system_powerdown' ignored. Error: {ex}")
# Try 20 times to stop the VM # Try 20 times to stop the VM

View File

@@ -139,7 +139,7 @@ class ClanStore:
# Convert the byte array to a string and print it # Convert the byte array to a string and print it
logs_view.set_message(contents.decode("utf-8")) logs_view.set_message(contents.decode("utf-8"))
except Exception as e: except (GLib.Error, UnicodeDecodeError) as e:
print(f"Error reading file: {e}") print(f"Error reading file: {e}")
# only one vm can output logs at a time # only one vm can output logs at a time

View File

@@ -1,7 +1,6 @@
import argparse import argparse
import json import json
import logging import logging
import traceback
from collections.abc import Callable, Iterable from collections.abc import Callable, Iterable
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
@@ -454,8 +453,4 @@ def main() -> None:
if __name__ == "__main__": if __name__ == "__main__":
try: main()
main()
except Exception:
print("An error occurred:")
traceback.print_exc()

View File

@@ -1,6 +1,8 @@
{ {
stdenv,
makeWrapper,
python3,
bash, bash,
callPackage,
coreutils, coreutils,
git, git,
lib, lib,
@@ -10,22 +12,37 @@
tea-create-pr, tea-create-pr,
... ...
}: }:
let stdenv.mkDerivation {
writers = callPackage ../builders/script-writers.nix { }; pname = "merge-after-ci";
in version = "0.1.0";
writers.writePython3Bin "merge-after-ci" {
makeWrapperArgs = [ src = ./.;
"--prefix"
"PATH" nativeBuildInputs = [ makeWrapper ];
":"
(lib.makeBinPath [ buildInputs = [ python3 ];
bash
coreutils installPhase = ''
git runHook preInstall
nix
openssh mkdir -p $out/bin
tea cp merge-after-ci.py $out/bin/merge-after-ci
tea-create-pr chmod +x $out/bin/merge-after-ci
])
]; wrapProgram $out/bin/merge-after-ci \
} ./merge-after-ci.py --prefix PATH : ${
lib.makeBinPath [
bash
coreutils
git
nix
openssh
tea
tea-create-pr
python3
]
}
runHook postInstall
'';
}

View File

@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import argparse import argparse
import shlex import shlex
import subprocess import subprocess