PGH003: fix

This commit is contained in:
Jörg Thalheim
2025-08-20 21:14:33 +02:00
parent cbf9678534
commit 8f8426de52
29 changed files with 212 additions and 102 deletions

View File

@@ -180,15 +180,15 @@ class CompositeLogger(AbstractLogger):
stack.enter_context(logger.nested(message, attributes))
yield
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
for logger in self.logger_list:
logger.info(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
for logger in self.logger_list:
logger.warning(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
for logger in self.logger_list:
logger.error(*args, **kwargs)
sys.exit(1)
@@ -245,13 +245,13 @@ class TerminalLogger(AbstractLogger):
toc = time.time()
self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)")
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def print_serial_logs(self, enable: bool) -> None:
@@ -297,13 +297,13 @@ class XMLLogger(AbstractLogger):
self.xml.characters(message)
self.xml.endElement("line")
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def info(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def warning(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore
def error(self, *args: Any, **kwargs: Any) -> None: # type: ignore[no-untyped-def]
self.log(*args, **kwargs)
def log(self, message: str, attributes: dict[str, str] | None = None) -> None:

View File

@@ -43,6 +43,8 @@ class ApiBridge(ABC):
def process_request(self, request: BackendRequest) -> None:
"""Process an API request through the middleware chain."""
from .middleware import MiddlewareContext
with ExitStack() as stack:
context = MiddlewareContext(
request=request,

View File

@@ -15,6 +15,10 @@ from clan_lib.nix import nix_eval
from clan_lib.persist.inventory_store import InventoryStore
from clan_lib.templates import list_templates
from .secrets.groups import list_groups
from .secrets.secrets import list_secrets
from .secrets.users import list_users
"""
This module provides dynamic completions.
The completions should feel fast.

View File

@@ -86,7 +86,7 @@ def flash_command(args: argparse.Namespace) -> None:
run_machine_flash(
machine=machine,
mode=opts.mode, # type: ignore
mode=opts.mode, # type: ignore[arg-type]
disks=opts.disks,
system_config=opts.system_config,
dry_run=opts.dry_run,

View File

@@ -23,8 +23,7 @@ def hyperlink_same_text_and_url(url: str) -> str:
def help_hyperlink(description: str, url: str) -> str:
"""Keep the description and the link the same to support legacy terminals.
"""
"""Keep the description and the link the same to support legacy terminals."""
if sys.argv[0].__contains__("docs.py"):
return docs_hyperlink(description, url)

View File

@@ -6,13 +6,6 @@ from pathlib import Path
from clan_lib.errors import ClanError
from clan_lib.git import commit_files
from clan_cli.completions import (
add_dynamic_completer,
complete_groups,
complete_machines,
complete_secrets,
complete_users,
)
from clan_cli.machines.types import machine_name_type, validate_hostname
from clan_cli.secrets.sops import load_age_plugins
@@ -238,6 +231,11 @@ def remove_machine_command(args: argparse.Namespace) -> None:
def add_group_argument(parser: argparse.ArgumentParser) -> None:
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_groups,
)
group_action = parser.add_argument(
"group",
help="the name of the secret",
@@ -336,6 +334,11 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machines to add",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(add_machine_action, complete_machines)
add_machine_parser.set_defaults(func=add_machine_command)
@@ -350,6 +353,11 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machines to remove",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(remove_machine_action, complete_machines)
remove_machine_parser.set_defaults(func=remove_machine_command)
@@ -361,6 +369,11 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user to add",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(add_user_action, complete_users)
add_user_parser.set_defaults(func=add_user_command)
@@ -375,6 +388,11 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user to remove",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(remove_user_action, complete_users)
remove_user_parser.set_defaults(func=remove_user_command)
@@ -389,6 +407,11 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the secret",
type=secret_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_secrets,
)
add_dynamic_completer(add_secret_action, complete_secrets)
add_secret_parser.set_defaults(func=add_secret_command)
@@ -403,5 +426,10 @@ def register_groups_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the secret",
type=secret_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_secrets,
)
add_dynamic_completer(remove_secret_action, complete_secrets)
remove_secret_parser.set_defaults(func=remove_secret_command)

View File

@@ -7,12 +7,6 @@ from clan_lib.cmd import RunOpts, run
from clan_lib.errors import ClanError
from clan_lib.nix import nix_shell
from clan_cli.completions import (
add_dynamic_completer,
complete_groups,
complete_machines,
complete_users,
)
from clan_cli.secrets.sops import load_age_plugins
from .secrets import encrypt_secret, sops_secrets_folder
@@ -75,6 +69,11 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
default=[],
help="the group to import the secrets to",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_groups,
)
add_dynamic_completer(group_action, complete_groups)
machine_action = parser.add_argument(
"--machine",
@@ -83,6 +82,11 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
default=[],
help="the machine to import the secrets to",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(machine_action, complete_machines)
user_action = parser.add_argument(
"--user",
@@ -91,6 +95,11 @@ def register_import_sops_parser(parser: argparse.ArgumentParser) -> None:
default=[],
help="the user to import the secrets to",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(user_action, complete_users)
parser.add_argument(
"--prefix",

View File

@@ -4,11 +4,6 @@ from pathlib import Path
from clan_lib.flake import require_flake
from clan_lib.git import commit_files
from clan_cli.completions import (
add_dynamic_completer,
complete_machines,
complete_secrets,
)
from clan_cli.machines.types import machine_name_type, validate_hostname
from . import secrets, sops
@@ -177,6 +172,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machine",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(add_machine_action, complete_machines)
add_parser.add_argument(
"key",
@@ -192,6 +192,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machine",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(get_machine_parser, complete_machines)
get_parser.set_defaults(func=get_command)
@@ -202,6 +207,11 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machine",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(remove_machine_parser, complete_machines)
remove_parser.set_defaults(func=remove_command)
@@ -215,6 +225,12 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machine",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
complete_secrets,
)
add_dynamic_completer(machine_add_secret_parser, complete_machines)
add_secret_action = add_secret_parser.add_argument(
"secret",
@@ -234,6 +250,12 @@ def register_machines_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the machine",
type=machine_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
complete_secrets,
)
add_dynamic_completer(machine_remove_parser, complete_machines)
remove_secret_action = remove_secret_parser.add_argument(
"secret",

View File

@@ -12,14 +12,6 @@ from typing import IO
from clan_lib.errors import ClanError
from clan_lib.git import commit_files
from clan_cli.completions import (
add_dynamic_completer,
complete_groups,
complete_machines,
complete_secrets,
complete_users,
)
from . import sops
from .folders import (
list_objects,
@@ -258,6 +250,11 @@ def add_secret_argument(parser: argparse.ArgumentParser, autocomplete: bool) ->
type=secret_name_type,
)
if autocomplete:
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_secrets,
)
add_dynamic_completer(secrets_parser, complete_secrets)
@@ -465,6 +462,11 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
default=[],
help="the group to import the secrets to (can be repeated)",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_groups,
)
add_dynamic_completer(set_group_action, complete_groups)
machine_parser = parser_set.add_argument(
"--machine",
@@ -473,6 +475,11 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
default=[],
help="the machine to import the secrets to (can be repeated)",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_machines,
)
add_dynamic_completer(machine_parser, complete_machines)
set_user_action = parser_set.add_argument(
"--user",
@@ -481,6 +488,11 @@ def register_secrets_parser(subparser: argparse._SubParsersAction) -> None:
default=[],
help="the user to import the secrets to (can be repeated)",
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(set_user_action, complete_users)
parser_set.add_argument(
"-e",

View File

@@ -9,8 +9,6 @@ from clan_lib.errors import ClanError
from clan_lib.flake import require_flake
from clan_lib.git import commit_files
from clan_cli.completions import add_dynamic_completer, complete_secrets, complete_users
from . import groups, secrets, sops
from .filters import get_secrets_filter_for_user
from .folders import (
@@ -283,6 +281,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(get_user_action, complete_users)
get_parser.set_defaults(func=get_command)
@@ -292,6 +295,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(remove_user_action, complete_users)
remove_parser.set_defaults(func=remove_command)
@@ -304,6 +312,12 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_secrets,
complete_users,
)
add_dynamic_completer(add_secret_user_action, complete_users)
add_secrets_action = add_secret_parser.add_argument(
"secret",
@@ -322,6 +336,12 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the group",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_secrets,
complete_users,
)
add_dynamic_completer(remove_secret_user_action, complete_users)
remove_secrets_action = remove_secret_parser.add_argument(
"secret",
@@ -340,6 +360,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(add_key_user_action, complete_users)
_add_key_flags(add_key_parser)
add_key_parser.set_defaults(func=add_key_command)
@@ -353,6 +378,11 @@ def register_users_parser(parser: argparse.ArgumentParser) -> None:
help="the name of the user",
type=user_name_type,
)
from clan_cli.completions import ( # noqa: PLC0415
add_dynamic_completer,
complete_users,
)
add_dynamic_completer(remove_key_user_action, complete_users)
_add_key_flags(remove_key_parser)
remove_key_parser.set_defaults(func=remove_key_command)

View File

@@ -20,7 +20,7 @@ def list_command(args: argparse.Namespace) -> None:
builtin_template_set: TemplateClanType | None = templates.builtins.get(
template_type,
None,
) # type: ignore
) # type: ignore[assignment]
if not builtin_template_set:
continue
@@ -41,7 +41,7 @@ def list_command(args: argparse.Namespace) -> None:
]
last_idx = len(visible_inputs) - 1
for input_idx, (input_name, input_templates) in enumerate(visible_inputs):
custom_templates: TemplateClanType = input_templates[template_type] # type: ignore
custom_templates: TemplateClanType = input_templates[template_type] # type: ignore[literal-required]
is_last_input = input_idx == last_idx
prefix = "" if not is_last_input else " "
if not is_last_input:

View File

@@ -194,7 +194,7 @@ API.register(get_system_file)
# we need to update the annotation, because our wrapper changes the return type
# This overrides the new return type annotation with the generic typeVar filled in
orig_return_type = get_type_hints(fn).get("return")
wrapper.__annotations__["return"] = ApiResponse[orig_return_type] # type: ignore
wrapper.__annotations__["return"] = ApiResponse[orig_return_type] # type: ignore[misc,valid-type]
self._registry[fn.__name__] = wrapper

View File

@@ -116,7 +116,7 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
): _to_dict(getattr(obj, field.name))
for field in fields(obj)
if not field.name.startswith("_")
and getattr(obj, field.name) is not None # type: ignore
and getattr(obj, field.name) is not None # type: ignore[no-any-return]
}
if isinstance(obj, list | tuple | set):
return [_to_dict(item) for item in obj]
@@ -131,7 +131,7 @@ def dataclass_to_dict(obj: Any, *, use_alias: bool = True) -> Any:
return _to_dict(obj)
T = TypeVar("T", bound=dataclass) # type: ignore
T = TypeVar("T", bound=dataclass) # type: ignore[valid-type]
def is_union_type(type_hint: type | UnionType) -> bool:
@@ -179,7 +179,7 @@ def unwrap_none_type(type_hint: type | UnionType) -> type:
# Return the first non-None type
return next(t for t in get_args(type_hint) if t is not type(None))
return type_hint # type: ignore
return type_hint # type: ignore[return-value]
def unwrap_union_type(type_hint: type | UnionType) -> list[type]:
@@ -192,7 +192,7 @@ def unwrap_union_type(type_hint: type | UnionType) -> list[type]:
# Return the first non-None type
return list(get_args(type_hint))
return [type_hint] # type: ignore
return [type_hint] # type: ignore[list-item]
JsonValue = str | float | dict[str, Any] | list[Any] | None
@@ -265,11 +265,11 @@ def construct_value(
return field_value
if t is int and not isinstance(field_value, str):
return int(field_value) # type: ignore
return int(field_value) # type: ignore[arg-type]
if t is float and not isinstance(field_value, str):
return float(field_value) # type: ignore
return float(field_value) # type: ignore[arg-type]
if t is bool and isinstance(field_value, bool):
return field_value # type: ignore
return field_value # type: ignore[misc]
# Union types construct the first non-None type
if is_union_type(t):
@@ -313,14 +313,14 @@ def construct_value(
# Enums
if origin is Enum:
try:
return t(field_value) # type: ignore
return t(field_value) # type: ignore[operator]
except ValueError:
msg = f"Expected one of {', '.join(str(origin))}, got {field_value}"
raise ClanError(msg, location=f"{loc}") from ValueError
if isinstance(t, type) and issubclass(t, Enum):
try:
return t(field_value) # type: ignore
return t(field_value) # type: ignore[operator]
except ValueError:
msg = f"Expected one of {', '.join(t.__members__)}, got {field_value}"
raise ClanError(msg, location=f"{loc}") from ValueError
@@ -338,7 +338,7 @@ def construct_value(
msg = f"Expected TypedDict {t}, got {field_value}"
raise ClanError(msg, location=f"{loc}")
return t(field_value) # type: ignore
return t(field_value) # type: ignore[call-arg,operator]
if inspect.isclass(t) and t.__name__ == "Unknown":
# Return the field value as is
@@ -387,7 +387,7 @@ def construct_dataclass[T: Any](
field_value = data.get(data_field_name)
if field_value is None and (
field.type is None or is_type_in_union(field.type, type(None)) # type: ignore
field.type is None or is_type_in_union(field.type, type(None)) # type: ignore[arg-type]
):
field_values[field.name] = None
else:
@@ -402,7 +402,7 @@ def construct_dataclass[T: Any](
msg = f"Default value missing for: '{field_name}' in {t} {formatted_path}, got Value: {data}"
raise ClanError(msg)
return t(**field_values) # type: ignore
return t(**field_values) # type: ignore[return-value]
def from_dict(
@@ -420,5 +420,5 @@ def from_dict(
if not isinstance(data, dict):
msg = f"{data} is not a dict. Expected {t}"
raise ClanError(msg)
return construct_dataclass(t, data, path) # type: ignore
return construct_dataclass(t, data, path) # type: ignore[misc]
return construct_value(t, data, path)

View File

@@ -383,7 +383,7 @@ def test_unknown_serialize() -> None:
class Person:
name: Unknown
data = Person(["a", "b"]) # type: ignore
data = Person(["a", "b"]) # type: ignore[arg-type]
person = dataclass_to_dict(data)
assert person == {"name": ["a", "b"]}

View File

@@ -128,8 +128,8 @@ def type_to_dict(
raise JSchemaTypeError(msg)
properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type,
f"{scope} {t.__name__}.{f.name}", # type: ignore
type_map, # type: ignore
f"{scope} {t.__name__}.{f.name}", # type: ignore[union-attr]
type_map, # type: ignore[misc]
)
required = set()
@@ -301,7 +301,7 @@ def type_to_dict(
return {
"type": "string",
# Construct every enum value and use the same method as the serde module for converting it into the same literal string
"enum": [dataclass_to_dict(t(value)) for value in t], # type: ignore
"enum": [dataclass_to_dict(t(value)) for value in t], # type: ignore[var-annotated]
}
if t is Any:
msg = f"{scope} - Usage of the Any type is not supported for API functions. In: {scope}"

View File

@@ -10,6 +10,6 @@ class ClanJSONEncoder(json.JSONEncoder):
return o.to_json()
# Check if the object is a dataclass
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o) # type: ignore
return dataclasses.asdict(o) # type: ignore[arg-type]
# Otherwise, use the default serialization
return super().default(o)

View File

@@ -83,7 +83,7 @@ def get_machine_details(machine: Machine) -> MachineDetails:
header = extract_header(content)
data, _rest = parse_frontmatter(header)
if data:
disk_schema = data # type: ignore
disk_schema = data # type: ignore[assignment]
return MachineDetails(
machine=machine_inv,

View File

@@ -4,6 +4,7 @@ import os
import re
import shlex
from contextlib import ExitStack
from typing import cast
from clan_cli.facts.generate import generate_facts
from clan_cli.facts.upload import upload_secrets
@@ -138,13 +139,15 @@ def run_machine_update(
"""
with ExitStack() as stack:
_target_host: Host = stack.enter_context(target_host.host_connection()) # type: ignore
_target_host: Host = cast(
Host, stack.enter_context(target_host.host_connection())
)
_build_host: Host
# If no build host is specified, use the target host as the build host.
if build_host is None:
_build_host = _target_host # type: ignore
_build_host = _target_host
else:
_build_host = stack.enter_context(build_host.host_connection()) # type: ignore
_build_host = cast(Host, stack.enter_context(build_host.host_connection()))
# Some operations require root privileges on the target host.
target_host_root = stack.enter_context(_target_host.become_root())

View File

@@ -169,7 +169,7 @@ class InventoryStore:
with self.inventory_file.open() as f:
try:
res: dict = json.load(f)
inventory = Inventory(res) # type: ignore
inventory = Inventory(res) # type: ignore[misc]
except json.JSONDecodeError as e:
# Error decoding the inventory file
msg = f"Error decoding inventory file: {e}"

View File

@@ -106,11 +106,11 @@ def test_simple_read_write(setup_test_files: Path) -> None:
_keys=["foo", "protected"],
)
store._flake.invalidate_cache()
data: dict = store.read() # type: ignore
data = store.read() # type: ignore[assignment]
assert data == {"foo": "bar", "protected": "protected"}
set_value_by_path(data, "foo", "foo") # type: ignore
store.write(data, "test", commit=False) # type: ignore
set_value_by_path(data, "foo", "foo") # type: ignore[arg-type]
store.write(data, "test", commit=False) # type: ignore[arg-type]
# Default method to access the inventory
assert store.read() == {"foo": "foo", "protected": "protected"}
@@ -120,7 +120,7 @@ def test_simple_read_write(setup_test_files: Path) -> None:
# clan_lib.errors.ClanError: Key 'protected' is not writeable.
invalid_data = {"protected": "foo"}
with pytest.raises(ClanError) as e:
store.write(invalid_data, "test", commit=False) # type: ignore
store.write(invalid_data, "test", commit=False) # type: ignore[arg-type]
assert (
str(e.value)
== "Key 'protected' is not writeable. It seems its value is statically defined in nix."
@@ -132,8 +132,8 @@ def test_simple_read_write(setup_test_files: Path) -> None:
# Remove the foo key from the persisted data
# Technically data = { } should also work
data = {"protected": "protected"}
store.write(data, "test", commit=False) # type: ignore
data = {"protected": "protected"} # type: ignore[typeddict-unknown-key]
store.write(data, "test", commit=False) # type: ignore[arg-type]
@pytest.mark.with_core
@@ -159,23 +159,23 @@ def test_simple_deferred(setup_test_files: Path) -> None:
# Create a new "deferredModule" "C"
set_value_by_path(data, "foo.c", {})
store.write(data, "test", commit=False) # type: ignore
store.write(data, "test", commit=False) # type: ignore[arg-type]
assert store.read() == {"foo": {"a": {}, "b": {}, "c": {}}}
# Remove the "deferredModule" "C"
delete_by_path(data, "foo.c") # type: ignore
delete_by_path(data, "foo.c") # type: ignore[arg-type]
store.write(data, "test", commit=False)
assert store.read() == {"foo": {"a": {}, "b": {}}}
# Write settings into a new "deferredModule" "C" and read them back
set_value_by_path(data, "foo.c", {"timeout": "1s"})
store.write(data, "test", commit=False) # type: ignore
store.write(data, "test", commit=False) # type: ignore[arg-type]
assert store.read() == {"foo": {"a": {}, "b": {}, "c": {"timeout": "1s"}}}
# Remove the "deferredModle" "C" along with its settings
delete_by_path(data, "foo.c") # type: ignore
delete_by_path(data, "foo.c") # type: ignore[arg-type]
store.write(data, "test", commit=False)
assert store.read() == {"foo": {"a": {}, "b": {}}}
@@ -203,7 +203,7 @@ def test_simple_deferred(setup_test_files: Path) -> None:
# assert data == {"foo": {"a": {}, "b": {}}}
# # Create a new "deferredModule" "a" which collides with existing foo.a
# set_value_by_path(data, "foo.a", {"timeout": "1s"}) # type: ignore
# set_value_by_path(data, "foo.a", {"timeout": "1s"}) # type: ignore[misc]
# with pytest.raises(ClanError) as e:
# store.write(data, "test", commit=False)
# assert (

View File

@@ -51,9 +51,9 @@ def merge_objects(
)
elif isinstance(update_val, list) and isinstance(curr_val, list):
if merge_lists:
result[key] = list(dict.fromkeys(curr_val + update_val)) # type: ignore
result[key] = list(dict.fromkeys(curr_val + update_val)) # type: ignore[assignment]
else:
result[key] = update_val # type: ignore
result[key] = update_val # type: ignore[assignment]
elif (
update_val is not None
and curr_val is not None
@@ -62,9 +62,9 @@ def merge_objects(
msg = f"Type mismatch for key '{key}'. Cannot update {type(curr_val)} with {type(update_val)}"
raise ClanError(msg, location=json.dumps([*path, key]))
elif key in update:
result[key] = update_val # type: ignore
result[key] = update_val # type: ignore[assignment]
elif key in curr:
result[key] = curr_val # type: ignore
result[key] = curr_val # type: ignore[assignment]
return cast("T", result)

View File

@@ -702,8 +702,8 @@ def test_delete_non_existent_path_deep() -> None:
def test_merge_objects_empty() -> None:
obj1 = {} # type: ignore
obj2 = {} # type: ignore
obj1: dict[str, int] = {} # type: ignore[var-annotated]
obj2: dict[str, int] = {} # type: ignore[var-annotated]
merged = merge_objects(obj1, obj2)

View File

@@ -106,7 +106,7 @@ def test_parse_deployment_address(
def noop() -> Generator[None, Any]:
yield
maybe_check_exception = noop() # type: ignore
maybe_check_exception = noop() # type: ignore[assignment]
with maybe_check_exception:
machine_name = "foo"

View File

@@ -2,6 +2,7 @@ import json
import os
from copy import deepcopy
from pathlib import Path
from typing import Any
# !!! IMPORTANT !!!
# AVOID VERBS NOT IN THIS LIST
@@ -186,7 +187,7 @@ def get_tag_key(tags: list[str]) -> tuple:
return tuple(tags)
def sort_openapi_paths_by_tag_tree(openapi: dict) -> None:
def sort_openapi_paths_by_tag_tree(openapi: dict[str, Any]) -> None:
# Extract (tags, path, method, operation) tuples
operations = []
@@ -218,7 +219,7 @@ def main() -> None:
functions = schema["properties"]
# === Start OpenAPI 3.0 spec in JSON ===
openapi = {
openapi: dict[str, Any] = {
"openapi": "3.0.3",
"info": {
"title": "Function-Based Python API",
@@ -262,11 +263,11 @@ def main() -> None:
# Register schemas under components
args_name = make_schema_name(func_name, "args")
return_name = make_schema_name(func_name, "return")
openapi["components"]["schemas"][args_name] = args_schema # type: ignore
openapi["components"]["schemas"][return_name] = return_schema # type: ignore
openapi["components"]["schemas"][args_name] = args_schema # type: ignore[misc]
openapi["components"]["schemas"][return_name] = return_schema # type: ignore[misc]
tag = operation_to_tag(func_name)
# Create a POST endpoint for the function
openapi["paths"][f"/{func_name}"] = { # type: ignore
openapi["paths"][f"/{func_name}"] = { # type: ignore[misc]
"post": {
"summary": func_name,
"operationId": func_name,
@@ -301,7 +302,7 @@ def main() -> None:
for def_name, def_schema in defs.items():
fixed_schema = fix_nullables(deepcopy(def_schema))
fix_error_refs(fixed_schema)
openapi["components"]["schemas"][def_name] = fixed_schema # type: ignore
openapi["components"]["schemas"][def_name] = fixed_schema # type: ignore[misc]
# === Write to output JSON ===
with Path("openapi.json").open("w") as f:

View File

@@ -818,7 +818,7 @@ class Win32Implementation(BaseImplementation):
]
def __init__(self, application: Gtk.Application) -> None:
from ctypes import windll # type: ignore
from ctypes import windll # type: ignore[attr-defined]
super().__init__(application)
@@ -834,11 +834,11 @@ class Win32Implementation(BaseImplementation):
self.update_icon()
def _register_class(self) -> None:
from ctypes import byref, windll # type: ignore
from ctypes import byref, windll # type: ignore[attr-defined]
self._window_class = self.WNDCLASSW( # type: ignore
self._window_class = self.WNDCLASSW( # type: ignore[attr-defined]
style=(self.CS_VREDRAW | self.CS_HREDRAW),
lpfn_wnd_proc=self.WNDCLASSW.LPFN_WND_PROC(self.on_process_window_message), # type: ignore
lpfn_wnd_proc=self.WNDCLASSW.LPFN_WND_PROC(self.on_process_window_message), # type: ignore[attr-defined]
h_cursor=windll.user32.LoadCursorW(0, self.IDC_ARROW),
hbr_background=self.COLOR_WINDOW,
lpsz_class_name=self.WINDOW_CLASS_NAME,
@@ -859,7 +859,7 @@ class Win32Implementation(BaseImplementation):
self._window_class = None
def _create_window(self) -> None:
from ctypes import windll # type: ignore
from ctypes import windll # type: ignore[attr-defined]
style = self.WS_OVERLAPPED | self.WS_SYSMENU
self._h_wnd = windll.user32.CreateWindowExW(
@@ -1180,7 +1180,7 @@ class TrayIcon:
self.implementation = Win32Implementation(self.application)
else:
try:
self.implementation = StatusNotifierImplementation(self.application) # type: ignore
self.implementation = StatusNotifierImplementation(self.application) # type: ignore[misc]
except ImplUnavailableError:
self.available = False

View File

@@ -74,7 +74,7 @@ class ErrorToast:
views = ViewStack.use().view
# we cannot check this type, python is not smart enough
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view: Logs = views.get_child_by_name("logs") # type: ignore[assignment]
logs_view.set_message(details)
self.toast.connect(

View File

@@ -125,7 +125,7 @@ class ClanStore:
def log_details(self, vm: VMObject, gfile: Gio.File) -> None:
views = ViewStack.use().view
logs_view: Logs = views.get_child_by_name("logs") # type: ignore
logs_view: Logs = views.get_child_by_name("logs") # type: ignore[assignment]
def file_read_callback(
source_object: Gio.File,

View File

@@ -283,7 +283,7 @@ class ClanList(Gtk.Box):
views = ViewStack.use().view
# Reset the logs view
logs: Logs = views.get_child_by_name("logs") # type: ignore
logs: Logs = views.get_child_by_name("logs") # type: ignore[assignment]
if logs is None:
msg = "Logs view not found"

View File

@@ -50,14 +50,14 @@ class Logs(Gtk.Box):
buffer = self.text_view.get_buffer()
buffer.set_text(message)
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore[misc]
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
def append_message(self, message: str) -> None:
"""Append to the end of a potentially existent log message"""
buffer = self.text_view.get_buffer()
end_iter = buffer.get_end_iter()
buffer.insert(end_iter, message) # type: ignore
buffer.insert(end_iter, message) # type: ignore[misc]
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore
mark = buffer.create_mark(None, buffer.get_end_iter(), False) # type: ignore[misc]
self.text_view.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)