Merge pull request 'more-linting' (#584) from more-linting into main

Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/584
This commit is contained in:
Mic92
2023-11-29 13:29:45 +00:00
47 changed files with 214 additions and 217 deletions

View File

@@ -16,13 +16,14 @@ find = {}
test_driver = ["py.typed"] test_driver = ["py.typed"]
[tool.ruff] [tool.ruff]
target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "U", "N"] select = ["E", "F", "I", "U", "N", "RUF"]
ignore = ["E501"] ignore = ["E501"]
[tool.mypy] [tool.mypy]
python_version = "3.10" python_version = "3.11"
warn_redundant_casts = true warn_redundant_casts = true
disallow_untyped_calls = true disallow_untyped_calls = true
disallow_untyped_defs = true disallow_untyped_defs = true

View File

@@ -1,9 +1,9 @@
# This file contains type hints that can be prepended to Nix test scripts so they can be type # This file contains type hints that can be prepended to Nix test scripts so they can be type
# checked. # checked.
from typing import Callable, List from collections.abc import Callable
from test_driver import Machine from test_driver import Machine
start_all: Callable[[], None] start_all: Callable[[], None]
machines: List[Machine] machines: list[Machine]

View File

@@ -3,9 +3,10 @@ import os
import re import re
import subprocess import subprocess
import time import time
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Any, Callable, Dict, Optional, Tuple from typing import Any
def prepare_machine_root(machinename: str, root: Path) -> None: def prepare_machine_root(machinename: str, root: Path) -> None:
@@ -88,7 +89,7 @@ class Machine:
except ValueError: except ValueError:
raise RuntimeError(f"Failed to parse child process id {childs[0]}") raise RuntimeError(f"Failed to parse child process id {childs[0]}")
def get_unit_info(self, unit: str) -> Dict[str, str]: def get_unit_info(self, unit: str) -> dict[str, str]:
proc = self.systemctl(f'--no-pager show "{unit}"') proc = self.systemctl(f'--no-pager show "{unit}"')
if proc.returncode != 0: if proc.returncode != 0:
raise Exception( raise Exception(
@@ -98,7 +99,7 @@ class Machine:
line_pattern = re.compile(r"^([^=]+)=(.*)$") line_pattern = re.compile(r"^([^=]+)=(.*)$")
def tuple_from_line(line: str) -> Tuple[str, str]: def tuple_from_line(line: str) -> tuple[str, str]:
match = line_pattern.match(line) match = line_pattern.match(line)
assert match is not None assert match is not None
return match[1], match[2] return match[1], match[2]
@@ -114,7 +115,7 @@ class Machine:
command: str, command: str,
check_return: bool = True, check_return: bool = True,
check_output: bool = True, check_output: bool = True,
timeout: Optional[int] = 900, timeout: int | None = 900,
) -> subprocess.CompletedProcess: ) -> subprocess.CompletedProcess:
""" """
Execute a shell command, returning a list `(status, stdout)`. Execute a shell command, returning a list `(status, stdout)`.

View File

