use pathlib

This commit is contained in:
Jörg Thalheim
2024-09-02 17:32:29 +02:00
parent e9536c2be0
commit af0a1dd3f2
9 changed files with 37 additions and 39 deletions

View File

@@ -1,35 +1,34 @@
import argparse
import json
import os
import sqlite3
from pathlib import Path
def ensure_config(path: str, db_path: str) -> None:
def ensure_config(path: Path, db_path: Path) -> None:
# Default JSON structure if the file doesn't exist
default_json = {
"misc": {
"audio_wizard_has_been_shown": True,
"database_location": db_path,
"database_location": str(db_path),
"viewed_server_ping_consent_message": True,
},
"settings_version": 1,
}
# Check if the file exists
if os.path.exists(path):
with open(path) as file:
data = json.load(file)
if path.exists():
data = json.loads(path.read_text())
else:
data = default_json
# Create the file with default JSON structure
with open(path, "w") as file:
with path.open("w") as file:
json.dump(data, file, indent=4)
# TODO: make sure to only update the diff
updated_data = {**default_json, **data}
# Write the modified JSON object back to the file
with open(path, "w") as file:
with path.open("w") as file:
json.dump(updated_data, file, indent=4)
@@ -206,7 +205,7 @@ if __name__ == "__main__":
parser.add_argument("--servers")
parser.add_argument("--username")
parser.add_argument("--db-location")
parser.add_argument("--ensure-config")
parser.add_argument("--ensure-config", type=Path)
args = parser.parse_args()
print(args)

View File

@@ -31,8 +31,8 @@ from typing import Any
from clan_cli.api.modules import Frontmatter, extract_frontmatter, get_roles
# Get environment variables
CLAN_CORE_PATH = os.getenv("CLAN_CORE_PATH")
CLAN_CORE_DOCS = os.getenv("CLAN_CORE_DOCS")
CLAN_CORE_PATH = Path(os.environ["CLAN_CORE_PATH"])
CLAN_CORE_DOCS = Path(os.environ["CLAN_CORE_DOCS"])
CLAN_MODULES = os.environ.get("CLAN_MODULES")
OUT = os.environ.get("out")
@@ -161,7 +161,7 @@ def produce_clan_core_docs() -> None:
# A mapping of output file to content
core_outputs: dict[str, str] = {}
with open(CLAN_CORE_DOCS) as f:
with CLAN_CORE_DOCS.open() as f:
options: dict[str, dict[str, Any]] = json.load(f)
module_name = "clan-core"
for option_name, info in options.items():
@@ -189,7 +189,7 @@ def produce_clan_core_docs() -> None:
for outfile, output in core_outputs.items():
(Path(OUT) / outfile).parent.mkdir(parents=True, exist_ok=True)
with open(Path(OUT) / outfile, "w") as of:
with (Path(OUT) / outfile).open("w") as of:
of.write(output)
@@ -227,9 +227,7 @@ clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/
def produce_clan_modules_docs() -> None:
if not CLAN_MODULES:
msg = (
f"Environment variables are not set correctly: $CLAN_MODULES={CLAN_MODULES}"
)
msg = f"Environment variables are not set correctly: $out={CLAN_MODULES}"
raise ValueError(msg)
if not CLAN_CORE_PATH:
@@ -240,7 +238,7 @@ def produce_clan_modules_docs() -> None:
msg = f"Environment variables are not set correctly: $out={OUT}"
raise ValueError(msg)
with open(CLAN_MODULES) as f:
with Path(CLAN_MODULES).open() as f:
links: dict[str, str] = json.load(f)
# with open(CLAN_MODULES_READMES) as readme:
@@ -258,9 +256,9 @@ def produce_clan_modules_docs() -> None:
modules_index += '<div class="grid cards" markdown>\n\n'
for module_name, options_file in links.items():
readme_file = Path(CLAN_CORE_PATH) / "clanModules" / module_name / "README.md"
readme_file = CLAN_CORE_PATH / "clanModules" / module_name / "README.md"
print(module_name, readme_file)
with open(readme_file) as f:
with readme_file.open() as f:
readme = f.read()
frontmatter: Frontmatter
frontmatter, readme_content = extract_frontmatter(readme, str(readme_file))
@@ -268,7 +266,7 @@ def produce_clan_modules_docs() -> None:
modules_index += build_option_card(module_name, frontmatter)
with open(Path(options_file) / "share/doc/nixos/options.json") as f:
with (Path(options_file) / "share/doc/nixos/options.json").open() as f:
options: dict[str, dict[str, Any]] = json.load(f)
print(f"Rendering options for {module_name}...")
output = module_header(module_name)
@@ -278,7 +276,7 @@ def produce_clan_modules_docs() -> None:
output += f"{readme_content}\n"
# get_roles(str) -> list[str] | None
roles = get_roles(str(Path(CLAN_CORE_PATH) / "clanModules" / module_name))
roles = get_roles(CLAN_CORE_PATH / "clanModules" / module_name)
if roles:
output += render_roles(roles, module_name)
@@ -293,14 +291,14 @@ def produce_clan_modules_docs() -> None:
parents=True,
exist_ok=True,
)
with open(outfile, "w") as of:
with outfile.open("w") as of:
of.write(output)
modules_index += "</div>"
modules_index += "\n"
modules_outfile = Path(OUT) / "clanModules/index.md"
with open(modules_outfile, "w") as of:
with modules_outfile.open("w") as of:
of.write(modules_index)

