disallow variable shadowing
This commit is contained in:
@@ -19,8 +19,8 @@ test_driver = ["py.typed"]
|
|||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
|
|
||||||
select = ["E", "F", "I", "U", "N", "RUF", "ANN"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501", "ANN101", "ANN401"]
|
ignore = ["E501", "ANN101", "ANN401", "A003"]
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
python_version = "3.11"
|
python_version = "3.11"
|
||||||
|
|||||||
@@ -93,8 +93,8 @@ class ZerotierController:
|
|||||||
data=data,
|
data=data,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_network(self, id: str) -> dict[str, Any]:
|
def get_network(self, network_id: str) -> dict[str, Any]:
|
||||||
return self._http_request(f"/controller/network/{id}")
|
return self._http_request(f"/controller/network/{network_id}")
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
|||||||
@@ -21,28 +21,28 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
# nixos option type description to python type
|
# nixos option type description to python type
|
||||||
def map_type(type: str) -> Any:
|
def map_type(nix_type: str) -> Any:
|
||||||
if type == "boolean":
|
if nix_type == "boolean":
|
||||||
return bool
|
return bool
|
||||||
elif type in [
|
elif nix_type in [
|
||||||
"integer",
|
"integer",
|
||||||
"signed integer",
|
"signed integer",
|
||||||
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
"16 bit unsigned integer; between 0 and 65535 (both inclusive)",
|
||||||
]:
|
]:
|
||||||
return int
|
return int
|
||||||
elif type.startswith("string"):
|
elif nix_type.startswith("string"):
|
||||||
return str
|
return str
|
||||||
elif type.startswith("null or "):
|
elif nix_type.startswith("null or "):
|
||||||
subtype = type.removeprefix("null or ")
|
subtype = nix_type.removeprefix("null or ")
|
||||||
return map_type(subtype) | None
|
return map_type(subtype) | None
|
||||||
elif type.startswith("attribute set of"):
|
elif nix_type.startswith("attribute set of"):
|
||||||
subtype = type.removeprefix("attribute set of ")
|
subtype = nix_type.removeprefix("attribute set of ")
|
||||||
return dict[str, map_type(subtype)] # type: ignore
|
return dict[str, map_type(subtype)] # type: ignore
|
||||||
elif type.startswith("list of"):
|
elif nix_type.startswith("list of"):
|
||||||
subtype = type.removeprefix("list of ")
|
subtype = nix_type.removeprefix("list of ")
|
||||||
return list[map_type(subtype)] # type: ignore
|
return list[map_type(subtype)] # type: ignore
|
||||||
else:
|
else:
|
||||||
raise ClanError(f"Unknown type {type}")
|
raise ClanError(f"Unknown type {nix_type}")
|
||||||
|
|
||||||
|
|
||||||
# merge two dicts recursively
|
# merge two dicts recursively
|
||||||
@@ -70,10 +70,10 @@ class AllContainer(list):
|
|||||||
|
|
||||||
# value is always a list, as the arg parser cannot know the type upfront
|
# value is always a list, as the arg parser cannot know the type upfront
|
||||||
# and therefore always allows multiple arguments.
|
# and therefore always allows multiple arguments.
|
||||||
def cast(value: Any, type: Any, opt_description: str) -> Any:
|
def cast(value: Any, input_type: Any, opt_description: str) -> Any:
|
||||||
try:
|
try:
|
||||||
# handle bools
|
# handle bools
|
||||||
if isinstance(type, bool):
|
if isinstance(input_type, bool):
|
||||||
if value[0] in ["true", "True", "yes", "y", "1"]:
|
if value[0] in ["true", "True", "yes", "y", "1"]:
|
||||||
return True
|
return True
|
||||||
elif value[0] in ["false", "False", "no", "n", "0"]:
|
elif value[0] in ["false", "False", "no", "n", "0"]:
|
||||||
@@ -81,28 +81,28 @@ def cast(value: Any, type: Any, opt_description: str) -> Any:
|
|||||||
else:
|
else:
|
||||||
raise ClanError(f"Invalid value {value} for boolean")
|
raise ClanError(f"Invalid value {value} for boolean")
|
||||||
# handle lists
|
# handle lists
|
||||||
elif get_origin(type) == list:
|
elif get_origin(input_type) == list:
|
||||||
subtype = type.__args__[0]
|
subtype = input_type.__args__[0]
|
||||||
return [cast([x], subtype, opt_description) for x in value]
|
return [cast([x], subtype, opt_description) for x in value]
|
||||||
# handle dicts
|
# handle dicts
|
||||||
elif get_origin(type) == dict:
|
elif get_origin(input_type) == dict:
|
||||||
if not isinstance(value, dict):
|
if not isinstance(value, dict):
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
f"Cannot set {opt_description} directly. Specify a suboption like {opt_description}.<name>"
|
||||||
)
|
)
|
||||||
subtype = type.__args__[1]
|
subtype = input_type.__args__[1]
|
||||||
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
return {k: cast(v, subtype, opt_description) for k, v in value.items()}
|
||||||
elif str(type) == "typing.Optional[str]":
|
elif str(input_type) == "str | None":
|
||||||
if value[0] in ["null", "None"]:
|
if value[0] in ["null", "None"]:
|
||||||
return None
|
return None
|
||||||
return value[0]
|
return value[0]
|
||||||
else:
|
else:
|
||||||
if len(value) > 1:
|
if len(value) > 1:
|
||||||
raise ClanError(f"Too many values for {opt_description}")
|
raise ClanError(f"Too many values for {opt_description}")
|
||||||
return type(value[0])
|
return input_type(value[0])
|
||||||
except ValueError:
|
except ValueError:
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
f"Invalid type for option {opt_description} (expected {type.__name__})"
|
f"Invalid type for option {opt_description} (expected {input_type.__name__})"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,14 +1,19 @@
|
|||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import Any
|
from typing import Any, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class AnyCall(Protocol):
|
||||||
|
def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
class FakeDeal:
|
class FakeDeal:
|
||||||
def __getattr__(self, name: str) -> "Callable":
|
def __getattr__(self, name: str) -> AnyCall:
|
||||||
return self.mock_call
|
return self.mock_call
|
||||||
|
|
||||||
def mock_call(self, *args: Any, **kwargs: Any) -> Callable:
|
def mock_call(self, *args: Any, **kwargs: Any) -> Callable[[AnyCall], AnyCall]:
|
||||||
def wrapper(func: Callable) -> Callable:
|
def wrapper(func: AnyCall) -> AnyCall:
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|||||||
@@ -1,24 +1,17 @@
|
|||||||
import logging
|
import logging
|
||||||
import multiprocessing as mp
|
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
import stat
|
import stat
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from collections.abc import Callable
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import ipdb
|
import ipdb
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def command_exec(cmd: list[str], work_dir: Path, env: dict[str, str]) -> None:
|
|
||||||
subprocess.run(cmd, check=True, env=env, cwd=work_dir.resolve())
|
|
||||||
|
|
||||||
|
|
||||||
def block_for_input() -> None:
|
def block_for_input() -> None:
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
procid = os.getpid()
|
procid = os.getpid()
|
||||||
@@ -66,12 +59,13 @@ def breakpoint_shell(
|
|||||||
if cmd is not None:
|
if cmd is not None:
|
||||||
mycommand = shlex.join(cmd)
|
mycommand = shlex.join(cmd)
|
||||||
write_command(mycommand, work_dir / "cmd.sh")
|
write_command(mycommand, work_dir / "cmd.sh")
|
||||||
proc = spawn_process(func=command_exec, cmd=args, work_dir=work_dir, env=env)
|
proc = subprocess.Popen(args, env=env, cwd=work_dir.resolve())
|
||||||
|
|
||||||
try:
|
with proc:
|
||||||
ipdb.set_trace()
|
try:
|
||||||
finally:
|
ipdb.set_trace()
|
||||||
proc.terminate()
|
finally:
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
|
||||||
def write_command(command: str, loc: Path) -> None:
|
def write_command(command: str, loc: Path) -> None:
|
||||||
@@ -83,15 +77,6 @@ def write_command(command: str, loc: Path) -> None:
|
|||||||
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
os.chmod(loc, st.st_mode | stat.S_IEXEC)
|
||||||
|
|
||||||
|
|
||||||
def spawn_process(func: Callable, **kwargs: Any) -> mp.Process:
|
|
||||||
if mp.get_start_method(allow_none=True) is None:
|
|
||||||
mp.set_start_method(method="spawn")
|
|
||||||
|
|
||||||
proc = mp.Process(target=func, name="python-debug-process", kwargs=kwargs)
|
|
||||||
proc.start()
|
|
||||||
return proc
|
|
||||||
|
|
||||||
|
|
||||||
def dump_env(env: dict[str, str], loc: Path) -> None:
|
def dump_env(env: dict[str, str], loc: Path) -> None:
|
||||||
cenv = env.copy()
|
cenv = env.copy()
|
||||||
log.info("Dumping environment variables to %s", loc)
|
log.info("Dumping environment variables to %s", loc)
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ async def add_flake(path: Path) -> dict[str, CmdOut]:
|
|||||||
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
user_history_file().parent.mkdir(parents=True, exist_ok=True)
|
||||||
# append line to history file
|
# append line to history file
|
||||||
# TODO: Make this atomic
|
# TODO: Make this atomic
|
||||||
lines: set = set()
|
lines: set[str] = set()
|
||||||
if user_history_file().exists():
|
if user_history_file().exists():
|
||||||
with open(user_history_file()) as f:
|
with open(user_history_file()) as f:
|
||||||
lines = set(f.readlines())
|
lines = set(f.readlines())
|
||||||
|
|||||||
@@ -425,14 +425,14 @@ class Host:
|
|||||||
sudo = ""
|
sudo = ""
|
||||||
if become_root and self.user != "root":
|
if become_root and self.user != "root":
|
||||||
sudo = "sudo -- "
|
sudo = "sudo -- "
|
||||||
vars = []
|
env_vars = []
|
||||||
for k, v in extra_env.items():
|
for k, v in extra_env.items():
|
||||||
vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
env_vars.append(f"{shlex.quote(k)}={shlex.quote(v)}")
|
||||||
|
|
||||||
displayed_cmd = ""
|
displayed_cmd = ""
|
||||||
export_cmd = ""
|
export_cmd = ""
|
||||||
if vars:
|
if env_vars:
|
||||||
export_cmd = f"export {' '.join(vars)}; "
|
export_cmd = f"export {' '.join(env_vars)}; "
|
||||||
displayed_cmd += export_cmd
|
displayed_cmd += export_cmd
|
||||||
if isinstance(cmd, list):
|
if isinstance(cmd, list):
|
||||||
displayed_cmd += " ".join(cmd)
|
displayed_cmd += " ".join(cmd)
|
||||||
@@ -469,7 +469,7 @@ class Host:
|
|||||||
def ssh_cmd(
|
def ssh_cmd(
|
||||||
self,
|
self,
|
||||||
verbose_ssh: bool = False,
|
verbose_ssh: bool = False,
|
||||||
) -> list:
|
) -> list[str]:
|
||||||
if self.user is not None:
|
if self.user is not None:
|
||||||
ssh_target = f"{self.user}@{self.host}"
|
ssh_target = f"{self.user}@{self.host}"
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -21,8 +21,8 @@ from .errors import ClanError
|
|||||||
class Command:
|
class Command:
|
||||||
def __init__(self, log: logging.Logger) -> None:
|
def __init__(self, log: logging.Logger) -> None:
|
||||||
self.log: logging.Logger = log
|
self.log: logging.Logger = log
|
||||||
self.p: subprocess.Popen | None = None
|
self.p: subprocess.Popen[str] | None = None
|
||||||
self._output: queue.SimpleQueue = queue.SimpleQueue()
|
self._output: queue.SimpleQueue[str | None] = queue.SimpleQueue()
|
||||||
self.returncode: int | None = None
|
self.returncode: int | None = None
|
||||||
self.done: bool = False
|
self.done: bool = False
|
||||||
self.stdout: list[str] = []
|
self.stdout: list[str] = []
|
||||||
@@ -148,8 +148,8 @@ class BaseTask:
|
|||||||
for line in proc.stderr:
|
for line in proc.stderr:
|
||||||
yield line
|
yield line
|
||||||
else:
|
else:
|
||||||
while line := proc._output.get():
|
while maybe_line := proc._output.get():
|
||||||
yield line
|
yield maybe_line
|
||||||
|
|
||||||
def commands(self) -> Iterator[Command]:
|
def commands(self) -> Iterator[Command]:
|
||||||
yield from self.procs
|
yield from self.procs
|
||||||
|
|||||||
@@ -55,5 +55,5 @@ ignore_missing_imports = true
|
|||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
select = ["E", "F", "I", "U", "N", "RUF", "ANN"]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501", "E402", "ANN101", "ANN401"]
|
ignore = ["E501", "E402", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
@@ -38,14 +38,14 @@ def sshd_config(test_root: Path) -> Iterator[SshdConfig]:
|
|||||||
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
|
# FIXME, if any parent of the sshd directory is world-writable than sshd will refuse it.
|
||||||
# we use .direnv instead since it's already in .gitignore
|
# we use .direnv instead since it's already in .gitignore
|
||||||
with TemporaryDirectory() as _dir:
|
with TemporaryDirectory() as _dir:
|
||||||
dir = Path(_dir)
|
tmpdir = Path(_dir)
|
||||||
host_key = test_root / "data" / "ssh_host_ed25519_key"
|
host_key = test_root / "data" / "ssh_host_ed25519_key"
|
||||||
host_key.chmod(0o600)
|
host_key.chmod(0o600)
|
||||||
template = (test_root / "data" / "sshd_config").read_text()
|
template = (test_root / "data" / "sshd_config").read_text()
|
||||||
content = string.Template(template).substitute(dict(host_key=host_key))
|
content = string.Template(template).substitute(dict(host_key=host_key))
|
||||||
config = dir / "sshd_config"
|
config = tmpdir / "sshd_config"
|
||||||
config.write_text(content)
|
config.write_text(content)
|
||||||
login_shell = dir / "shell"
|
login_shell = tmpdir / "shell"
|
||||||
|
|
||||||
bash = shutil.which("bash")
|
bash = shutil.which("bash")
|
||||||
path = os.environ["PATH"]
|
path = os.environ["PATH"]
|
||||||
@@ -72,7 +72,7 @@ exec {bash} -l "${{@}}"
|
|||||||
), "we do not support the ld_preload trick on non-linux just now"
|
), "we do not support the ld_preload trick on non-linux just now"
|
||||||
|
|
||||||
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
|
# This enforces a login shell by overriding the login shell of `getpwnam(3)`
|
||||||
lib_path = dir / "libgetpwnam-preload.so"
|
lib_path = tmpdir / "libgetpwnam-preload.so"
|
||||||
subprocess.run(
|
subprocess.run(
|
||||||
[
|
[
|
||||||
os.environ.get("CC", "cc"),
|
os.environ.get("CC", "cc"),
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import json
|
import json
|
||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional
|
from typing import Any
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from cli import Cli
|
from cli import Cli
|
||||||
@@ -211,13 +211,16 @@ def test_map_type() -> None:
|
|||||||
|
|
||||||
# test the cast function with simple types
|
# test the cast function with simple types
|
||||||
def test_cast() -> None:
|
def test_cast() -> None:
|
||||||
assert config.cast(value=["true"], type=bool, opt_description="foo-option") is True
|
|
||||||
assert (
|
assert (
|
||||||
config.cast(value=["null"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
config.cast(value=["true"], input_type=bool, opt_description="foo-option")
|
||||||
|
is True
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
config.cast(value=["null"], input_type=str | None, opt_description="foo-option")
|
||||||
is None
|
is None
|
||||||
)
|
)
|
||||||
assert (
|
assert (
|
||||||
config.cast(value=["bar"], type=Optional[str], opt_description="foo-option") # noqa: UP007
|
config.cast(value=["bar"], input_type=str | None, opt_description="foo-option")
|
||||||
== "bar"
|
== "bar"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -24,5 +24,5 @@ ignore_missing_imports = true
|
|||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
line-length = 88
|
line-length = 88
|
||||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN" ]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = ["E501", "E402", "N802", "ANN101", "ANN401"]
|
ignore = ["E501", "E402", "N802", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
@@ -10,5 +10,5 @@ exclude = "clan_cli.nixpkgs"
|
|||||||
[tool.ruff]
|
[tool.ruff]
|
||||||
line-length = 88
|
line-length = 88
|
||||||
target-version = "py311"
|
target-version = "py311"
|
||||||
select = [ "E", "F", "I", "U", "N", "RUF", "ANN" ]
|
select = [ "E", "F", "I", "U", "N", "RUF", "ANN", "A" ]
|
||||||
ignore = [ "E501", "ANN101", "ANN401"]
|
ignore = [ "E501", "ANN101", "ANN401", "A003"]
|
||||||
|
|||||||
Reference in New Issue
Block a user