@@ -1,9 +1,10 @@
import argparse import argparse
import logging import logging
import sys import sys
from collections.abc import Sequence
from pathlib import Path from pathlib import Path
from types import ModuleType from types import ModuleType
from typing import Any, Optional, Sequence from typing import Any
from . import config, flakes, machines, secrets, vms, webui from . import config, flakes, machines, secrets, vms, webui
from .custom_logger import setup_logging from .custom_logger import setup_logging
@@ -12,7 +13,7 @@ from .ssh import cli as ssh_cli
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
argcomplete: Optional[ModuleType] = None argcomplete: ModuleType | None = None
try: try:
import argcomplete # type: ignore[no-redef] import argcomplete # type: ignore[no-redef]
except ImportError: except ImportError:
@@ -28,7 +29,7 @@ class AppendOptionAction(argparse.Action):
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
namespace: argparse.Namespace, namespace: argparse.Namespace,
values: str | Sequence[str] | None, values: str | Sequence[str] | None,
option_string: Optional[str] = None, option_string: str | None = None,
) -> None: ) -> None:
lst = getattr(namespace, self.dest) lst = getattr(namespace, self.dest)
lst.append("--option") lst.append("--option")
@@ -37,7 +38,7 @@ class AppendOptionAction(argparse.Action):
lst.append(values[1]) lst.append(values[1])
def create_parser(prog: Optional[str] = None) -> argparse.ArgumentParser: def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog=prog, description="cLAN tool") parser = argparse.ArgumentParser(prog=prog, description="cLAN tool")
parser.add_argument( parser.add_argument(

View File

@@ -1,8 +1,9 @@
import asyncio import asyncio
import logging import logging
import shlex import shlex
from collections.abc import Callable, Coroutine
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, NamedTuple, Optional from typing import Any, NamedTuple
from .custom_logger import get_caller from .custom_logger import get_caller
from .errors import ClanError from .errors import ClanError
@@ -13,10 +14,10 @@ log = logging.getLogger(__name__)
class CmdOut(NamedTuple): class CmdOut(NamedTuple):
stdout: str stdout: str
stderr: str stderr: str
cwd: Optional[Path] = None cwd: Path | None = None
async def run(cmd: list[str], cwd: Optional[Path] = None) -> CmdOut: async def run(cmd: list[str], cwd: Path | None = None) -> CmdOut:
cwd_res = None cwd_res = None
if cwd is not None: if cwd is not None:
if not cwd.exists(): if not cwd.exists():
@@ -52,7 +53,7 @@ stdout:
def runforcli( def runforcli(
func: Callable[..., Coroutine[Any, Any, Dict[str, CmdOut]]], *args: Any func: Callable[..., Coroutine[Any, Any, dict[str, CmdOut]]], *args: Any
) -> None: ) -> None:
try: try:
res = asyncio.run(func(*args)) res = asyncio.run(func(*args))

View File

@@ -1,14 +1,13 @@
import json import json
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Optional
from clan_cli.nix import nix_eval from clan_cli.nix import nix_eval
def get_clan_module_names( def get_clan_module_names(
flake_dir: Path, flake_dir: Path,
) -> tuple[list[str], Optional[str]]: ) -> tuple[list[str], str | None]:
""" """
Get the list of clan modules from the clan-core flake input Get the list of clan modules from the clan-core flake input
""" """

View File

@@ -8,7 +8,7 @@ import shlex
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple, get_origin from typing import Any, get_origin
from clan_cli.dirs import machine_settings_file from clan_cli.dirs import machine_settings_file
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -34,7 +34,7 @@ def map_type(type: str) -> Any:
return str return str
elif type.startswith("null or "): elif type.startswith("null or "):
subtype = type.removeprefix("null or ") subtype = type.removeprefix("null or ")
return Optional[map_type(subtype)] return map_type(subtype) | None
elif type.startswith("attribute set of"): elif type.startswith("attribute set of"):
subtype = type.removeprefix("attribute set of ") subtype = type.removeprefix("attribute set of ")
return dict[str, map_type(subtype)] # type: ignore return dict[str, map_type(subtype)] # type: ignore
@@ -50,7 +50,7 @@ def merge(a: dict, b: dict, path: list[str] = []) -> dict:
for key in b: for key in b:
if key in a: if key in a:
if isinstance(a[key], dict) and isinstance(b[key], dict): if isinstance(a[key], dict) and isinstance(b[key], dict):
merge(a[key], b[key], path + [str(key)]) merge(a[key], b[key], [*path, str(key)])
elif isinstance(a[key], list) and isinstance(b[key], list): elif isinstance(a[key], list) and isinstance(b[key], list):
a[key].extend(b[key]) a[key].extend(b[key])
elif a[key] != b[key]: elif a[key] != b[key]:
@@ -196,8 +196,8 @@ def get_or_set_option(args: argparse.Namespace) -> None:
def find_option( def find_option(
option: str, value: Any, options: dict, option_description: Optional[str] = None option: str, value: Any, options: dict, option_description: str | None = None
) -> Tuple[str, Any]: ) -> tuple[str, Any]:
""" """
The option path specified by the user doesn't have to match exactly to an The option path specified by the user doesn't have to match exactly to an
entry in the options.json file. Examples entry in the options.json file. Examples
@@ -307,7 +307,7 @@ def set_option(
# takes a (sub)parser and configures it # takes a (sub)parser and configures it
def register_parser( def register_parser(
parser: Optional[argparse.ArgumentParser], parser: argparse.ArgumentParser | None,
) -> None: ) -> None:
if parser is None: if parser is None:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -361,7 +361,7 @@ def register_parser(
) )
def main(argv: Optional[list[str]] = None) -> None: def main(argv: list[str] | None = None) -> None:
if argv is None: if argv is None:
argv = sys.argv argv = sys.argv
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

View File

@@ -4,7 +4,6 @@ import re
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Optional
from clan_cli.dirs import machine_settings_file, nixpkgs_source, specific_machine_dir from clan_cli.dirs import machine_settings_file, nixpkgs_source, specific_machine_dir
from clan_cli.errors import ClanError, ClanHttpError from clan_cli.errors import ClanError, ClanHttpError
@@ -15,8 +14,8 @@ from clan_cli.nix import nix_eval
def verify_machine_config( def verify_machine_config(
flake_dir: Path, flake_dir: Path,
machine_name: str, machine_name: str,
config: Optional[dict] = None, config: dict | None = None,
) -> Optional[str]: ) -> str | None:
""" """
Verify that the machine evaluates successfully Verify that the machine evaluates successfully
Returns a tuple of (success, error_message) Returns a tuple of (success, error_message)

View File

@@ -1,7 +1,7 @@
import json import json
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Type, Union from typing import Any
from ..errors import ClanError from ..errors import ClanError
from ..nix import nix_eval from ..nix import nix_eval
@@ -19,7 +19,7 @@ type_map: dict[str, type] = {
def schema_from_module_file( def schema_from_module_file(
file: Union[str, Path] = f"{script_dir}/jsonschema/example-schema.json", file: str | Path = f"{script_dir}/jsonschema/example-schema.json",
) -> dict[str, Any]: ) -> dict[str, Any]:
absolute_path = Path(file).absolute() absolute_path = Path(file).absolute()
# define a nix expression that loads the given module file using lib.evalModules # define a nix expression that loads the given module file using lib.evalModules
@@ -36,7 +36,7 @@ def schema_from_module_file(
return json.loads(proc.stdout) return json.loads(proc.stdout)
def subtype_from_schema(schema: dict[str, Any]) -> Type: def subtype_from_schema(schema: dict[str, Any]) -> type:
if schema["type"] == "object": if schema["type"] == "object":
if "additionalProperties" in schema: if "additionalProperties" in schema:
sub_type = subtype_from_schema(schema["additionalProperties"]) sub_type = subtype_from_schema(schema["additionalProperties"])
@@ -57,8 +57,8 @@ def subtype_from_schema(schema: dict[str, Any]) -> Type:
def type_from_schema_path( def type_from_schema_path(
schema: dict[str, Any], schema: dict[str, Any],
path: list[str], path: list[str],
full_path: Optional[list[str]] = None, full_path: list[str] | None = None,
) -> Type: ) -> type:
if full_path is None: if full_path is None:
full_path = path full_path = path
if len(path) == 0: if len(path) == 0:
@@ -76,8 +76,8 @@ def type_from_schema_path(
raise ClanError(f"Unknown type for path {path}") raise ClanError(f"Unknown type for path {path}")
def options_types_from_schema(schema: dict[str, Any]) -> dict[str, Type]: def options_types_from_schema(schema: dict[str, Any]) -> dict[str, type]:
result: dict[str, Type] = {} result: dict[str, type] = {}
for name, value in schema.get("properties", {}).items(): for name, value in schema.get("properties", {}).items():
assert isinstance(value, dict) assert isinstance(value, dict)
type_ = value["type"] type_ = value["type"]

View File

@@ -4,7 +4,6 @@ import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Optional
from clan_cli.dirs import nixpkgs_source from clan_cli.dirs import nixpkgs_source
from clan_cli.errors import ClanError, ClanHttpError from clan_cli.errors import ClanError, ClanHttpError
@@ -14,7 +13,7 @@ from clan_cli.nix import nix_eval
def machine_schema( def machine_schema(
flake_dir: Path, flake_dir: Path,
config: dict, config: dict,
clan_imports: Optional[list[str]] = None, clan_imports: list[str] | None = None,
) -> dict: ) -> dict:
# use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan # use nix eval to lib.evalModules .#nixosConfigurations.<machine_name>.options.clan
with NamedTemporaryFile(mode="w", dir=flake_dir) as clan_machine_settings_file: with NamedTemporaryFile(mode="w", dir=flake_dir) as clan_machine_settings_file:

View File

@@ -1,7 +1,8 @@
import inspect import inspect
import logging import logging
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any
grey = "\x1b[38;20m" grey = "\x1b[38;20m"
yellow = "\x1b[33;20m" yellow = "\x1b[33;20m"

View File

@@ -1,5 +1,6 @@
from collections.abc import Callable
from types import ModuleType from types import ModuleType
from typing import Any, Callable from typing import Any
class FakeDeal: class FakeDeal:

View File

@@ -6,15 +6,16 @@ import stat
import subprocess import subprocess
import sys import sys
import time import time
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Any, Callable, Dict, List, Optional from typing import Any
import ipdb import ipdb
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def command_exec(cmd: List[str], work_dir: Path, env: Dict[str, str]) -> None: def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve()) subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
@@ -32,8 +33,8 @@ def block_for_input() -> None:
def breakpoint_container( def breakpoint_container(
work_dir: Path, work_dir: Path,
env: Optional[Dict[str, str]] = None, env: dict[str, str] | None = None,
cmd: Optional[List[str]] = None, cmd: list[str] | None = None,
) -> None: ) -> None:
if env is None: if env is None:
env = os.environ.copy() env = os.environ.copy()
@@ -52,8 +53,8 @@ def breakpoint_container(
def breakpoint_shell( def breakpoint_shell(
work_dir: Path = Path(os.getcwd()), work_dir: Path = Path(os.getcwd()),
env: Optional[Dict[str, str]] = None, env: dict[str, str] | None = None,
cmd: Optional[List[str]] = None, cmd: list[str] | None = None,
) -> None: ) -> None:
if env is None: if env is None:
env = os.environ.copy() env = os.environ.copy()
@@ -91,7 +92,7 @@ def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
return proc return proc
def dump_env(env: Dict[str, str], loc: Path) -> None: def dump_env(env: dict[str, str], loc: Path) -> None:
cenv = env.copy() cenv = env.copy()
log.info("Dumping environment variables to %s", loc) log.info("Dumping environment variables to %s", loc)
with open(loc, "w") as f: with open(loc, "w") as f:

View File

@@ -2,20 +2,19 @@ import logging
import os import os
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def get_clan_flake_toplevel() -> Optional[Path]: def get_clan_flake_toplevel() -> Path | None:
return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"]) return find_toplevel([".clan-flake", ".git", ".hg", ".svn", "flake.nix"])
def find_git_repo_root() -> Optional[Path]: def find_git_repo_root() -> Path | None:
return find_toplevel([".git"]) return find_toplevel([".git"])
def find_toplevel(top_level_files: list[str]) -> Optional[Path]: def find_toplevel(top_level_files: list[str]) -> Path | None:
"""Returns the path to the toplevel of the clan flake""" """Returns the path to the toplevel of the clan flake"""
for project_file in top_level_files: for project_file in top_level_files:
initial_path = Path(os.getcwd()) initial_path = Path(os.getcwd())

View File

@@ -1,20 +1,19 @@
# !/usr/bin/env python3 # !/usr/bin/env python3
import argparse import argparse
from pathlib import Path from pathlib import Path
from typing import Dict
from clan_cli.dirs import user_history_file from clan_cli.dirs import user_history_file
from ..async_cmd import CmdOut, runforcli from ..async_cmd import CmdOut, runforcli
async def add_flake(path: Path) -> Dict[str, CmdOut]: async def add_flake(path: Path) -> dict[str, CmdOut]:
user_history_file().parent.mkdir(parents=True, exist_ok=True) user_history_file().parent.mkdir(parents=True, exist_ok=True)
# append line to history file # append line to history file
# TODO: Make this atomic # TODO: Make this atomic
lines: set = set() lines: set = set()
if user_history_file().exists(): if user_history_file().exists():
with open(user_history_file(), "r") as f: with open(user_history_file()) as f:
lines = set(f.readlines()) lines = set(f.readlines())
lines.add(str(path)) lines.add(str(path))
with open(user_history_file(), "w") as f: with open(user_history_file(), "w") as f:

View File

@@ -1,7 +1,6 @@
# !/usr/bin/env python3 # !/usr/bin/env python3
import argparse import argparse
from pathlib import Path from pathlib import Path
from typing import Dict
from ..async_cmd import CmdOut, run, runforcli from ..async_cmd import CmdOut, run, runforcli
from ..errors import ClanError from ..errors import ClanError
@@ -10,7 +9,7 @@ from ..nix import nix_command, nix_shell
DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core?new-clan" DEFAULT_URL: str = "git+https://git.clan.lol/clan/clan-core?new-clan"
async def create_flake(directory: Path, url: str) -> Dict[str, CmdOut]: async def create_flake(directory: Path, url: str) -> dict[str, CmdOut]:
if not directory.exists(): if not directory.exists():
directory.mkdir() directory.mkdir()
else: else:

View File

@@ -9,7 +9,7 @@ def list_history() -> list[Path]:
if not user_history_file().exists(): if not user_history_file().exists():
return [] return []
# read path lines from history file # read path lines from history file
with open(user_history_file(), "r") as f: with open(user_history_file()) as f:
lines = f.readlines() lines = f.readlines()
return [Path(line.strip()) for line in lines] return [Path(line.strip()) for line in lines]

View File

@@ -1,7 +1,6 @@
import shlex import shlex
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Optional
# from clan_cli.dirs import find_git_repo_root # from clan_cli.dirs import find_git_repo_root
from clan_cli.errors import ClanError from clan_cli.errors import ClanError
@@ -12,7 +11,7 @@ from clan_cli.nix import nix_shell
def commit_file( def commit_file(
file_path: Path, file_path: Path,
repo_dir: Path, repo_dir: Path,
commit_message: Optional[str] = None, commit_message: str | None = None,
) -> None: ) -> None:
# check that the file is in the git repository and exists # check that the file is in the git repository and exists
if not Path(file_path).resolve().is_relative_to(repo_dir.resolve()): if not Path(file_path).resolve().is_relative_to(repo_dir.resolve()):

View File

@@ -3,7 +3,6 @@ import os
import subprocess import subprocess
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional
from ..nix import nix_build, nix_config, nix_eval from ..nix import nix_build, nix_config, nix_eval
from ..ssh import Host, parse_deployment_address from ..ssh import Host, parse_deployment_address
@@ -31,7 +30,7 @@ class Machine:
self, self,
name: str, name: str,
flake_dir: Path, flake_dir: Path,
machine_data: Optional[dict] = None, machine_data: dict | None = None,
) -> None: ) -> None:
""" """
Creates a Machine Creates a Machine

View File

@@ -49,23 +49,22 @@ def deploy_nixos(hosts: HostGroup, clan_dir: Path) -> None:
if target_user: if target_user:
target_host = f"{target_user}@{target_host}" target_host = f"{target_user}@{target_host}"
extra_args = h.meta.get("extra_args", []) extra_args = h.meta.get("extra_args", [])
cmd = ( cmd = [
["nixos-rebuild", "switch"] "nixos-rebuild",
+ extra_args "switch",
+ [ *extra_args,
"--fast", "--fast",
"--option", "--option",
"keep-going", "keep-going",
"true", "true",
"--option", "--option",
"accept-flake-config", "accept-flake-config",
"true", "true",
"--build-host", "--build-host",
"", "",
"--flake", "--flake",
f"{path}#{flake_attr}", f"{path}#{flake_attr}",
] ]
)
if target_host: if target_host:
cmd.extend(["--target-host", target_host]) cmd.extend(["--target-host", target_host])
ret = h.run(cmd, check=False) ret = h.run(cmd, check=False)

View File

@@ -12,7 +12,7 @@ from .errors import ClanError
@deal.raises(ClanError) @deal.raises(ClanError)
def nix_command(flags: list[str]) -> list[str]: def nix_command(flags: list[str]) -> list[str]:
return ["nix", "--extra-experimental-features", "nix-command flakes"] + flags return ["nix", "--extra-experimental-features", "nix-command flakes", *flags]
def nix_flake_show(flake_url: str | Path) -> list[str]: def nix_flake_show(flake_url: str | Path) -> list[str]:
@@ -68,19 +68,17 @@ def nix_eval(flags: list[str]) -> list[str]:
) )
if os.environ.get("IN_NIX_SANDBOX"): if os.environ.get("IN_NIX_SANDBOX"):
with tempfile.TemporaryDirectory() as nix_store: with tempfile.TemporaryDirectory() as nix_store:
return ( return [
default_flags *default_flags,
+ [ "--override-input",
"--override-input", "nixpkgs",
"nixpkgs", str(nixpkgs_source()),
str(nixpkgs_source()), # --store is required to prevent this error:
# --store is required to prevent this error: # error: cannot unlink '/nix/store/6xg259477c90a229xwmb53pdfkn6ig3g-default-builder.sh': Operation not permitted
# error: cannot unlink '/nix/store/6xg259477c90a229xwmb53pdfkn6ig3g-default-builder.sh': Operation not permitted "--store",
"--store", nix_store,
nix_store, *flags,
] ]
+ flags
)
return default_flags + flags return default_flags + flags
@@ -96,7 +94,7 @@ def nix_shell(packages: list[str], cmd: list[str]) -> list[str]:
[ [
"shell", "shell",
"--inputs-from", "--inputs-from",
f"{str(nixpkgs_flake())}", f"{nixpkgs_flake()!s}",
] ]
) )
+ wrapped_packages + wrapped_packages

