Merge pull request 'clan-cli: also package mimetypes' (#593) from Mic92-type-checking into main

This commit is contained in:
clan-bot
2023-11-30 13:33:49 +00:00
17 changed files with 108 additions and 97 deletions

View File

@@ -19,8 +19,8 @@ test_driver = ["py.typed"]
target-version = "py311" target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "U", "N", "RUF"] select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
ignore = ["E501"] ignore = ["E501", "ANN101", "ANN401", "A003"]
[tool.mypy] [tool.mypy]
python_version = "3.11" python_version = "3.11"

View File

@@ -36,7 +36,7 @@ def retry(fn: Callable, timeout: int = 900) -> None:
class Machine: class Machine:
def __init__(self, name: str, toplevel: Path, rootdir: Path, out_dir: str): def __init__(self, name: str, toplevel: Path, rootdir: Path, out_dir: str) -> None:
self.name = name self.name = name
self.toplevel = toplevel self.toplevel = toplevel
self.out_dir = out_dir self.out_dir = out_dir
@@ -198,7 +198,7 @@ class Machine:
timing out. timing out.
""" """
def check_active(_: Any) -> bool: def check_active(_: bool) -> bool:
info = self.get_unit_info(unit) info = self.get_unit_info(unit)
state = info["ActiveState"] state = info["ActiveState"]
if state == "failed": if state == "failed":
@@ -247,7 +247,7 @@ def setup_filesystems() -> None:
class Driver: class Driver:
def __init__(self, containers: list[Path], testscript: str, out_dir: str): def __init__(self, containers: list[Path], testscript: str, out_dir: str) -> None:
self.containers = containers self.containers = containers
self.testscript = testscript self.testscript = testscript
self.out_dir = out_dir self.out_dir = out_dir

View File

@@ -7,11 +7,12 @@ import socket
import subprocess import subprocess
import time import time
import urllib.request import urllib.request
from collections.abc import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any, Iterator, Optional from typing import Any
class ClanError(Exception): class ClanError(Exception):
@@ -37,7 +38,7 @@ def try_connect_port(port: int) -> bool:
return result == 0 return result == 0
def find_free_port() -> Optional[int]: def find_free_port() -> int | None:
"""Find an unused localhost port from 1024-65535 and return it.""" """Find an unused localhost port from 1024-65535 and return it."""
with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock: with contextlib.closing(socket.socket(type=socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0)) sock.bind(("127.0.0.1", 0))
@@ -69,7 +70,7 @@ class ZerotierController:
path: str, path: str,
method: str = "GET", method: str = "GET",
headers: dict[str, str] = {}, headers: dict[str, str] = {},
data: Optional[dict[str, Any]] = None, data: dict[str, Any] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
body = None body = None
headers = headers.copy() headers = headers.copy()
@@ -92,8 +93,8 @@ class ZerotierController:
data=data, data=data,
) )
def get_network(self, id: str) -> dict[str, Any]: def get_network(self, network_id: str) -> dict[str, Any]:
return self._http_request(f"/controller/network/{id}") return self._http_request(f"/controller/network/{network_id}")
@contextmanager @contextmanager

View File

