reformat after update

This commit is contained in:
Jörg Thalheim
2025-01-14 15:30:29 +01:00
parent 9d29cc63ad
commit e2aa66d86f
17 changed files with 37 additions and 35 deletions

View File

@@ -102,9 +102,9 @@ class Machine:
.read_text() .read_text()
.split() .split()
) )
assert ( assert len(childs) == 1, (
len(childs) == 1 f"Expected exactly one child process for systemd-nspawn, got {childs}"
), f"Expected exactly one child process for systemd-nspawn, got {childs}" )
try: try:
return int(childs[0]) return int(childs[0])
except ValueError as e: except ValueError as e:

View File

@@ -55,9 +55,9 @@ class Identity:
def node_id(self) -> str: def node_id(self) -> str:
nid = self.public.split(":")[0] nid = self.public.split(":")[0]
assert ( assert len(nid) == 10, (
len(nid) == 10 f"node_id must be 10 characters long, got {len(nid)}: {nid}"
), f"node_id must be 10 characters long, got {len(nid)}: {nid}" )
return nid return nid
@@ -172,9 +172,9 @@ def create_identity() -> Identity:
def compute_zerotier_ip(network_id: str, identity: Identity) -> ipaddress.IPv6Address: def compute_zerotier_ip(network_id: str, identity: Identity) -> ipaddress.IPv6Address:
assert ( assert len(network_id) == 16, (
len(network_id) == 16 "network_id must be 16 characters long, got {network_id}"
), "network_id must be 16 characters long, got {network_id}" )
nwid = int(network_id, 16) nwid = int(network_id, 16)
node_id = int(identity.node_id(), 16) node_id = int(identity.node_id(), 16)
addr_parts = bytearray( addr_parts = bytearray(

View File

@@ -115,9 +115,9 @@ def type_to_dict(
for f in fields: for f in fields:
if f.name.startswith("_"): if f.name.startswith("_"):
continue continue
assert not isinstance( assert not isinstance(f.type, str), (
f.type, str f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?"
), f"Expected field type to be a type, got {f.type}, Have you imported `from __future__ import annotations`?" )
properties[f.metadata.get("alias", f.name)] = type_to_dict( properties[f.metadata.get("alias", f.name)] = type_to_dict(
f.type, f.type,
f"{scope} {t.__name__}.{f.name}", # type: ignore f"{scope} {t.__name__}.{f.name}", # type: ignore

View File

@@ -75,9 +75,9 @@ def show_command(args: argparse.Namespace) -> None:
flake_path = args.flake.path flake_path = args.flake.path
meta = show_clan_meta(flake_path) meta = show_clan_meta(flake_path)
print(f"Name: {meta.get("name")}") print(f"Name: {meta.get('name')}")
print(f"Description: {meta.get("description", '-')}") print(f"Description: {meta.get('description', '-')}")
print(f"Icon: {meta.get("icon", '-')}") print(f"Icon: {meta.get('icon', '-')}")
def register_parser(parser: argparse.ArgumentParser) -> None: def register_parser(parser: argparse.ArgumentParser) -> None:

View File

@@ -146,7 +146,9 @@ def print_trace(msg: str, logger: logging.Logger, prefix: str | None) -> None:
if len(callers) == 1: if len(callers) == 1:
callers_str = f"Caller: {callers[0]}\n" callers_str = f"Caller: {callers[0]}\n"
else: else:
callers_str = "\n".join(f"{i+1}: {caller}" for i, caller in enumerate(callers)) callers_str = "\n".join(
f"{i + 1}: {caller}" for i, caller in enumerate(callers)
)
callers_str = f"Callers:\n{callers_str}" callers_str = f"Callers:\n{callers_str}"
logger.debug(f"{msg} \n{callers_str}", extra={"command_prefix": prefix}) logger.debug(f"{msg} \n{callers_str}", extra={"command_prefix": prefix})

View File

@@ -116,7 +116,7 @@ class CmdOut:
{optional_text("Command", self.command)} {optional_text("Command", self.command)}
{optional_text("Stdout", self.stdout)} {optional_text("Stdout", self.stdout)}
{optional_text("Stderr", self.stderr)} {optional_text("Stderr", self.stderr)}
{'Return Code:':<{label_width}} {self.returncode} {"Return Code:":<{label_width}} {self.returncode}
""" """
] ]
if self.msg: if self.msg:
@@ -135,7 +135,7 @@ class CmdOut:
f""" f"""
{optional_text("Environment", diffed_dict_str)} {optional_text("Environment", diffed_dict_str)}
{text_heading(heading="Metadata")} {text_heading(heading="Metadata")}
{'Work Dir:':<{label_width}} '{self.cwd}' {"Work Dir:":<{label_width}} '{self.cwd}'
""" """
] ]
return "\n".join(error_msg) return "\n".join(error_msg)

View File

@@ -492,7 +492,7 @@ def read_key(path: Path) -> tuple[str, KeyType]:
raise ClanError(msg) from e raise ClanError(msg) from e
key_type = KeyType.validate(key.get("type")) key_type = KeyType.validate(key.get("type"))
if key_type is None: if key_type is None:
msg = f"Invalid key type in {path.name}: \"{key_type}\" (expected one of {', '.join(KeyType.__members__.keys())})." msg = f'Invalid key type in {path.name}: "{key_type}" (expected one of {", ".join(KeyType.__members__.keys())}).'
raise ClanError(msg) raise ClanError(msg)
publickey = key.get("publickey") publickey = key.get("publickey")
if not publickey: if not publickey:

View File

@@ -141,7 +141,7 @@ class SecretStore(StoreBase):
# TODO get the path to the secrets from the machine # TODO get the path to the secrets from the machine
[ [
"cat", "cat",
f"{self.machine.deployment["password-store"]["secretLocation"]}/.pass_info", f"{self.machine.deployment['password-store']['secretLocation']}/.pass_info",
], ],
RunOpts(log=Log.STDERR, check=False), RunOpts(log=Log.STDERR, check=False),
).stdout.strip() ).stdout.strip()