View File

@@ -64,7 +64,7 @@ def _init_proc(
os.setsid()
# Open stdout and stderr
with open(out_file, "w") as out_fd:
with out_file.open("w") as out_fd:
os.dup2(out_fd.fileno(), sys.stdout.fileno())
os.dup2(out_fd.fileno(), sys.stderr.fileno())

View File

@@ -1,3 +1,5 @@
from pathlib import Path
from clan_cli.inventory import (
AdminConfig,
ServiceAdmin,
@@ -25,7 +27,7 @@ def get_admin_service(base_url: str) -> ServiceAdmin | None:
@API.register
def set_admin_service(
base_url: str,
allowed_keys: dict[str, str],
allowed_keys: dict[str, Path],
instance_name: str = "admin",
extra_machines: list[str] | None = None,
) -> None:
@@ -43,10 +45,10 @@ def set_admin_service(
keys = {}
for name, keyfile in allowed_keys.items():
if not keyfile.startswith("/"):
if not keyfile.is_absolute():
msg = f"Keyfile '{keyfile}' must be an absolute path"
raise ValueError(msg)
with open(keyfile) as f:
with keyfile.open() as f:
pubkey = f.read()
keys[name] = pubkey

View File

@@ -69,8 +69,8 @@ def extract_frontmatter(readme_content: str, err_scope: str) -> tuple[Frontmatte
)
def get_roles(module_path: str) -> None | list[str]:
roles_dir = Path(module_path) / "roles"
def get_roles(module_path: Path) -> None | list[str]:
roles_dir = module_path / "roles"
if not roles_dir.exists() or not roles_dir.is_dir():
return None
@@ -117,14 +117,14 @@ def list_modules(base_path: str) -> dict[str, ModuleInfo]:
"""
modules = get_modules(base_path)
return {
module_name: get_module_info(module_name, module_path)
module_name: get_module_info(module_name, Path(module_path))
for module_name, module_path in modules.items()
}
def get_module_info(
module_name: str,
module_path: str,
module_path: Path,
) -> ModuleInfo:
"""
Retrieves information about a module
@@ -136,7 +136,7 @@ def get_module_info(
location=f"show_module_info {module_name}",
description="Module does not exist",
)
module_readme = Path(module_path) / "README.md"
module_readme = module_path / "README.md"
if not module_readme.exists():
msg = "Module not found"
raise ClanError(
@@ -144,7 +144,7 @@ def get_module_info(
location=f"show_module_info {module_name}",
description="Module does not exist or doesn't have any README.md file",
)
with open(module_readme) as f:
with module_readme.open() as f:
readme = f.read()
frontmatter, readme_content = extract_frontmatter(
readme, f"{module_path}/README.md"

View File

@@ -114,7 +114,7 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
)
parser.add_argument(
"path", type=Path, help="Path to the clan directory", default=Path(".")
"path", type=Path, help="Path to the clan directory", default=Path()
)
def create_flake_command(args: argparse.Namespace) -> None:

View File

@@ -172,7 +172,7 @@ def get_or_set_option(args: argparse.Namespace) -> None:
args.flake, machine_name=args.machine, show_trace=args.show_trace
)
else:
with open(args.options_file) as f:
with args.options_file.open() as f:
options = json.load(f)
# compute settings json file location
if args.settings_file is None:

View File

@@ -21,7 +21,7 @@ def list_state_folders(machine: str, service: None | str = None) -> None:
if (clan_dir_result := get_clan_flake_toplevel_or_env()) is not None:
flake = clan_dir_result
else:
flake = Path(".")
flake = Path()
cmd = nix_eval(
[
f"{flake}#nixosConfigurations.{machine}.config.clanCore.state",

View File

@@ -20,8 +20,7 @@ def parse_args() -> argparse.Namespace:
def get_current_shell() -> str | None:
if selected_shell_file.exists():
with open(selected_shell_file) as f:
return f.read().strip()
return selected_shell_file.read_text().strip()
return None
@@ -57,7 +56,7 @@ def select_shell(shell: str) -> None:
print(f"{shell} devshell already selected. No changes made.")
sys.exit(0)
else:
with open(selected_shell_file, "w") as f:
with selected_shell_file.open("w") as f:
f.write(shell)
print(f"{shell} devshell selected")