@@ -21,28 +21,28 @@ log = logging.getLogger(__name__)
# nixos option type description to python type # nixos option type description to python type
def map_type(type: str) -> Any: def map_type(nix_type: str) -> Any:
if type == "boolean": if nix_type == "boolean":
return bool return bool
elif type in [ elif nix_type in [
"integer", "integer",
"signed integer", "signed integer",
"16 bit unsigned integer; between 0 and 65535 (both inclusive)", "16 bit unsigned integer; between 0 and 65535 (both inclusive)",
]: ]:
return int return int
elif type.startswith("string"): elif nix_type.startswith("string"):
return str return str
elif type.startswith("null or "): elif nix_type.startswith("null or "):
subtype = type.removeprefix("null or ") subtype = nix_type.removeprefix("null or ")
return map_type(subtype) | None return map_type(subtype) | None
elif type.startswith("attribute set of"): elif nix_type.startswith("attribute set of"):
subtype = type.removeprefix("attribute set of ") subtype = nix_type.removeprefix("attribute set of ")
return dict[str, map_type(subtype)] # type: ignore return dict[str, map_type(subtype)] # type: ignore
elif type.startswith("list of"): elif nix_type.startswith("list of"):
subtype = type.removeprefix("list of ") subtype = nix_type.removeprefix("list of ")
return list[map_type(subtype)] # type: ignore return list[map_type(subtype)] # type: ignore
else: else:
raise ClanError(f"Unknown type {type}") raise ClanError(f"Unknown type {nix_type}")
# merge two dicts recursively # merge two dicts recursively
@@ -70,10 +70,10 @@ class AllContainer(list):
# value is always a list, as the arg parser cannot know the type upfront # value is always a list, as the arg parser cannot know the type upfront
# and therefore always allows multiple arguments. # and therefore always allows multiple arguments.
def cast(value: Any, type: Any, opt_description: str) -> Any: def cast(value: Any, input_type: Any, opt_description: str) -> Any:
try: try:
# handle bools # handle bools
if isinstance(type, bool): if isinstance(input_type, bool):
if value[0] in ["true", "True", "yes", "y", "1"]: if value[0] in ["true", "True", "yes", "y", "1"]:
return True return True
elif value[0] in ["false", "False", "no", "n", "0"]: elif value[0] in ["false", "False", "no", "n", "0"]:
@@ -81,28 +81,28 @@ def cast(value: Any, type: Any, opt_description: str) -> Any:
else: else:
raise ClanError(f"Invalid value {value} for boolean") raise ClanError(f"Invalid value {value} for boolean")
# handle lists # handle lists
elif get_origin(type) == list: elif get_origin(input_type) == list:
subtype = type.__args__[0] subtype = input_type.__args__[0]
return [cast([x], subtype, opt_description) for x in value] return [cast([x], subtype, opt_description) for x in value]
# handle dicts # handle dicts
elif get_origin(type) == dict: elif get_origin(input_type) == dict:
if not isinstance(value, dict): if not isinstance(value, dict):
raise ClanError( raise ClanError(
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>" f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
) )
subtype = type.__args__[1] subtype = input_type.__args__[1]
return {k: cast(v, subtype, opt_description) for k, v in value.items()} return {k: cast(v, subtype, opt_description) for k, v in value.items()}
elif str(type) == "typing.Optional[str]": elif str(input_type) == "str | None":
if value[0] in ["null", "None"]: if value[0] in ["null", "None"]:
return None return None
return value[0] return value[0]
else: else:
if len(value) > 1: if len(value) > 1:
raise ClanError(f"Too many values for {opt_description}") raise ClanError(f"Too many values for {opt_description}")
return type(value[0]) return input_type(value[0])
except ValueError: except ValueError:
raise ClanError( raise ClanError(
f"Invalid type for option {opt_description} (expected {type.__name__})" f"Invalid type for option {opt_description} (expected {input_type.__name__})"
) )

View File

@@ -1,14 +1,19 @@
from collections.abc import Callable from collections.abc import Callable
from types import ModuleType from types import ModuleType
from typing import Any from typing import Any, Protocol
class AnyCall(Protocol):
def __call__(self, *args: Any, **kwargs: Any) -> Any:
...
class FakeDeal: class FakeDeal:
def __getattr__(self, name: str) -> "Callable": def __getattr__(self, name: str) -> AnyCall:
return self.mock_call return self.mock_call
def mock_call(self, *args: Any, **kwargs: Any) -> Callable: def mock_call(self, *args: Any, **kwargs: Any) -> Callable[[AnyCall], AnyCall]:
def wrapper(func: Callable) -> Callable: def wrapper(func: AnyCall) -> AnyCall:
return func return func
return wrapper return wrapper

View File