View File

@@ -238,7 +238,7 @@ class SecretStore(StoreBase):
msg = ( msg = (
f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted.\n" f"One or more recipient keys were added to secret{' shared' if generator.share else ''} var '{var_id}', but it was never re-encrypted.\n"
f"This could have been a malicious actor trying to add their keys, please investigate.\n" f"This could have been a malicious actor trying to add their keys, please investigate.\n"
f"Added keys: {', '.join(f"{r.key_type.name}:{r.pubkey}" for r in recipients_to_add)}\n" f"Added keys: {', '.join(f'{r.key_type.name}:{r.pubkey}' for r in recipients_to_add)}\n"
f"If this is intended, run 'clan vars fix' to re-encrypt the secret." f"If this is intended, run 'clan vars fix' to re-encrypt the secret."
) )
return needs_update, msg return needs_update, msg

View File

@@ -99,8 +99,8 @@ def qemu_command(
portmap = {} portmap = {}
kernel_cmdline = [ kernel_cmdline = [
(Path(nixos_config["toplevel"]) / "kernel-params").read_text(), (Path(nixos_config["toplevel"]) / "kernel-params").read_text(),
f'init={nixos_config["toplevel"]}/init', f"init={nixos_config['toplevel']}/init",
f'regInfo={nixos_config["regInfo"]}/registration', f"regInfo={nixos_config['regInfo']}/registration",
"console=hvc0", "console=hvc0",
] ]
if not vm.waypipe.enable: if not vm.waypipe.enable:

View File

@@ -16,7 +16,7 @@ def test_vsock_port(port: int) -> bool:
msg = "vsock is only supported on Linux" msg = "vsock is only supported on Linux"
raise NotImplementedError(msg) raise NotImplementedError(msg)
try: try:
with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: # type: ignore[attr-defined]
s.connect((VMADDR_CID_HYPERVISOR, port)) s.connect((VMADDR_CID_HYPERVISOR, port))
except OSError: except OSError:
return False return False

View File

@@ -76,9 +76,9 @@ exec {bash} -l "${{@}}"
login_shell.chmod(0o755) login_shell.chmod(0o755)
lib_path = None lib_path = None
assert ( assert platform == "linux", (
platform == "linux" f"we do not support the ld_preload trick on non-linux just now. Got {platform}"
), f"we do not support the ld_preload trick on non-linux just now. Got {platform}" )
# This enforces a login shell by overriding the login shell of `getpwnam(3)` # This enforces a login shell by overriding the login shell of `getpwnam(3)`
lib_path = tmpdir / "libgetpwnam-preload.so" lib_path = tmpdir / "libgetpwnam-preload.so"

View File

@@ -14,9 +14,9 @@ class ClanError(Exception):
def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address: def compute_zerotier_ip(network_id: str, identity: str) -> ipaddress.IPv6Address:
assert ( assert len(network_id) == 16, (
len(network_id) == 16 "network_id must be 16 characters long, got {network_id}"
), "network_id must be 16 characters long, got {network_id}" )
nwid = int(network_id, 16) nwid = int(network_id, 16)
node_id = int(identity, 16) node_id = int(identity, 16)
addr_parts = bytearray( addr_parts = bytearray(