View File

@@ -1,7 +1,7 @@
import os import os
import shutil import shutil
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Callable
from ..errors import ClanError from ..errors import ClanError

View File

@@ -2,10 +2,11 @@ import json
import os import os
import shutil import shutil
import subprocess import subprocess
from collections.abc import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import IO, Iterator from typing import IO
from ..dirs import user_config_dir from ..dirs import user_config_dir
from ..errors import ClanError from ..errors import ClanError

View File

@@ -1,8 +1,8 @@
import argparse import argparse
import os import os
import re import re
from collections.abc import Callable
from pathlib import Path from pathlib import Path
from typing import Callable
from ..errors import ClanError from ..errors import ClanError
from .sops import get_public_key from .sops import get_public_key

View File

@@ -28,7 +28,7 @@ def upload_secrets(machine: Machine) -> None:
" ".join(["ssh"] + ssh_cmd[2:]), " ".join(["ssh"] + ssh_cmd[2:]),
"-az", "-az",
"--delete", "--delete",
f"{str(tempdir)}/", f"{tempdir!s}/",
f"{host.user}@{host.host}:{machine.secrets_upload_directory}/", f"{host.user}@{host.host}:{machine.secrets_upload_directory}/",
], ],
), ),

View File

@@ -10,6 +10,7 @@ import subprocess
import sys import sys
import time import time
import urllib.parse import urllib.parse
from collections.abc import Callable, Iterator
from contextlib import ExitStack, contextmanager from contextlib import ExitStack, contextmanager
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
@@ -18,16 +19,9 @@ from threading import Thread
from typing import ( from typing import (
IO, IO,
Any, Any,
Callable,
Dict,
Generic, Generic,
Iterator,
List,
Literal, Literal,
Optional,
Tuple,
TypeVar, TypeVar,
Union,
overload, overload,
) )
@@ -48,7 +42,7 @@ class CommandFormatter(logging.Formatter):
super().__init__( super().__init__(
"%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s" "%(prefix_color)s[%(command_prefix)s]%(color_reset)s %(color)s%(message)s%(color_reset)s"
) )
self.hostnames: List[str] = [] self.hostnames: list[str] = []
self.hostname_color_offset = 1 # first host shouldn't get agressive red self.hostname_color_offset = 1 # first host shouldn't get agressive red
def format(self, record: logging.LogRecord) -> str: def format(self, record: logging.LogRecord) -> str:
@@ -80,7 +74,7 @@ class CommandFormatter(logging.Formatter):
return 31 + (index + self.hostname_color_offset) % 7 return 31 + (index + self.hostname_color_offset) % 7
def setup_loggers() -> Tuple[logging.Logger, logging.Logger]: def setup_loggers() -> tuple[logging.Logger, logging.Logger]:
# If we use the default logger here (logging.error etc) or a logger called # If we use the default logger here (logging.error etc) or a logger called
# "deploykit", then cmdlog messages are also posted on the default logger. # "deploykit", then cmdlog messages are also posted on the default logger.
# To avoid this message duplication, we set up a main and command logger # To avoid this message duplication, we set up a main and command logger
@@ -115,7 +109,7 @@ error = kitlog.error
@contextmanager @contextmanager
def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]: def _pipe() -> Iterator[tuple[IO[str], IO[str]]]:
(pipe_r, pipe_w) = os.pipe() (pipe_r, pipe_w) = os.pipe()
read_end = os.fdopen(pipe_r, "r") read_end = os.fdopen(pipe_r, "r")
write_end = os.fdopen(pipe_w, "w") write_end = os.fdopen(pipe_w, "w")
@@ -130,7 +124,7 @@ def _pipe() -> Iterator[Tuple[IO[str], IO[str]]]:
write_end.close() write_end.close()
FILE = Union[None, int] FILE = None | int
# Seconds until a message is printed when _run produces no output. # Seconds until a message is printed when _run produces no output.
NO_OUTPUT_TIMEOUT = 20 NO_OUTPUT_TIMEOUT = 20
@@ -149,13 +143,13 @@ class Host:
def __init__( def __init__(
self, self,
host: str, host: str,
user: Optional[str] = None, user: str | None = None,
port: Optional[int] = None, port: int | None = None,
key: Optional[str] = None, key: str | None = None,
forward_agent: bool = False, forward_agent: bool = False,
command_prefix: Optional[str] = None, command_prefix: str | None = None,
host_key_check: HostKeyCheck = HostKeyCheck.STRICT, host_key_check: HostKeyCheck = HostKeyCheck.STRICT,
meta: Dict[str, Any] = {}, meta: dict[str, Any] = {},
verbose_ssh: bool = False, verbose_ssh: bool = False,
ssh_options: dict[str, str] = {}, ssh_options: dict[str, str] = {},
) -> None: ) -> None:
@@ -186,12 +180,12 @@ class Host:
def _prefix_output( def _prefix_output(
self, self,
displayed_cmd: str, displayed_cmd: str,
print_std_fd: Optional[IO[str]], print_std_fd: IO[str] | None,
print_err_fd: Optional[IO[str]], print_err_fd: IO[str] | None,
stdout: Optional[IO[str]], stdout: IO[str] | None,
stderr: Optional[IO[str]], stderr: IO[str] | None,
timeout: float = math.inf, timeout: float = math.inf,
) -> Tuple[str, str]: ) -> tuple[str, str]:
rlist = [] rlist = []
if print_std_fd is not None: if print_std_fd is not None:
rlist.append(print_std_fd) rlist.append(print_std_fd)
@@ -215,7 +209,7 @@ class Host:
def print_from( def print_from(
print_fd: IO[str], print_buf: str, is_err: bool = False print_fd: IO[str], print_buf: str, is_err: bool = False
) -> Tuple[float, str]: ) -> tuple[float, str]:
read = os.read(print_fd.fileno(), 4096) read = os.read(print_fd.fileno(), 4096)
if len(read) == 0: if len(read) == 0:
rlist.remove(print_fd) rlist.remove(print_fd)
@@ -256,7 +250,7 @@ class Host:
extra=dict(command_prefix=self.command_prefix), extra=dict(command_prefix=self.command_prefix),
) )
def handle_fd(fd: Optional[IO[Any]]) -> str: def handle_fd(fd: IO[Any] | None) -> str:
if fd and fd in r: if fd and fd in r:
read = os.read(fd.fileno(), 4096) read = os.read(fd.fileno(), 4096)
if len(read) == 0: if len(read) == 0:
@@ -274,13 +268,13 @@ class Host:
def _run( def _run(
self, self,
cmd: List[str], cmd: list[str],
displayed_cmd: str, displayed_cmd: str,
shell: bool, shell: bool,
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
timeout: float = math.inf, timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]: ) -> subprocess.CompletedProcess[str]:
@@ -362,11 +356,11 @@ class Host:
def run_local( def run_local(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
timeout: float = math.inf, timeout: float = math.inf,
) -> subprocess.CompletedProcess[str]: ) -> subprocess.CompletedProcess[str]:
@@ -404,12 +398,12 @@ class Host:
def run( def run(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
become_root: bool = False, become_root: bool = False,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
verbose_ssh: bool = False, verbose_ssh: bool = False,
timeout: float = math.inf, timeout: float = math.inf,
@@ -456,7 +450,8 @@ class Host:
else: else:
bash_cmd += cmd bash_cmd += cmd
# FIXME we assume bash to be present here? Should be documented... # FIXME we assume bash to be present here? Should be documented...
ssh_cmd = self.ssh_cmd(verbose_ssh=verbose_ssh) + [ ssh_cmd = [
*self.ssh_cmd(verbose_ssh=verbose_ssh),
"--", "--",
f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, bash_args))}", f"{sudo}bash -c {quote(bash_cmd)} -- {' '.join(map(quote, bash_args))}",
] ]
@@ -474,7 +469,7 @@ class Host:
def ssh_cmd( def ssh_cmd(
self, self,
verbose_ssh: bool = False, verbose_ssh: bool = False,
) -> List: ) -> list:
if self.user is not None: if self.user is not None:
ssh_target = f"{self.user}@{self.host}" ssh_target = f"{self.user}@{self.host}"
else: else:
@@ -497,19 +492,19 @@ class Host:
if verbose_ssh or self.verbose_ssh: if verbose_ssh or self.verbose_ssh:
ssh_opts.extend(["-v"]) ssh_opts.extend(["-v"])
return ["ssh", ssh_target] + ssh_opts return ["ssh", ssh_target, *ssh_opts]
T = TypeVar("T") T = TypeVar("T")
class HostResult(Generic[T]): class HostResult(Generic[T]):
def __init__(self, host: Host, result: Union[T, Exception]) -> None: def __init__(self, host: Host, result: T | Exception) -> None:
self.host = host self.host = host
self._result = result self._result = result
@property @property
def error(self) -> Optional[Exception]: def error(self) -> Exception | None:
""" """
Returns an error if the command failed Returns an error if the command failed
""" """
@@ -527,13 +522,13 @@ class HostResult(Generic[T]):
return self._result return self._result
Results = List[HostResult[subprocess.CompletedProcess[str]]] Results = list[HostResult[subprocess.CompletedProcess[str]]]
def _worker( def _worker(
func: Callable[[Host], T], func: Callable[[Host], T],
host: Host, host: Host,
results: List[HostResult[T]], results: list[HostResult[T]],
idx: int, idx: int,
) -> None: ) -> None:
try: try:
@@ -544,18 +539,18 @@ def _worker(
class HostGroup: class HostGroup:
def __init__(self, hosts: List[Host]) -> None: def __init__(self, hosts: list[Host]) -> None:
self.hosts = hosts self.hosts = hosts
def _run_local( def _run_local(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
host: Host, host: Host,
results: Results, results: Results,
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
verbose_ssh: bool = False, verbose_ssh: bool = False,
timeout: float = math.inf, timeout: float = math.inf,
@@ -577,13 +572,13 @@ class HostGroup:
def _run_remote( def _run_remote(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
host: Host, host: Host,
results: Results, results: Results,
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
verbose_ssh: bool = False, verbose_ssh: bool = False,
timeout: float = math.inf, timeout: float = math.inf,
@@ -604,7 +599,7 @@ class HostGroup:
kitlog.exception(e) kitlog.exception(e)
results.append(HostResult(host, e)) results.append(HostResult(host, e))
def _reraise_errors(self, results: List[HostResult[Any]]) -> None: def _reraise_errors(self, results: list[HostResult[Any]]) -> None:
errors = 0 errors = 0
for result in results: for result in results:
e = result.error e = result.error
@@ -621,12 +616,12 @@ class HostGroup:
def _run( def _run(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
local: bool = False, local: bool = False,
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
verbose_ssh: bool = False, verbose_ssh: bool = False,
timeout: float = math.inf, timeout: float = math.inf,
@@ -663,11 +658,11 @@ class HostGroup:
def run( def run(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
verbose_ssh: bool = False, verbose_ssh: bool = False,
timeout: float = math.inf, timeout: float = math.inf,
@@ -695,11 +690,11 @@ class HostGroup:
def run_local( def run_local(
self, self,
cmd: Union[str, List[str]], cmd: str | list[str],
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
timeout: float = math.inf, timeout: float = math.inf,
) -> Results: ) -> Results:
@@ -727,14 +722,14 @@ class HostGroup:
def run_function( def run_function(
self, func: Callable[[Host], T], check: bool = True self, func: Callable[[Host], T], check: bool = True
) -> List[HostResult[T]]: ) -> list[HostResult[T]]:
""" """
Function to run for each host in the group in parallel Function to run for each host in the group in parallel
@func the function to call @func the function to call
""" """
threads = [] threads = []
results: List[HostResult[T]] = [ results: list[HostResult[T]] = [
HostResult(h, Exception(f"No result set for thread {i}")) HostResult(h, Exception(f"No result set for thread {i}"))
for (i, h) in enumerate(self.hosts) for (i, h) in enumerate(self.hosts)
] ]
@@ -763,14 +758,14 @@ def parse_deployment_address(
machine_name: str, host: str, meta: dict[str, Any] = {} machine_name: str, host: str, meta: dict[str, Any] = {}
) -> Host: ) -> Host:
parts = host.split("@") parts = host.split("@")
user: Optional[str] = None user: str | None = None
if len(parts) > 1: if len(parts) > 1:
user = parts[0] user = parts[0]
hostname = parts[1] hostname = parts[1]
else: else:
hostname = parts[0] hostname = parts[0]
maybe_options = hostname.split("?") maybe_options = hostname.split("?")
options: Dict[str, str] = {} options: dict[str, str] = {}
if len(maybe_options) > 1: if len(maybe_options) > 1:
hostname = maybe_options[0] hostname = maybe_options[0]
for option in maybe_options[1].split("&"): for option in maybe_options[1].split("&"):
@@ -795,12 +790,12 @@ def parse_deployment_address(
@overload @overload
def run( def run(
cmd: Union[List[str], str], cmd: list[str] | str,
text: Literal[True] = ..., text: Literal[True] = ...,
stdout: FILE = ..., stdout: FILE = ...,
stderr: FILE = ..., stderr: FILE = ...,
extra_env: Dict[str, str] = ..., extra_env: dict[str, str] = ...,
cwd: Union[None, str, Path] = ..., cwd: None | str | Path = ...,
check: bool = ..., check: bool = ...,
) -> subprocess.CompletedProcess[str]: ) -> subprocess.CompletedProcess[str]:
... ...
@@ -808,24 +803,24 @@ def run(
@overload @overload
def run( def run(
cmd: Union[List[str], str], cmd: list[str] | str,
text: Literal[False], text: Literal[False],
stdout: FILE = ..., stdout: FILE = ...,
stderr: FILE = ..., stderr: FILE = ...,
extra_env: Dict[str, str] = ..., extra_env: dict[str, str] = ...,
cwd: Union[None, str, Path] = ..., cwd: None | str | Path = ...,
check: bool = ..., check: bool = ...,
) -> subprocess.CompletedProcess[bytes]: ) -> subprocess.CompletedProcess[bytes]:
... ...
def run( def run(
cmd: Union[List[str], str], cmd: list[str] | str,
text: bool = True, text: bool = True,
stdout: FILE = None, stdout: FILE = None,
stderr: FILE = None, stderr: FILE = None,
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
cwd: Union[None, str, Path] = None, cwd: None | str | Path = None,
check: bool = True, check: bool = True,
) -> subprocess.CompletedProcess[Any]: ) -> subprocess.CompletedProcess[Any]:
""" """

View File

@@ -1,7 +1,6 @@
import argparse import argparse
import json import json
import subprocess import subprocess
from typing import Optional
from ..nix import nix_shell from ..nix import nix_shell
@@ -9,7 +8,7 @@ from ..nix import nix_shell
def ssh( def ssh(
host: str, host: str,
user: str = "root", user: str = "root",
password: Optional[str] = None, password: str | None = None,
ssh_args: list[str] = [], ssh_args: list[str] = [],
) -> None: ) -> None:
packages = ["tor", "openssh"] packages = ["tor", "openssh"]
@@ -21,7 +20,8 @@ def ssh(
"-p", "-p",
password, password,
] ]
_ssh_args = ssh_args + [ _ssh_args = [
*ssh_args,
"ssh", "ssh",
"-o", "-o",
"UserKnownHostsFile=/dev/null", "UserKnownHostsFile=/dev/null",
@@ -29,7 +29,7 @@ def ssh(
"StrictHostKeyChecking=no", "StrictHostKeyChecking=no",
f"{user}@{host}", f"{user}@{host}",
] ]
cmd = nix_shell(packages, ["torify"] + password_args + _ssh_args) cmd = nix_shell(packages, ["torify", *password_args, *_ssh_args])
subprocess.run(cmd) subprocess.run(cmd)

View File

@@ -7,9 +7,10 @@ import subprocess
import sys import sys
import threading import threading
import traceback import traceback
from collections.abc import Iterator
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Iterator, Optional, Type, TypeVar from typing import Any, TypeVar
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from .custom_logger import ThreadFormatter, get_caller from .custom_logger import ThreadFormatter, get_caller
@@ -36,8 +37,8 @@ class Command:
def run( def run(
self, self,
cmd: list[str], cmd: list[str],
env: Optional[dict[str, str]] = None, env: dict[str, str] | None = None,
cwd: Optional[Path] = None, cwd: Path | None = None,
name: str = "command", name: str = "command",
) -> None: ) -> None:
self.running = True self.running = True
@@ -188,7 +189,7 @@ T = TypeVar("T", bound="BaseTask")
@deal.raises(ClanError) @deal.raises(ClanError)
def create_task(task_type: Type[T], *args: Any) -> T: def create_task(task_type: type[T], *args: Any) -> T:
global POOL global POOL
# check if task_type is a callable # check if task_type is a callable

View File

@@ -1,5 +1,6 @@
import sys import sys
from typing import IO, Any, Callable from collections.abc import Callable
from typing import IO, Any
def is_interactive() -> bool: def is_interactive() -> bool:

View File

@@ -6,9 +6,9 @@ import shlex
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from threading import Condition, Thread from threading import Condition, Thread
from typing import Iterator
from uuid import UUID from uuid import UUID
from ..nix import nix_build, nix_config, nix_eval, nix_shell from ..nix import nix_build, nix_config, nix_eval, nix_shell
@@ -117,9 +117,9 @@ class BuildVmTask(BaseTask):
cmd.run( cmd.run(
nix_build( nix_build(
[ [
f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.system.clan.vm.create' f'{clan_dir}#clanInternals.machines."{system}"."{machine}".config.system.clan.vm.create',
*self.nix_options,
] ]
+ self.nix_options
), ),
name="buildvm", name="buildvm",
) )

View File

@@ -1,8 +1,9 @@
import argparse import argparse
from typing import Callable, NoReturn, Optional from collections.abc import Callable
from typing import NoReturn
start_server: Optional[Callable] = None start_server: Callable | None = None
ServerImportError: Optional[ImportError] = None ServerImportError: ImportError | None = None
try: try:
from .server import start_server from .server import start_server
except ImportError as e: except ImportError as e:

View File

@@ -1,5 +1,4 @@
from enum import Enum from enum import Enum
from typing import Dict, List
from pydantic import BaseModel, Extra, Field from pydantic import BaseModel, Extra, Field
@@ -71,9 +70,9 @@ class FlakeListResponse(BaseModel):
class FlakeCreateResponse(BaseModel): class FlakeCreateResponse(BaseModel):
cmd_out: Dict[str, CmdOut] cmd_out: dict[str, CmdOut]
class FlakeResponse(BaseModel): class FlakeResponse(BaseModel):
content: str content: str
actions: List[FlakeAction] actions: list[FlakeAction]

View File

@@ -5,10 +5,10 @@ import shutil
import subprocess import subprocess
import time import time
import urllib.request import urllib.request
from collections.abc import Iterator
from contextlib import ExitStack, contextmanager from contextlib import ExitStack, contextmanager
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
from typing import Iterator
# XXX: can we dynamically load this using nix develop? # XXX: can we dynamically load this using nix develop?
import uvicorn import uvicorn

View File

@@ -1,5 +1,5 @@
from enum import Enum from enum import Enum
from typing import Any, Dict, List from typing import Any
class Tags(Enum): class Tags(Enum):
@@ -13,7 +13,7 @@ class Tags(Enum):
return self.value return self.value
tags_metadata: List[Dict[str, Any]] = [ tags_metadata: list[dict[str, Any]] = [
{ {
"name": str(Tags.flake), "name": str(Tags.flake),
"description": "Operations on a flake.", "description": "Operations on a flake.",

View File

@@ -25,7 +25,7 @@ markers = ["impure", "with_core"]
[tool.mypy] [tool.mypy]
plugins = ["deal.mypy"] plugins = ["deal.mypy"]
python_version = "3.10" python_version = "3.11"
warn_redundant_casts = true warn_redundant_casts = true
disallow_untyped_calls = true disallow_untyped_calls = true
disallow_untyped_defs = true disallow_untyped_defs = true
@@ -53,7 +53,7 @@ module = "setuptools.*"
ignore_missing_imports = true ignore_missing_imports = true
[tool.ruff] [tool.ruff]
target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "U", "N", "RUF"]
select = ["E", "F", "I", "N"]
ignore = ["E501", "E402"] ignore = ["E501", "E402"]

View File

@@ -1,26 +1,27 @@
import os import os
import signal import signal
import subprocess import subprocess
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from typing import IO, Any, Dict, Iterator, List, Optional, Union from typing import IO, Any
import pytest import pytest
_FILE = Union[None, int, IO[Any]] _FILE = None | int | IO[Any]
class Command: class Command:
def __init__(self) -> None: def __init__(self) -> None:
self.processes: List[subprocess.Popen[str]] = [] self.processes: list[subprocess.Popen[str]] = []
def run( def run(
self, self,
command: List[str], command: list[str],
extra_env: Dict[str, str] = {}, extra_env: dict[str, str] = {},
stdin: _FILE = None, stdin: _FILE = None,
stdout: _FILE = None, stdout: _FILE = None,
stderr: _FILE = None, stderr: _FILE = None,
workdir: Optional[Path] = None, workdir: Path | None = None,
) -> subprocess.Popen[str]: ) -> subprocess.Popen[str]:
env = os.environ.copy() env = os.environ.copy()
env.update(extra_env) env.update(extra_env)

View File

@@ -4,8 +4,9 @@ import os
import shutil import shutil
import subprocess as sp import subprocess as sp
import tempfile import tempfile
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from typing import Iterator, NamedTuple from typing import NamedTuple
import pytest import pytest
from pydantic import AnyUrl from pydantic import AnyUrl

View File

@@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
class Cli: class Cli:
def run(self, args: list[str]) -> argparse.Namespace: def run(self, args: list[str]) -> argparse.Namespace:
parser = create_parser(prog="clan") parser = create_parser(prog="clan")
cmd = shlex.join(["clan"] + args) cmd = shlex.join(["clan", *args])
log.debug(f"$ {cmd}") log.debug(f"$ {cmd}")
log.debug(f"Caller {get_caller()}") log.debug(f"Caller {get_caller()}")
parsed = parser.parse_args(args) parsed = parser.parse_args(args)

View File

@@ -2,7 +2,7 @@
import contextlib import contextlib
import socket import socket
from typing import Callable from collections.abc import Callable
import pytest import pytest

View File

@@ -3,10 +3,11 @@ import shutil
import string import string
import subprocess import subprocess
import time import time
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from sys import platform from sys import platform
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Iterator from typing import TYPE_CHECKING
import pytest import pytest

View File

@@ -1,8 +1,8 @@
import logging import logging
import os import os
import tempfile import tempfile
from collections.abc import Iterator
from pathlib import Path from pathlib import Path
from typing import Iterator
import pytest import pytest

View File

@@ -1,7 +1,7 @@
import json import json
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any
import pytest import pytest
from cli import Cli from cli import Cli
@@ -47,8 +47,8 @@ def test_set_some_option(
example_options, example_options,
"--settings-file", "--settings-file",
out_file.name, out_file.name,
*args,
] ]
+ args
) )
json_out = json.loads(open(out_file.name).read()) json_out = json.loads(open(out_file.name).read())
assert json_out == expected assert json_out == expected
@@ -206,18 +206,18 @@ def test_map_type() -> None:
assert config.map_type("boolean") == bool assert config.map_type("boolean") == bool
assert config.map_type("attribute set of string") == dict[str, str] assert config.map_type("attribute set of string") == dict[str, str]
assert config.map_type("attribute set of integer") == dict[str, int] assert config.map_type("attribute set of integer") == dict[str, int]
assert config.map_type("null or string") == Optional[str] assert config.map_type("null or string") == str | None
# test the cast function with simple types # test the cast function with simple types
def test_cast() -> None: def test_cast() -> None:
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
assert ( assert (
config.cast(value=["null"], type=Optional[str], opt_description="foo-option") config.cast(value=["null"], type=str | None, opt_description="foo-option")
is None is None
) )
assert ( assert (
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option") config.cast(value=["bar"], type=str | None, opt_description="foo-option")
== "bar" == "bar"
) )

View File

@@ -15,7 +15,7 @@ def test_flake_history_append(
api: TestClient, test_flake: FlakeForTest, temporary_home: Path api: TestClient, test_flake: FlakeForTest, temporary_home: Path
) -> None: ) -> None:
response = api.post( response = api.post(
f"/api/flake/history?flake_dir={str(test_flake.path)}", f"/api/flake/history?flake_dir={test_flake.path!s}",
json={}, json={},
) )
assert response.status_code == 200, response.json() assert response.status_code == 200, response.json()
@@ -34,7 +34,7 @@ def test_flake_history_list(
# add the test_flake # add the test_flake
response = api.post( response = api.post(
f"/api/flake/history?flake_dir={str(test_flake.path)}", f"/api/flake/history?flake_dir={test_flake.path!s}",
json={}, json={},
) )
assert response.status_code == 200, response.text assert response.status_code == 200, response.text

View File

@@ -1,7 +1,8 @@
import logging import logging
import os import os
from collections.abc import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator from typing import TYPE_CHECKING
import pytest import pytest
from cli import Cli from cli import Cli

View File

@@ -1,6 +1,5 @@
import os import os
import sys import sys
from typing import Union
import pytest import pytest
import pytest_subprocess.fake_process import pytest_subprocess.fake_process
@@ -28,7 +27,7 @@ def test_ssh_no_pass(
user = "user" user = "user"
if os.environ.get("IN_NIX_SANDBOX"): if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX") monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[Union[str, utils.Any]] = [ cmd: list[str | utils.Any] = [
"nix", "nix",
fp.any(), fp.any(),
"shell", "shell",
@@ -58,7 +57,7 @@ def test_ssh_with_pass(
user = "user" user = "user"
if os.environ.get("IN_NIX_SANDBOX"): if os.environ.get("IN_NIX_SANDBOX"):
monkeypatch.delenv("IN_NIX_SANDBOX") monkeypatch.delenv("IN_NIX_SANDBOX")
cmd: list[Union[str, utils.Any]] = [ cmd: list[str | utils.Any] = [
"nix", "nix",
fp.any(), fp.any(),
"shell", "shell",
@@ -79,7 +78,7 @@ def test_ssh_with_pass(
def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None: def test_qrcode_scan(fp: pytest_subprocess.fake_process.FakeProcess) -> None:
cmd: list[Union[str, utils.Any]] = [fp.any()] cmd: list[str | utils.Any] = [fp.any()]
fp.register(cmd, stdout="https://test.test") fp.register(cmd, stdout="https://test.test")
result = cli.qrcode_scan("test.png") result = cli.qrcode_scan("test.png")
assert result == "https://test.test" assert result == "https://test.test"

View File

@@ -1,7 +1,7 @@
import argparse import argparse
from typing import Callable, Optional from collections.abc import Callable
start_app: Optional[Callable] = None start_app: Callable | None = None
from .app import start_app from .app import start_app

View File

@@ -11,7 +11,7 @@ scripts = { clan-vm-manager = "clan_vm_manager:main" }
clan_vm_manager = ["*.glade"] clan_vm_manager = ["*.glade"]
[tool.mypy] [tool.mypy]
python_version = "3.10" python_version = "3.11"
warn_redundant_casts = true warn_redundant_casts = true
disallow_untyped_calls = true disallow_untyped_calls = true
disallow_untyped_defs = true disallow_untyped_defs = true
@@ -22,7 +22,7 @@ module = "gi.*"
ignore_missing_imports = true ignore_missing_imports = true
[tool.ruff] [tool.ruff]
target-version = "py311"
line-length = 88 line-length = 88
select = ["E", "F", "I", "N", "RUF", "U"]
select = ["E", "F", "I", "N"]
ignore = ["E501", "E402", "N802"] ignore = ["E501", "E402", "N802"]