@@ -1,24 +1,17 @@
import logging import logging
import multiprocessing as mp
import os import os
import shlex import shlex
import stat import stat
import subprocess import subprocess
import sys import sys
import time import time
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Any
import ipdb import ipdb
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
def block_for_input() -> None: def block_for_input() -> None:
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
procid = os.getpid() procid = os.getpid()
@@ -66,8 +59,9 @@ def breakpoint_shell(
if cmd is not None: if cmd is not None:
mycommand = shlex.join(cmd) mycommand = shlex.join(cmd)
write_command(mycommand, work_dir / "cmd.sh") write_command(mycommand, work_dir / "cmd.sh")
proc = spawn_process(func=command_exec, cmd=args, work_dir=work_dir, env=env) proc = subprocess.Popen(args, env=env, cwd=work_dir.resolve())
with proc:
try: try:
ipdb.set_trace() ipdb.set_trace()
finally: finally:
@@ -83,15 +77,6 @@ def write_command(command: str, loc: Path) -> None:
os.chmod(loc, st.st_mode | stat.S_IEXEC) os.chmod(loc, st.st_mode | stat.S_IEXEC)
def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method(method="spawn")
proc = mp.Process(target=func, name="python-debug-process", kwargs=kwargs)
proc.start()
return proc
def dump_env(env: dict[str, str], loc: Path) -> None: def dump_env(env: dict[str, str], loc: Path) -> None:
cenv = env.copy() cenv = env.copy()
log.info("Dumping environment variables to %s", loc) log.info("Dumping environment variables to %s", loc)

View File

@@ -8,7 +8,7 @@ class ClanHttpError(ClanError):
status_code: int status_code: int
msg: str msg: str
def __init__(self, status_code: int, msg: str): def __init__(self, status_code: int, msg: str) -> None:
self.status_code = status_code self.status_code = status_code
self.msg = msg self.msg = msg
super().__init__(msg) super().__init__(msg)

View File

@@ -11,7 +11,7 @@ async def add_flake(path: Path) -> dict[str, CmdOut]:
user_history_file().parent.mkdir(parents=True, exist_ok=True) user_history_file().parent.mkdir(parents=True, exist_ok=True)
# append line to history file # append line to history file
# TODO: Make this atomic # TODO: Make this atomic
lines: set = set() lines: set[str] = set()
if user_history_file().exists(): if user_history_file().exists():
with open(user_history_file()) as f: with open(user_history_file()) as f:
lines = set(f.readlines()) lines = set(f.readlines())

View File

@@ -425,14 +425,14 @@ class Host:
sudo = "" sudo = ""
if become_root and self.user != "root": if become_root and self.user != "root":
sudo = "sudo -- " sudo = "sudo -- "
vars = [] env_vars = []
for k, v in extra_env.items(): for k, v in extra_env.items():
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}") env_vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
displayed_cmd = "" displayed_cmd = ""
export_cmd = "" export_cmd = ""
if vars: if env_vars:
export_cmd = f"export {' '.join(vars)}; " export_cmd = f"export {' '.join(env_vars)}; "
displayed_cmd += export_cmd displayed_cmd += export_cmd
if isinstance(cmd, list): if isinstance(cmd, list):
displayed_cmd += " ".join(cmd) displayed_cmd += " ".join(cmd)
@@ -469,7 +469,7 @@ class Host:
def ssh_cmd( def ssh_cmd(
self, self,
verbose_ssh: bool = False, verbose_ssh: bool = False,
) -> list: ) -> list[str]:
if self.user is not None: if self.user is not None:
ssh_target = f"{self.user}@{self.host}" ssh_target = f"{self.user}@{self.host}"
else: else:

View File

@@ -21,8 +21,8 @@ from .errors import ClanError
class Command: class Command:
def __init__(self, log: logging.Logger) -> None: def __init__(self, log: logging.Logger) -> None:
self.log: logging.Logger = log self.log: logging.Logger = log
self.p: subprocess.Popen | None = None self.p: subprocess.Popen[str] | None = None
self._output: queue.SimpleQueue = queue.SimpleQueue() self._output: queue.SimpleQueue[str | None] = queue.SimpleQueue()
self.returncode: int | None = None self.returncode: int | None = None
self.done: bool = False self.done: bool = False
self.stdout: list[str] = [] self.stdout: list[str] = []
@@ -148,8 +148,8 @@ class BaseTask:
for line in proc.stderr: for line in proc.stderr:
yield line yield line
else: else:
while line := proc._output.get(): while maybe_line := proc._output.get():
yield line yield maybe_line
def commands(self) -> Iterator[Command]: def commands(self) -> Iterator[Command]:
yield from self.procs yield from self.procs

View File

@@ -12,7 +12,7 @@ scripts = { clan = "clan_cli:main" }
exclude = ["clan_cli.nixpkgs*"] exclude = ["clan_cli.nixpkgs*"]
[tool.setuptools.package-data] [tool.setuptools.package-data]
clan_cli = ["config/jsonschema/*", "webui/assets/**/*"] clan_cli = ["config/jsonschema/*", "webui/assets/**/*", "vms/mimetypes/**/*"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = "tests" testpaths = "tests"
@@ -55,5 +55,5 @@ ignore_missing_imports = true
[tool.ruff] [tool.ruff]
target-version = "py311" target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "U", "N", "RUF"] select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
ignore = ["E501", "E402"] ignore = ["E501", "E402", "ANN101", "ANN401", "A003"]

