Merge pull request 'clan-cli: also package mimetypes' (#593) from Mic92-type-checking into main
This commit is contained in:
@@ -19,8 +19,8 @@ test_driver = ["py.typed"]
|
|||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
|
|
||||||
select = ["E", "F", "I", "U", "N", "RUF"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501"]
|
ignore = ["E501", "ANN101", "ANN401", "A003"]
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
python_version = "3.11"
|
python_version = "3.11"
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ def retry(fn: Callable, timeout: int = 900) -> None:
|
|||||||
|
|
||||||
|
|
||||||
class Machine:
|
class Machine:
|
||||||
def __init__(self, name: str, toplevel: Path, rootdir: Path, out_dir: str):
|
def __init__(self, name: str, toplevel: Path, rootdir: Path, out_dir: str) -> None:
|
||||||
self.name = name
|
self.name = name
|
||||||
self.toplevel = toplevel
|
self.toplevel = toplevel
|
||||||
self.out_dir = out_dir
|
self.out_dir = out_dir
|
||||||
@@ -198,7 +198,7 @@ class Machine:
|
|||||||
timing out.
|
timing out.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def check_active(_: Any) -> bool:
|
def check_active(_: bool) -> bool:
|
||||||
info = self.get_unit_info(unit)
|
info = self.get_unit_info(unit)
|
||||||
state = info["ActiveState"]
|
state = info["ActiveState"]
|
||||||
if state == "failed":
|
if state == "failed":
|
||||||
@@ -247,7 +247,7 @@ def setup_filesystems() -> None:
|
|||||||
|
|
||||||
|
|
||||||
class Driver:
|
class Driver:
|
||||||
def __init__(self, containers: list[Path], testscript: str, out_dir: str):
|
def __init__(self, containers: list[Path], testscript: str, out_dir: str) -> None:
|
||||||
self.containers = containers
|
self.containers = containers
|
||||||
self.testscript = testscript
|
self.testscript = testscript
|
||||||
self.out_dir = out_dir
|
self.out_dir = out_dir
|
||||||
|
|||||||
@@ -7,11 +7,12 @@ import socket
|
|||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from collections.abc import Iterator
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
from typing import Any, Iterator, Optional
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
class ClanError(Exception):
|
class ClanError(Exception):
|
||||||
@@ -37,7 +38,7 @@ def try_connect_port(port: int) -> bool:
|
|||||||
return result == 0
|
return result == 0
|
||||||
|
|
||||||
|
|
||||||
def find_free_port() -> Optional[int]:
|
def find_free_port() -> int | None:
|
||||||
"""Find an unused localhost port from 1024-65535 and return it."""
|
"""Find an unused localhost port from 1024-65535 and return it."""
|
||||||
with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock:
|
with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock:
|
||||||
sock.bind(("127.0.0.1", 0))
|
sock.bind(("127.0.0.1", 0))
|
||||||
@@ -69,7 +70,7 @@ class ZerotierController:
|
|||||||
path: str,
|
path: str,
|
||||||
method: str = "GET",
|
method: str = "GET",
|
||||||
headers: dict[str, str] = {},
|
headers: dict[str, str] = {},
|
||||||
data: Optional[dict[str, Any]] = None,
|
data: dict[str, Any] | None = None,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
body = None
|
body = None
|
||||||
headers = headers.copy()
|
headers = headers.copy()
|
||||||
@@ -92,8 +93,8 @@ class ZerotierController:
|
|||||||
data=data,
|
data=data,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_network(self, id: str) -> dict[str, Any]:
|
def get_network(self, network_id: str) -> dict[str, Any]:
|
||||||
return self._http_request(f"/controller/network/{id}")
|
return self._http_request(f"/controller/network/{network_id}")
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
|||||||
@@ -21,28 +21,28 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
# nixos option type description to python type
|
# nixos option type description to python type
|
||||||
def map_type(type: str) -> Any:
|
def map_type(nix_type: str) -> Any:
|
||||||
if type == "boolean":
|
if nix_type == "boolean":
|
||||||
return bool
|
return bool
|
||||||
elif type in [
|
elif nix_type in [
|
||||||
"integer",
|
"integer",
|
||||||
"signed integer",
|
"signed integer",
|
||||||
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
||||||
]:
|
]:
|
||||||
return int
|
return int
|
||||||
elif type.startswith("string"):
|
elif nix_type.startswith("string"):
|
||||||
return str
|
return str
|
||||||
elif type.startswith("null or "):
|
elif nix_type.startswith("null or "):
|
||||||
subtype = type.removeprefix("null or ")
|
subtype = nix_type.removeprefix("null or ")
|
||||||
return map_type(subtype) | None
|
return map_type(subtype) | None
|
||||||
elif type.startswith("attribute set of"):
|
elif nix_type.startswith("attribute set of"):
|
||||||
subtype = type.removeprefix("attribute set of ")
|
subtype = nix_type.removeprefix("attribute set of ")
|
||||||
return dict[str, map_type(subtype)] # type: ignore
|
return dict[str, map_type(subtype)] # type: ignore
|
||||||
elif type.startswith("list of"):
|
elif nix_type.startswith("list of"):
|
||||||
subtype = type.removeprefix("list of ")
|
subtype = nix_type.removeprefix("list of ")
|
||||||
return list[map_type(subtype)] # type: ignore
|
return list[map_type(subtype)] # type: ignore
|
||||||
else:
|
else:
|
||||||
raise ClanError(f"Unknown type {type}")
|
raise ClanError(f"Unknown type {nix_type}")
|
||||||
|
|
||||||
|
|
||||||
# merge two dicts recursively
|
# merge two dicts recursively
|
||||||
@@ -70,10 +70,10 @@ class AllContainer(list):
|
|||||||
|
|
||||||
# value is always a list, as the arg parser cannot know the type upfront
|
# value is always a list, as the arg parser cannot know the type upfront
|
||||||
# and therefore always allows multiple arguments.
|
# and therefore always allows multiple arguments.
|
||||||
def cast(value: Any, type: Any, opt_description: str) -> Any:
|
def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||||
try:
|
try:
|
||||||
# handle bools
|
# handle bools
|
||||||
if isinstance(type, bool):
|
if isinstance(input_type, bool):
|
||||||
if value[0] in ["true", "True", "yes", "y", "1"]:
|
if value[0] in ["true", "True", "yes", "y", "1"]:
|
||||||
return True
|
return True
|
||||||
elif value[0] in ["false", "False", "no", "n", "0"]:
|
elif value[0] in ["false", "False", "no", "n", "0"]:
|
||||||
@@ -81,28 +81,28 @@ def cast(value: Any, type: Any, opt_description: str) -> Any:
|
|||||||
else:
|
else:
|
||||||
raise ClanError(f"Invalid value {value} for boolean")
|
raise ClanError(f"Invalid value {value} for boolean")
|
||||||
# handle lists
|
# handle lists
|
||||||
elif get_origin(type) == list:
|
elif get_origin(input_type) == list:
|
||||||
subtype = type.__args__[0]
|
subtype = input_type.__args__[0]
|
||||||
return [cast([x], subtype, opt_description) for x in value]
|
return [cast([x], subtype, opt_description) for x in value]
|
||||||
# handle dicts
|
# handle dicts
|
||||||
elif get_origin(type) == dict:
|
elif get_origin(input_type) == dict:
|
||||||
if not isinstance(value, dict):
|
if not isinstance(value, dict):
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||||
)
|
)
|
||||||
subtype = type.__args__[1]
|
subtype = input_type.__args__[1]
|
||||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||||
elif str(type) == "typing.Optional[str]":
|
elif str(input_type) == "str | None":
|
||||||
if value[0] in ["null", "None"]:
|
if value[0] in ["null", "None"]:
|
||||||
return None
|
return None
|
||||||
return value[0]
|
return value[0]
|
||||||
else:
|
else:
|
||||||
if len(value) > 1:
|
if len(value) > 1:
|
||||||
raise ClanError(f"Too many values for {opt_description}")
|
raise ClanError(f"Too many values for {opt_description}")
|
||||||
return type(value[0])
|
return input_type(value[0])
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
f"Invalid type for option {opt_description} (expected {type.__name__})"
|
f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,19 @@
|
|||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import Any
|
from typing import Any, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class AnyCall(Protocol):
|
||||||
|
def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
class FakeDeal:
|
class FakeDeal:
|
||||||
def __getattr__(self, name: str) -> "Callable":
|
def __getattr__(self, name: str) -> AnyCall:
|
||||||
return self.mock_call
|
return self.mock_call
|
||||||
|
|
||||||
def mock_call(self, *args: Any, **kwargs: Any) -> Callable:
|
def mock_call(self, *args: Any, **kwargs: Any) -> Callable[[AnyCall], AnyCall]:
|
||||||
def wrapper(func: Callable) -> Callable:
|
def wrapper(func: AnyCall) -> AnyCall:
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|||||||
@@ -1,24 +1,17 @@
|
|||||||
import logging
|
import logging
|
||||||
import multiprocessing as mp
|
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
import stat
|
import stat
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from collections.abc import Callable
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import ipdb
|
import ipdb
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
|
|
||||||
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
|
|
||||||
|
|
||||||
|
|
||||||
def block_for_input() -> None:
|
def block_for_input() -> None:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
procid = os.getpid()
|
procid = os.getpid()
|
||||||
@@ -66,12 +59,13 @@ def breakpoint_shell(
|
|||||||
if cmd is not None:
|
if cmd is not None:
|
||||||
mycommand = shlex.join(cmd)
|
mycommand = shlex.join(cmd)
|
||||||
write_command(mycommand, work_dir / "cmd.sh")
|
write_command(mycommand, work_dir / "cmd.sh")
|
||||||
proc = spawn_process(func=command_exec, cmd=args, work_dir=work_dir, env=env)
|
proc = subprocess.Popen(args, env=env, cwd=work_dir.resolve())
|
||||||
|
|
||||||
try:
|
with proc:
|
||||||
ipdb.set_trace()
|
try:
|
||||||
finally:
|
ipdb.set_trace()
|
||||||
proc.terminate()
|
finally:
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
|
||||||
def write_command(command: str, loc: Path) -> None:
|
def write_command(command: str, loc: Path) -> None:
|
||||||
@@ -83,15 +77,6 @@ def write_command(command: str, loc: Path) -> None:
|
|||||||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||||
|
|
||||||
|
|
||||||
def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
|
|
||||||
if mp.get_start_method(allow_none=True) is None:
|
|
||||||
mp.set_start_method(method="spawn")
|
|
||||||
|
|
||||||
proc = mp.Process(target=func, name="python-debug-process", kwargs=kwargs)
|
|
||||||
proc.start()
|
|
||||||
return proc
|
|
||||||
|
|
||||||
|
|
||||||
def dump_env(env: dict[str, str], loc: Path) -> None:
|
def dump_env(env: dict[str, str], loc: Path) -> None:
|
||||||
cenv = env.copy()
|
cenv = env.copy()
|
||||||
log.info("Dumping environment variables to %s", loc)
|
log.info("Dumping environment variables to %s", loc)
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ class ClanHttpError(ClanError):
|
|||||||
status_code: int
|
status_code: int
|
||||||
msg: str
|
msg: str
|
||||||
|
|
||||||
def __init__(self, status_code: int, msg: str):
|
def __init__(self, status_code: int, msg: str) -> None:
|
||||||
self.status_code = status_code
|
self.status_code = status_code
|
||||||
self.msg = msg
|
self.msg = msg
|
||||||
super().__init__(msg)
|
super().__init__(msg)
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ async def add_flake(path: Path) -> dict[str, CmdOut]:
|
|||||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||||
# append line to history file
|
# append line to history file
|
||||||
# TODO: Make this atomic
|
# TODO: Make this atomic
|
||||||
lines: set = set()
|
lines: set[str] = set()
|
||||||
if user_history_file().exists():
|
if user_history_file().exists():
|
||||||
with open(user_history_file()) as f:
|
with open(user_history_file()) as f:
|
||||||
lines = set(f.readlines())
|
lines = set(f.readlines())
|
||||||
|
|||||||
@@ -425,14 +425,14 @@ class Host:
|
|||||||
sudo = ""
|
sudo = ""
|
||||||
if become_root and self.user != "root":
|
if become_root and self.user != "root":
|
||||||
sudo = "sudo -- "
|
sudo = "sudo -- "
|
||||||
vars = []
|
env_vars = []
|
||||||
for k, v in extra_env.items():
|
for k, v in extra_env.items():
|
||||||
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
env_vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
||||||
|
|
||||||
displayed_cmd = ""
|
displayed_cmd = ""
|
||||||
export_cmd = ""
|
export_cmd = ""
|
||||||
if vars:
|
if env_vars:
|
||||||
export_cmd = f"export {' '.join(vars)}; "
|
export_cmd = f"export {' '.join(env_vars)}; "
|
||||||
displayed_cmd += export_cmd
|
displayed_cmd += export_cmd
|
||||||
if isinstance(cmd, list):
|
if isinstance(cmd, list):
|
||||||
displayed_cmd += " ".join(cmd)
|
displayed_cmd += " ".join(cmd)
|
||||||
@@ -469,7 +469,7 @@ class Host:
|
|||||||
def ssh_cmd(
|
def ssh_cmd(
|
||||||
self,
|
self,
|
||||||
verbose_ssh: bool = False,
|
verbose_ssh: bool = False,
|
||||||
) -> list:
|
) -> list[str]:
|
||||||
if self.user is not None:
|
if self.user is not None:
|
||||||
ssh_target = f"{self.user}@{self.host}"
|
ssh_target = f"{self.user}@{self.host}"
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -21,8 +21,8 @@ from .errors import ClanError
|
|||||||
class Command:
|
class Command:
|
||||||
def __init__(self, log: logging.Logger) -> None:
|
def __init__(self, log: logging.Logger) -> None:
|
||||||
self.log: logging.Logger = log
|
self.log: logging.Logger = log
|
||||||
self.p: subprocess.Popen | None = None
|
self.p: subprocess.Popen[str] | None = None
|
||||||
self._output: queue.SimpleQueue = queue.SimpleQueue()
|
self._output: queue.SimpleQueue[str | None] = queue.SimpleQueue()
|
||||||
self.returncode: int | None = None
|
self.returncode: int | None = None
|
||||||
self.done: bool = False
|
self.done: bool = False
|
||||||
self.stdout: list[str] = []
|
self.stdout: list[str] = []
|
||||||
@@ -148,8 +148,8 @@ class BaseTask:
|
|||||||
for line in proc.stderr:
|
for line in proc.stderr:
|
||||||
yield line
|
yield line
|
||||||
else:
|
else:
|
||||||
while line := proc._output.get():
|
while maybe_line := proc._output.get():
|
||||||
yield line
|
yield maybe_line
|
||||||
|
|
||||||
def commands(self) -> Iterator[Command]:
|
def commands(self) -> Iterator[Command]:
|
||||||
yield from self.procs
|
yield from self.procs
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ scripts = { clan = "clan_cli:main" }
|
|||||||
exclude = ["clan_cli.nixpkgs*"]
|
exclude = ["clan_cli.nixpkgs*"]
|
||||||
|
|
||||||
[tool.setuptools.package-data]
|
[tool.setuptools.package-data]
|
||||||
clan_cli = ["config/jsonschema/*", "webui/assets/**/*"]
|
clan_cli = ["config/jsonschema/*", "webui/assets/**/*", "vms/mimetypes/**/*"]
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
testpaths = "tests"
|
testpaths = "tests"
|
||||||
@@ -55,5 +55,5 @@ ignore_missing_imports = true
|
|||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
select = ["E", "F", "I", "U", "N", "RUF"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501", "E402"]
|
ignore = ["E501", "E402", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
@@ -38,14 +38,14 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
|
|||||||
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
|
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
|
||||||
# we use .direnv instead since it's already in .gitignore
|
# we use .direnv instead since it's already in .gitignore
|
||||||
with TemporaryDirectory() as _dir:
|
with TemporaryDirectory() as _dir:
|
||||||
dir = Path(_dir)
|
tmpdir = Path(_dir)
|
||||||
host_key = test_root / "data" / "ssh_host_ed25519_key"
|
host_key = test_root / "data" / "ssh_host_ed25519_key"
|
||||||
host_key.chmod(0o600)
|
host_key.chmod(0o600)
|
||||||
template = (test_root / "data" / "sshd_config").read_text()
|
template = (test_root / "data" / "sshd_config").read_text()
|
||||||
content = string.Template(template).substitute(dict(host_key=host_key))
|
content = string.Template(template).substitute(dict(host_key=host_key))
|
||||||
config = dir / "sshd_config"
|
config = tmpdir / "sshd_config"
|
||||||
config.write_text(content)
|
config.write_text(content)
|
||||||
login_shell = dir / "shell"
|
login_shell = tmpdir / "shell"
|
||||||
|
|
||||||
bash = shutil.which("bash")
|
bash = shutil.which("bash")
|
||||||
path = os.environ["PATH"]
|
path = os.environ["PATH"]
|
||||||
@@ -72,7 +72,7 @@ exec {bash} -l "${{@}}"
|
|||||||
), "we do not support the ld_preload trick on non-linux just now"
|
), "we do not support the ld_preload trick on non-linux just now"
|
||||||
|
|
||||||
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
|
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
|
||||||
lib_path = dir / "libgetpwnam-preload.so"
|
lib_path = tmpdir / "libgetpwnam-preload.so"
|
||||||
subprocess.run(
|
subprocess.run(
|
||||||
[
|
[
|
||||||
os.environ.get("CC", "cc"),
|
os.environ.get("CC", "cc"),
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional
|
from typing import Any
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from cli import Cli
|
from cli import Cli
|
||||||
@@ -211,13 +211,16 @@ def test_map_type() -> None:
|
|||||||
|
|
||||||
# test the cast function with simple types
|
# test the cast function with simple types
|
||||||
def test_cast() -> None:
|
def test_cast() -> None:
|
||||||
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
|
|
||||||
assert (
|
assert (
|
||||||
config.cast(value=["null"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
config.cast(value=["true"], input_type=bool, opt_description="foo-option")
|
||||||
|
is True
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
config.cast(value=["null"], input_type=str | None, opt_description="foo-option")
|
||||||
is None
|
is None
|
||||||
)
|
)
|
||||||
assert (
|
assert (
|
||||||
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
config.cast(value=["bar"], input_type=str | None, opt_description="foo-option")
|
||||||
== "bar"
|
== "bar"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import gi
|
import gi
|
||||||
|
|
||||||
@@ -14,7 +15,7 @@ from .ui.clan_select_list import ClanSelectPage
|
|||||||
|
|
||||||
|
|
||||||
class VM:
|
class VM:
|
||||||
def __init__(self, url: str, autostart: bool, path: Path):
|
def __init__(self, url: str, autostart: bool, path: Path) -> None:
|
||||||
self.url = url
|
self.url = url
|
||||||
self.autostart = autostart
|
self.autostart = autostart
|
||||||
self.path = path
|
self.path = path
|
||||||
@@ -32,7 +33,7 @@ vms.extend(vms)
|
|||||||
|
|
||||||
|
|
||||||
class ClanJoinPage(Gtk.Box):
|
class ClanJoinPage(Gtk.Box):
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.page = Gtk.Box()
|
self.page = Gtk.Box()
|
||||||
self.set_border_width(10)
|
self.set_border_width(10)
|
||||||
@@ -60,22 +61,22 @@ class MainWindow(Gtk.ApplicationWindow):
|
|||||||
# Must be called AFTER all components were added
|
# Must be called AFTER all components were added
|
||||||
self.show_all()
|
self.show_all()
|
||||||
|
|
||||||
def on_quit(self, *args):
|
def on_quit(self, *args: Any) -> None:
|
||||||
Gio.Application.quit(self.get_application())
|
Gio.Application.quit(self.get_application())
|
||||||
|
|
||||||
|
|
||||||
class Application(Gtk.Application):
|
class Application(Gtk.Application):
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
super().__init__(
|
super().__init__(
|
||||||
application_id=constants["APPID"], flags=Gio.ApplicationFlags.FLAGS_NONE
|
application_id=constants["APPID"], flags=Gio.ApplicationFlags.FLAGS_NONE
|
||||||
)
|
)
|
||||||
self.init_style()
|
self.init_style()
|
||||||
|
|
||||||
def do_startup(self):
|
def do_startup(self) -> None:
|
||||||
Gtk.Application.do_startup(self)
|
Gtk.Application.do_startup(self)
|
||||||
Gtk.init(sys.argv)
|
Gtk.init(sys.argv)
|
||||||
|
|
||||||
def do_activate(self):
|
def do_activate(self) -> None:
|
||||||
win = self.props.active_window
|
win = self.props.active_window
|
||||||
if not win:
|
if not win:
|
||||||
# win = SwitchTreeView(application=self)
|
# win = SwitchTreeView(application=self)
|
||||||
@@ -83,7 +84,7 @@ class Application(Gtk.Application):
|
|||||||
win.present()
|
win.present()
|
||||||
|
|
||||||
# TODO: For css styling
|
# TODO: For css styling
|
||||||
def init_style(self):
|
def init_style(self) -> None:
|
||||||
pass
|
pass
|
||||||
# css_provider = Gtk.CssProvider()
|
# css_provider = Gtk.CssProvider()
|
||||||
# css_provider.load_from_resource(constants['RESOURCEID'] + '/style.css')
|
# css_provider.load_from_resource(constants['RESOURCEID'] + '/style.css')
|
||||||
|
|||||||
@@ -1,8 +1,14 @@
|
|||||||
|
from collections.abc import Callable
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
from gi.repository import Gtk
|
from gi.repository import Gtk
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..app import VM
|
||||||
|
|
||||||
|
|
||||||
class ClanSelectPage(Gtk.Box):
|
class ClanSelectPage(Gtk.Box):
|
||||||
def __init__(self, vms):
|
def __init__(self, vms: list["VM"]) -> None:
|
||||||
super().__init__(orientation=Gtk.Orientation.VERTICAL, expand=True)
|
super().__init__(orientation=Gtk.Orientation.VERTICAL, expand=True)
|
||||||
|
|
||||||
self.add(ClanSelectList(vms, self.on_cell_toggled, self.on_select_row))
|
self.add(ClanSelectList(vms, self.on_cell_toggled, self.on_select_row))
|
||||||
@@ -12,16 +18,16 @@ class ClanSelectPage(Gtk.Box):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_start_clicked(self, widget):
|
def on_start_clicked(self, widget: Gtk.Widget) -> None:
|
||||||
print("Start clicked")
|
print("Start clicked")
|
||||||
|
|
||||||
def on_stop_clicked(self, widget):
|
def on_stop_clicked(self, widget: Gtk.Widget) -> None:
|
||||||
print("Stop clicked")
|
print("Stop clicked")
|
||||||
|
|
||||||
def on_backup_clicked(self, widget):
|
def on_backup_clicked(self, widget: Gtk.Widget) -> None:
|
||||||
print("Backup clicked")
|
print("Backup clicked")
|
||||||
|
|
||||||
def on_cell_toggled(self, widget, path):
|
def on_cell_toggled(self, widget: Gtk.Widget, path: str) -> None:
|
||||||
print(f"on_cell_toggled: {path}")
|
print(f"on_cell_toggled: {path}")
|
||||||
# Get the current value from the model
|
# Get the current value from the model
|
||||||
current_value = self.list_store[path][1]
|
current_value = self.list_store[path][1]
|
||||||
@@ -32,14 +38,19 @@ class ClanSelectPage(Gtk.Box):
|
|||||||
# Print the updated value
|
# Print the updated value
|
||||||
print("Switched", path, "to", self.list_store[path][1])
|
print("Switched", path, "to", self.list_store[path][1])
|
||||||
|
|
||||||
def on_select_row(self, selection):
|
def on_select_row(self, selection: Gtk.TreeSelection) -> None:
|
||||||
model, row = selection.get_selected()
|
model, row = selection.get_selected()
|
||||||
if row is not None:
|
if row is not None:
|
||||||
print(f"Selected {model[row][0]}")
|
print(f"Selected {model[row][0]}")
|
||||||
|
|
||||||
|
|
||||||
class ClanSelectButtons(Gtk.Box):
|
class ClanSelectButtons(Gtk.Box):
|
||||||
def __init__(self, on_start_clicked, on_stop_clicked, on_backup_clicked):
|
def __init__(
|
||||||
|
self,
|
||||||
|
on_start_clicked: Callable[[Gtk.Widget], None],
|
||||||
|
on_stop_clicked: Callable[[Gtk.Widget], None],
|
||||||
|
on_backup_clicked: Callable[[Gtk.Widget], None],
|
||||||
|
) -> None:
|
||||||
super().__init__(
|
super().__init__(
|
||||||
orientation=Gtk.Orientation.HORIZONTAL, margin_bottom=10, margin_top=10
|
orientation=Gtk.Orientation.HORIZONTAL, margin_bottom=10, margin_top=10
|
||||||
)
|
)
|
||||||
@@ -56,7 +67,12 @@ class ClanSelectButtons(Gtk.Box):
|
|||||||
|
|
||||||
|
|
||||||
class ClanSelectList(Gtk.Box):
|
class ClanSelectList(Gtk.Box):
|
||||||
def __init__(self, vms, on_cell_toggled, on_select_row):
|
def __init__(
|
||||||
|
self,
|
||||||
|
vms: list["VM"],
|
||||||
|
on_cell_toggled: Callable[[Gtk.Widget, str], None],
|
||||||
|
on_select_row: Callable[[Gtk.TreeSelection], None],
|
||||||
|
) -> None:
|
||||||
super().__init__(expand=True)
|
super().__init__(expand=True)
|
||||||
self.vms = vms
|
self.vms = vms
|
||||||
|
|
||||||
|
|||||||
@@ -24,5 +24,5 @@ ignore_missing_imports = true
|
|||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
select = ["E", "F", "I", "N", "RUF", "U"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501", "E402", "N802"]
|
ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
@@ -9,6 +9,6 @@ exclude = "clan_cli.nixpkgs"
|
|||||||
|
|
||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
line-length = 88
|
line-length = 88
|
||||||
|
target-version = "py311"
|
||||||
select = [ "E", "F", "I", "U", "N"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = [ "E501" ]
|
ignore = [ "E501", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
Reference in New Issue
Block a user