PLW2901: fix

This commit is contained in:
Jörg Thalheim
2025-08-20 16:32:59 +02:00
parent 8134ffd787
commit 1d0e0f243e
9 changed files with 59 additions and 45 deletions

View File

@@ -175,9 +175,11 @@ def print_options(
res += head if len(options.items()) else no_options
for option_name, info in options.items():
if replace_prefix:
option_name = option_name.replace(replace_prefix + ".", "")
display_name = option_name.replace(replace_prefix + ".", "")
else:
display_name = option_name
res += render_option(option_name, info, 4)
res += render_option(display_name, info, 4)
return res

View File

@@ -31,11 +31,11 @@ def ping_command(args: argparse.Namespace) -> None:
if machine in network.peers:
found = True
with network.module.connection(network) as network:
with network.module.connection(network) as conn:
log.info(f"Pinging '{machine}' in network '{net_name}' ...")
res = ""
# Check if peer is online
ping = network.ping(machine)
ping = conn.ping(machine)
if ping is None:
res = "not reachable"
log.info(f"{machine} ({net_name}): {res}")

View File

@@ -37,22 +37,22 @@ def import_sops(args: argparse.Namespace) -> None:
res = run(cmd, RunOpts(error_msg=f"Could not import sops file {file}"))
secrets = json.loads(res.stdout)
for k, v in secrets.items():
k = args.prefix + k
secret_name = args.prefix + k
if not isinstance(v, str):
print(
f"WARNING: {k} is not a string but {type(v)}, skipping",
f"WARNING: {secret_name} is not a string but {type(v)}, skipping",
file=sys.stderr,
)
continue
if (sops_secrets_folder(args.flake.path) / k / "secret").exists():
if (sops_secrets_folder(args.flake.path) / secret_name / "secret").exists():
print(
f"WARNING: {k} already exists, skipping",
f"WARNING: {secret_name} already exists, skipping",
file=sys.stderr,
)
continue
encrypt_secret(
args.flake.path,
sops_secrets_folder(args.flake.path) / k,
sops_secrets_folder(args.flake.path) / secret_name,
v,
add_groups=args.group,
add_machines=args.machine,

View File

@@ -14,9 +14,7 @@ def list_command(args: argparse.Namespace) -> None:
templates = list_templates(args.flake)
# Display all templates
for i, (template_type, _builtin_template_set) in enumerate(
templates.builtins.items(),
):
for template_type in templates.builtins:
builtin_template_set: TemplateClanType | None = templates.builtins.get(
template_type,
None,
@@ -26,15 +24,17 @@ def list_command(args: argparse.Namespace) -> None:
print(f"Available '{template_type}' templates")
print("├── <builtin>")
for i, (name, template) in enumerate(builtin_template_set.items()):
for builtin_idx, (name, template) in enumerate(builtin_template_set.items()):
description = template.get("description", "no description")
is_last_template = i == len(builtin_template_set.items()) - 1
is_last_template = builtin_idx == len(builtin_template_set.items()) - 1
if not is_last_template:
print(f"│ ├── {name}: {description}")
else:
print(f"│ └── {name}: {description}")
for i, (input_name, input_templates) in enumerate(templates.custom.items()):
for input_idx, (input_name, input_templates) in enumerate(
templates.custom.items(),
):
custom_templates: TemplateClanType | None = input_templates.get(
template_type,
None,
@@ -42,15 +42,15 @@ def list_command(args: argparse.Namespace) -> None:
if not custom_templates:
continue
is_last_input = i == len(templates.custom.items()) - 1
is_last_input = input_idx == len(templates.custom.items()) - 1
prefix = "" if not is_last_input else " "
if not is_last_input:
print(f"├── inputs.{input_name}:")
else:
print(f"└── inputs.{input_name}:")
for i, (name, template) in enumerate(custom_templates.items()):
is_last_template = i == len(custom_templates.items()) - 1
for custom_idx, (name, template) in enumerate(custom_templates.items()):
is_last_template = custom_idx == len(custom_templates.items()) - 1
if not is_last_template:
print(
f"{prefix} ├── {name}: {template.get('description', 'no description')}",

View File

@@ -70,22 +70,31 @@ def substitute(
with file.open() as f:
for line in f:
line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
processed_line = line.replace("__NIXPKGS__", str(nixpkgs_source()))
if clan_core_replacement:
line = line.replace("__CLAN_CORE__", clan_core_replacement)
line = line.replace(
processed_line = processed_line.replace(
"__CLAN_CORE__",
clan_core_replacement,
)
processed_line = processed_line.replace(
"git+https://git.clan.lol/clan/clan-core",
clan_core_replacement,
)
line = line.replace(
processed_line = processed_line.replace(
"https://git.clan.lol/clan/clan-core/archive/main.tar.gz",
clan_core_replacement,
)
line = line.replace("__INVENTORY_EXPR__", str(inventory_expr))
processed_line = processed_line.replace(
"__INVENTORY_EXPR__",
str(inventory_expr),
)
line = line.replace("__CLAN_SOPS_KEY_PATH__", sops_key)
line = line.replace("__CLAN_SOPS_KEY_DIR__", str(flake / "facts"))
buf += line
processed_line = processed_line.replace("__CLAN_SOPS_KEY_PATH__", sops_key)
processed_line = processed_line.replace(
"__CLAN_SOPS_KEY_DIR__",
str(flake / "facts"),
)
buf += processed_line
print(f"file: {file}")
print(f"clan_core: {clan_core_flake}")
@@ -202,9 +211,10 @@ class ClanFlake:
buf = ""
with file.open() as f:
for line in f:
processed_line = line
for key, value in self.substitutions.items():
line = line.replace(key, value)
buf += line
processed_line = processed_line.replace(key, value)
buf += processed_line
file.write_text(buf)
def init_from_template(self) -> None:

View File

@@ -14,8 +14,8 @@ from clan_lib.errors import ClanError
def should_skip(file_path: Path, excludes: list[Path]) -> bool:
file_path = file_path.resolve() # Ensure absolute path
for exclude in excludes:
exclude = exclude.resolve()
if exclude in file_path.parents or exclude == file_path:
resolved_exclude = exclude.resolve()
if resolved_exclude in file_path.parents or resolved_exclude == file_path:
return True # Skip this file
return False

View File

@@ -247,10 +247,10 @@ def get_network_overview(networks: dict[str, Network]) -> dict:
if module.is_running():
result[network_name]["status"] = True
else:
with module.connection(network) as network:
for peer_name in network.peers:
with module.connection(network) as conn:
for peer_name in conn.peers:
try:
result[network_name]["peers"][peer_name] = network.ping(
result[network_name]["peers"][peer_name] = conn.ping(
peer_name,
)
except ClanError:

View File

@@ -498,12 +498,14 @@ def retrieve_typed_field_names(obj: type, prefix: str = "") -> set[str]:
# Unwrap Required/NotRequired
if origin in {NotRequired, Required}:
field_type = args[0]
origin = get_origin(field_type)
args = get_args(field_type)
unwrapped_type = args[0]
origin = get_origin(unwrapped_type)
args = get_args(unwrapped_type)
else:
unwrapped_type = field_type
if is_typeddict_class(field_type):
fields |= retrieve_typed_field_names(field_type, prefix=full_key)
if is_typeddict_class(unwrapped_type):
fields |= retrieve_typed_field_names(unwrapped_type, prefix=full_key)
else:
fields.add(full_key)

View File

@@ -79,9 +79,9 @@ class SudoAskpassProxy:
raise ClanError(msg)
try:
for line in ssh_process.stdout:
line = line.strip()
if line.startswith("PASSWORD_REQUESTED:"):
prompt = line[len("PASSWORD_REQUESTED:") :].strip()
stripped_line = line.strip()
if stripped_line.startswith("PASSWORD_REQUESTED:"):
prompt = stripped_line[len("PASSWORD_REQUESTED:") :].strip()
password = self.handle_password_request(prompt)
if ssh_process.stdin is None:
msg = "SSH process stdin is None"
@@ -89,7 +89,7 @@ class SudoAskpassProxy:
print(password, file=ssh_process.stdin)
ssh_process.stdin.flush()
else:
print(line)
print(stripped_line)
except Exception as e:
logger.error(f"Error processing passwords requests output: {e}")
@@ -116,9 +116,9 @@ class SudoAskpassProxy:
raise ClanError(msg)
for line in self.ssh_process.stdout:
line = line.strip()
if line.startswith("ASKPASS_SCRIPT:"):
askpass_script = line[len("ASKPASS_SCRIPT:") :].strip()
stripped_line = line.strip()
if stripped_line.startswith("ASKPASS_SCRIPT:"):
askpass_script = stripped_line[len("ASKPASS_SCRIPT:") :].strip()
break
else:
msg = f"Failed to create askpass script on {self.host.target}. Did not receive ASKPASS_SCRIPT line."