add SIM lint
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
@@ -35,10 +36,8 @@ from .ssh import cli as ssh_cli
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
argcomplete: ModuleType | None = None
|
||||
try:
|
||||
with contextlib.suppress(ImportError):
|
||||
import argcomplete # type: ignore[no-redef]
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def flake_path(arg: str) -> FlakeId:
|
||||
|
||||
@@ -47,11 +47,8 @@ class FlakeId:
|
||||
|
||||
"""
|
||||
x = urllib.parse.urlparse(str(self.loc))
|
||||
if x.scheme == "" or "file" in x.scheme:
|
||||
# See above *file* or empty are the only local schemas
|
||||
return True
|
||||
|
||||
return False
|
||||
# See above *file* or empty are the only local schemas
|
||||
return x.scheme == "" or "file" in x.scheme
|
||||
|
||||
def is_remote(self) -> bool:
|
||||
return not self.is_local()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import argparse
|
||||
import contextlib
|
||||
import json
|
||||
import subprocess
|
||||
import threading
|
||||
@@ -17,10 +18,8 @@ We target a maximum of 1second on our average machine.
|
||||
|
||||
|
||||
argcomplete: ModuleType | None = None
|
||||
try:
|
||||
with contextlib.suppress(ImportError):
|
||||
import argcomplete # type: ignore[no-redef]
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
# The default completion timeout for commands
|
||||
@@ -211,10 +210,7 @@ def complete_secrets(
|
||||
from .clan_uri import FlakeId
|
||||
from .secrets.secrets import ListSecretsOptions, list_secrets
|
||||
|
||||
if (clan_dir_result := clan_dir(None)) is not None:
|
||||
flake = clan_dir_result
|
||||
else:
|
||||
flake = "."
|
||||
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
||||
|
||||
options = ListSecretsOptions(
|
||||
flake=FlakeId(flake),
|
||||
@@ -237,10 +233,7 @@ def complete_users(
|
||||
|
||||
from .secrets.users import list_users
|
||||
|
||||
if (clan_dir_result := clan_dir(None)) is not None:
|
||||
flake = clan_dir_result
|
||||
else:
|
||||
flake = "."
|
||||
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
||||
|
||||
users = list_users(Path(flake))
|
||||
|
||||
@@ -258,10 +251,7 @@ def complete_groups(
|
||||
|
||||
from .secrets.groups import list_groups
|
||||
|
||||
if (clan_dir_result := clan_dir(None)) is not None:
|
||||
flake = clan_dir_result
|
||||
else:
|
||||
flake = "."
|
||||
flake = clan_dir_result if (clan_dir_result := clan_dir(None)) is not None else "."
|
||||
|
||||
groups_list = list_groups(Path(flake))
|
||||
groups = [group.name for group in groups_list]
|
||||
|
||||
@@ -227,7 +227,7 @@ def find_option(
|
||||
regex = rf"({first}|<name>)"
|
||||
for elem in option_path[1:]:
|
||||
regex += rf"\.({elem}|<name>)"
|
||||
for opt in options.keys():
|
||||
for opt in options:
|
||||
if re.match(regex, opt):
|
||||
return opt, value
|
||||
|
||||
|
||||
@@ -16,10 +16,7 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
|
||||
|
||||
missing_secret_facts = []
|
||||
missing_public_facts = []
|
||||
if service:
|
||||
services = [service]
|
||||
else:
|
||||
services = list(machine.facts_data.keys())
|
||||
services = [service] if service else list(machine.facts_data.keys())
|
||||
for service in services:
|
||||
for secret_fact in machine.facts_data[service]["secret"]:
|
||||
if isinstance(secret_fact, str):
|
||||
@@ -41,9 +38,7 @@ def check_secrets(machine: Machine, service: None | str = None) -> bool:
|
||||
|
||||
log.debug(f"missing_secret_facts: {missing_secret_facts}")
|
||||
log.debug(f"missing_public_facts: {missing_public_facts}")
|
||||
if missing_secret_facts or missing_public_facts:
|
||||
return False
|
||||
return True
|
||||
return not (missing_secret_facts or missing_public_facts)
|
||||
|
||||
|
||||
def check_command(args: argparse.Namespace) -> None:
|
||||
|
||||
@@ -9,7 +9,7 @@ def list_history_command(args: argparse.Namespace) -> None:
|
||||
res: dict[str, list[HistoryEntry]] = {}
|
||||
for history_entry in list_history():
|
||||
url = str(history_entry.flake.flake_url)
|
||||
if res.get(url, None) is None:
|
||||
if res.get(url) is None:
|
||||
res[url] = []
|
||||
res[url].append(history_entry)
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ Operate on the returned inventory to make changes
|
||||
- save_inventory: To persist changes.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
@@ -163,10 +164,8 @@ def init_inventory(directory: str, init: Inventory | None = None) -> None:
|
||||
inventory = None
|
||||
# Try reading the current flake
|
||||
if init is None:
|
||||
try:
|
||||
with contextlib.suppress(ClanCmdError):
|
||||
inventory = load_inventory_eval(directory)
|
||||
except ClanCmdError:
|
||||
pass
|
||||
|
||||
if init is not None:
|
||||
inventory = init
|
||||
|
||||
@@ -27,7 +27,7 @@ def create_machine(flake: FlakeId, machine: Machine) -> None:
|
||||
|
||||
full_inventory = load_inventory_eval(flake.path)
|
||||
|
||||
if machine.name in full_inventory.machines.keys():
|
||||
if machine.name in full_inventory.machines:
|
||||
msg = f"Machine with the name {machine.name} already exists"
|
||||
raise ClanError(msg)
|
||||
|
||||
|
||||
@@ -166,10 +166,7 @@ def find_reachable_host_from_deploy_json(deploy_json: dict[str, str]) -> str:
|
||||
host = None
|
||||
for addr in deploy_json["addrs"]:
|
||||
if is_reachable(addr):
|
||||
if is_ipv6(addr):
|
||||
host = f"[{addr}]"
|
||||
else:
|
||||
host = addr
|
||||
host = f"[{addr}]" if is_ipv6(addr) else addr
|
||||
break
|
||||
if not host:
|
||||
msg = f"""
|
||||
|
||||
@@ -80,10 +80,7 @@ class QEMUMonitorProtocol:
|
||||
self.__sock.listen(1)
|
||||
|
||||
def __get_sock(self) -> socket.socket:
|
||||
if isinstance(self.__address, tuple):
|
||||
family = socket.AF_INET
|
||||
else:
|
||||
family = socket.AF_UNIX
|
||||
family = socket.AF_INET if isinstance(self.__address, tuple) else socket.AF_UNIX
|
||||
return socket.socket(family, socket.SOCK_STREAM)
|
||||
|
||||
def __negotiate_capabilities(self) -> dict[str, Any]:
|
||||
|
||||
@@ -4,7 +4,7 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from contextlib import contextmanager, suppress
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import IO
|
||||
@@ -196,10 +196,8 @@ def encrypt_file(
|
||||
with open(meta_path, "w") as f_meta:
|
||||
json.dump(meta, f_meta, indent=2)
|
||||
finally:
|
||||
try:
|
||||
with suppress(OSError):
|
||||
os.remove(f.name)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def decrypt_file(secret_path: Path) -> str:
|
||||
|
||||
@@ -476,10 +476,7 @@ class Host:
|
||||
verbose_ssh: bool = False,
|
||||
tty: bool = False,
|
||||
) -> list[str]:
|
||||
if self.user is not None:
|
||||
ssh_target = f"{self.user}@{self.host}"
|
||||
else:
|
||||
ssh_target = self.host
|
||||
ssh_target = f"{self.user}@{self.host}" if self.user is not None else self.host
|
||||
|
||||
ssh_opts = ["-A"] if self.forward_agent else []
|
||||
|
||||
|
||||
@@ -37,9 +37,7 @@ def check_vars(machine: Machine, generator_name: None | str = None) -> bool:
|
||||
|
||||
log.debug(f"missing_secret_vars: {missing_secret_vars}")
|
||||
log.debug(f"missing_public_vars: {missing_public_vars}")
|
||||
if missing_secret_vars or missing_public_vars:
|
||||
return False
|
||||
return True
|
||||
return not (missing_secret_vars or missing_public_vars)
|
||||
|
||||
|
||||
def check_command(args: argparse.Namespace) -> None:
|
||||
|
||||
Reference in New Issue
Block a user