View File

@@ -38,14 +38,14 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it. # FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
# we use .direnv instead since it's already in .gitignore # we use .direnv instead since it's already in .gitignore
with TemporaryDirectory() as _dir: with TemporaryDirectory() as _dir:
dir = Path(_dir) tmpdir = Path(_dir)
host_key = test_root / "data" / "ssh_host_ed25519_key" host_key = test_root / "data" / "ssh_host_ed25519_key"
host_key.chmod(0o600) host_key.chmod(0o600)
template = (test_root / "data" / "sshd_config").read_text() template = (test_root / "data" / "sshd_config").read_text()
content = string.Template(template).substitute(dict(host_key=host_key)) content = string.Template(template).substitute(dict(host_key=host_key))
config = dir / "sshd_config" config = tmpdir / "sshd_config"
config.write_text(content) config.write_text(content)
login_shell = dir / "shell" login_shell = tmpdir / "shell"
bash = shutil.which("bash") bash = shutil.which("bash")
path = os.environ["PATH"] path = os.environ["PATH"]
@@ -72,7 +72,7 @@ exec {bash} -l "${{@}}"
), "we do not support the ld_preload trick on non-linux just now" ), "we do not support the ld_preload trick on non-linux just now"
# This enforces a login shell by overriding the login shell of `getpwnam(3)` # This enforces a login shell by overriding the login shell of `getpwnam(3)`
lib_path = dir / "libgetpwnam-preload.so" lib_path = tmpdir / "libgetpwnam-preload.so"
subprocess.run( subprocess.run(
[ [
os.environ.get("CC", "cc"), os.environ.get("CC", "cc"),

View File

@@ -1,7 +1,7 @@
import json import json
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any
import pytest import pytest
from cli import Cli from cli import Cli
@@ -211,13 +211,16 @@ def test_map_type() -> None:
# test the cast function with simple types # test the cast function with simple types
def test_cast() -> None: def test_cast() -> None:
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
assert ( assert (
config.cast(value=["null"], type=Optional[str], opt_description="foo-option") # noqa: UP007 config.cast(value=["true"], input_type=bool, opt_description="foo-option")
is True
)
assert (
config.cast(value=["null"], input_type=str | None, opt_description="foo-option")
is None is None
) )
assert ( assert (
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option") # noqa: UP007 config.cast(value=["bar"], input_type=str | None, opt_description="foo-option")
== "bar" == "bar"
) )

View File

@@ -3,6 +3,7 @@
import argparse import argparse
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any
import gi import gi
@@ -14,7 +15,7 @@ from .ui.clan_select_list import ClanSelectPage
class VM: class VM:
def __init__(self, url: str, autostart: bool, path: Path): def __init__(self, url: str, autostart: bool, path: Path) -> None:
self.url = url self.url = url
self.autostart = autostart self.autostart = autostart
self.path = path self.path = path
@@ -32,7 +33,7 @@ vms.extend(vms)
class ClanJoinPage(Gtk.Box): class ClanJoinPage(Gtk.Box):
def __init__(self): def __init__(self) -> None:
super().__init__() super().__init__()
self.page = Gtk.Box() self.page = Gtk.Box()
self.set_border_width(10) self.set_border_width(10)
@@ -60,22 +61,22 @@ class MainWindow(Gtk.ApplicationWindow):
# Must be called AFTER all components were added # Must be called AFTER all components were added
self.show_all() self.show_all()
def on_quit(self, *args): def on_quit(self, *args: Any) -> None:
Gio.Application.quit(self.get_application()) Gio.Application.quit(self.get_application())
class Application(Gtk.Application): class Application(Gtk.Application):
def __init__(self): def __init__(self) -> None:
super().__init__( super().__init__(
application_id=constants["APPID"], flags=Gio.ApplicationFlags.FLAGS_NONE application_id=constants["APPID"], flags=Gio.ApplicationFlags.FLAGS_NONE
) )
self.init_style() self.init_style()
def do_startup(self): def do_startup(self) -> None:
Gtk.Application.do_startup(self) Gtk.Application.do_startup(self)
Gtk.init(sys.argv) Gtk.init(sys.argv)
def do_activate(self): def do_activate(self) -> None:
win = self.props.active_window win = self.props.active_window
if not win: if not win:
# win = SwitchTreeView(application=self) # win = SwitchTreeView(application=self)
@@ -83,7 +84,7 @@ class Application(Gtk.Application):
win.present() win.present()
# TODO: For css styling # TODO: For css styling
def init_style(self): def init_style(self) -> None:
pass pass
# css_provider = Gtk.CssProvider() # css_provider = Gtk.CssProvider()
# css_provider.load_from_resource(constants['RESOURCEID'] + '/style.css') # css_provider.load_from_resource(constants['RESOURCEID'] + '/style.css')

View File

@@ -1,8 +1,14 @@
from collections.abc import Callable
from typing import TYPE_CHECKING
from gi.repository import Gtk from gi.repository import Gtk
if TYPE_CHECKING:
from ..app import VM
class ClanSelectPage(Gtk.Box): class ClanSelectPage(Gtk.Box):
def __init__(self, vms): def __init__(self, vms: list["VM"]) -> None:
super().__init__(orientation=Gtk.Orientation.VERTICAL, expand=True) super().__init__(orientation=Gtk.Orientation.VERTICAL, expand=True)
self.add(ClanSelectList(vms, self.on_cell_toggled, self.on_select_row)) self.add(ClanSelectList(vms, self.on_cell_toggled, self.on_select_row))
@@ -12,16 +18,16 @@ class ClanSelectPage(Gtk.Box):
) )
) )
def on_start_clicked(self, widget): def on_start_clicked(self, widget: Gtk.Widget) -> None:
print("Start clicked") print("Start clicked")
def on_stop_clicked(self, widget): def on_stop_clicked(self, widget: Gtk.Widget) -> None:
print("Stop clicked") print("Stop clicked")
def on_backup_clicked(self, widget): def on_backup_clicked(self, widget: Gtk.Widget) -> None:
print("Backup clicked") print("Backup clicked")
def on_cell_toggled(self, widget, path): def on_cell_toggled(self, widget: Gtk.Widget, path: str) -> None:
print(f"on_cell_toggled: {path}") print(f"on_cell_toggled: {path}")
# Get the current value from the model # Get the current value from the model
current_value = self.list_store[path][1] current_value = self.list_store[path][1]
@@ -32,14 +38,19 @@ class ClanSelectPage(Gtk.Box):
# Print the updated value # Print the updated value
print("Switched", path, "to", self.list_store[path][1]) print("Switched", path, "to", self.list_store[path][1])
def on_select_row(self, selection): def on_select_row(self, selection: Gtk.TreeSelection) -> None:
model, row = selection.get_selected() model, row = selection.get_selected()
if row is not None: if row is not None:
print(f"Selected {model[row][0]}") print(f"Selected {model[row][0]}")
class ClanSelectButtons(Gtk.Box): class ClanSelectButtons(Gtk.Box):
def __init__(self, on_start_clicked, on_stop_clicked, on_backup_clicked): def __init__(
self,
on_start_clicked: Callable[[Gtk.Widget], None],
on_stop_clicked: Callable[[Gtk.Widget], None],
on_backup_clicked: Callable[[Gtk.Widget], None],
) -> None:
super().__init__( super().__init__(
orientation=Gtk.Orientation.HORIZONTAL, margin_bottom=10, margin_top=10 orientation=Gtk.Orientation.HORIZONTAL, margin_bottom=10, margin_top=10
) )
@@ -56,7 +67,12 @@ class ClanSelectButtons(Gtk.Box):
class ClanSelectList(Gtk.Box): class ClanSelectList(Gtk.Box):
def __init__(self, vms, on_cell_toggled, on_select_row): def __init__(
self,
vms: list["VM"],
on_cell_toggled: Callable[[Gtk.Widget, str], None],
on_select_row: Callable[[Gtk.TreeSelection], None],
) -> None:
super().__init__(expand=True) super().__init__(expand=True)
self.vms = vms self.vms = vms

View File

@@ -24,5 +24,5 @@ ignore_missing_imports = true
[tool.ruff] [tool.ruff]
target-version = "py311" target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "N", "RUF", "U"] select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
ignore = ["E501", "E402", "N802"] ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]

View File

@@ -9,6 +9,6 @@ exclude = "clan_cli.nixpkgs"
[tool.ruff] [tool.ruff]
line-length = 88 line-length = 88
target-version = "py311"
select = [ "E", "F", "I", "U", "N"] select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
ignore = [ "E501" ] ignore = [ "E501", "ANN101", "ANN401", "A003"]