use pathlib
This commit is contained in:
@@ -1,35 +1,34 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import os
|
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
def ensure_config(path: str, db_path: str) -> None:
|
def ensure_config(path: Path, db_path: Path) -> None:
|
||||||
# Default JSON structure if the file doesn't exist
|
# Default JSON structure if the file doesn't exist
|
||||||
default_json = {
|
default_json = {
|
||||||
"misc": {
|
"misc": {
|
||||||
"audio_wizard_has_been_shown": True,
|
"audio_wizard_has_been_shown": True,
|
||||||
"database_location": db_path,
|
"database_location": str(db_path),
|
||||||
"viewed_server_ping_consent_message": True,
|
"viewed_server_ping_consent_message": True,
|
||||||
},
|
},
|
||||||
"settings_version": 1,
|
"settings_version": 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
# Check if the file exists
|
# Check if the file exists
|
||||||
if os.path.exists(path):
|
if path.exists():
|
||||||
with open(path) as file:
|
data = json.loads(path.read_text())
|
||||||
data = json.load(file)
|
|
||||||
else:
|
else:
|
||||||
data = default_json
|
data = default_json
|
||||||
# Create the file with default JSON structure
|
# Create the file with default JSON structure
|
||||||
with open(path, "w") as file:
|
with path.open("w") as file:
|
||||||
json.dump(data, file, indent=4)
|
json.dump(data, file, indent=4)
|
||||||
|
|
||||||
# TODO: make sure to only update the diff
|
# TODO: make sure to only update the diff
|
||||||
updated_data = {**default_json, **data}
|
updated_data = {**default_json, **data}
|
||||||
|
|
||||||
# Write the modified JSON object back to the file
|
# Write the modified JSON object back to the file
|
||||||
with open(path, "w") as file:
|
with path.open("w") as file:
|
||||||
json.dump(updated_data, file, indent=4)
|
json.dump(updated_data, file, indent=4)
|
||||||
|
|
||||||
|
|
||||||
@@ -206,7 +205,7 @@ if __name__ == "__main__":
|
|||||||
parser.add_argument("--servers")
|
parser.add_argument("--servers")
|
||||||
parser.add_argument("--username")
|
parser.add_argument("--username")
|
||||||
parser.add_argument("--db-location")
|
parser.add_argument("--db-location")
|
||||||
parser.add_argument("--ensure-config")
|
parser.add_argument("--ensure-config", type=Path)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
print(args)
|
print(args)
|
||||||
|
|||||||
@@ -31,8 +31,8 @@ from typing import Any
|
|||||||
from clan_cli.api.modules import Frontmatter, extract_frontmatter, get_roles
|
from clan_cli.api.modules import Frontmatter, extract_frontmatter, get_roles
|
||||||
|
|
||||||
# Get environment variables
|
# Get environment variables
|
||||||
CLAN_CORE_PATH = os.getenv("CLAN_CORE_PATH")
|
CLAN_CORE_PATH = Path(os.environ["CLAN_CORE_PATH"])
|
||||||
CLAN_CORE_DOCS = os.getenv("CLAN_CORE_DOCS")
|
CLAN_CORE_DOCS = Path(os.environ["CLAN_CORE_DOCS"])
|
||||||
CLAN_MODULES = os.environ.get("CLAN_MODULES")
|
CLAN_MODULES = os.environ.get("CLAN_MODULES")
|
||||||
|
|
||||||
OUT = os.environ.get("out")
|
OUT = os.environ.get("out")
|
||||||
@@ -161,7 +161,7 @@ def produce_clan_core_docs() -> None:
|
|||||||
|
|
||||||
# A mapping of output file to content
|
# A mapping of output file to content
|
||||||
core_outputs: dict[str, str] = {}
|
core_outputs: dict[str, str] = {}
|
||||||
with open(CLAN_CORE_DOCS) as f:
|
with CLAN_CORE_DOCS.open() as f:
|
||||||
options: dict[str, dict[str, Any]] = json.load(f)
|
options: dict[str, dict[str, Any]] = json.load(f)
|
||||||
module_name = "clan-core"
|
module_name = "clan-core"
|
||||||
for option_name, info in options.items():
|
for option_name, info in options.items():
|
||||||
@@ -189,7 +189,7 @@ def produce_clan_core_docs() -> None:
|
|||||||
|
|
||||||
for outfile, output in core_outputs.items():
|
for outfile, output in core_outputs.items():
|
||||||
(Path(OUT) / outfile).parent.mkdir(parents=True, exist_ok=True)
|
(Path(OUT) / outfile).parent.mkdir(parents=True, exist_ok=True)
|
||||||
with open(Path(OUT) / outfile, "w") as of:
|
with (Path(OUT) / outfile).open("w") as of:
|
||||||
of.write(output)
|
of.write(output)
|
||||||
|
|
||||||
|
|
||||||
@@ -227,9 +227,7 @@ clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/
|
|||||||
|
|
||||||
def produce_clan_modules_docs() -> None:
|
def produce_clan_modules_docs() -> None:
|
||||||
if not CLAN_MODULES:
|
if not CLAN_MODULES:
|
||||||
msg = (
|
msg = f"Environment variables are not set correctly: $out={CLAN_MODULES}"
|
||||||
f"Environment variables are not set correctly: $CLAN_MODULES={CLAN_MODULES}"
|
|
||||||
)
|
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
if not CLAN_CORE_PATH:
|
if not CLAN_CORE_PATH:
|
||||||
@@ -240,7 +238,7 @@ def produce_clan_modules_docs() -> None:
|
|||||||
msg = f"Environment variables are not set correctly: $out={OUT}"
|
msg = f"Environment variables are not set correctly: $out={OUT}"
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
with open(CLAN_MODULES) as f:
|
with Path(CLAN_MODULES).open() as f:
|
||||||
links: dict[str, str] = json.load(f)
|
links: dict[str, str] = json.load(f)
|
||||||
|
|
||||||
# with open(CLAN_MODULES_READMES) as readme:
|
# with open(CLAN_MODULES_READMES) as readme:
|
||||||
@@ -258,9 +256,9 @@ def produce_clan_modules_docs() -> None:
|
|||||||
modules_index += '<div class="grid cards" markdown>\n\n'
|
modules_index += '<div class="grid cards" markdown>\n\n'
|
||||||
|
|
||||||
for module_name, options_file in links.items():
|
for module_name, options_file in links.items():
|
||||||
readme_file = Path(CLAN_CORE_PATH) / "clanModules" / module_name / "README.md"
|
readme_file = CLAN_CORE_PATH / "clanModules" / module_name / "README.md"
|
||||||
print(module_name, readme_file)
|
print(module_name, readme_file)
|
||||||
with open(readme_file) as f:
|
with readme_file.open() as f:
|
||||||
readme = f.read()
|
readme = f.read()
|
||||||
frontmatter: Frontmatter
|
frontmatter: Frontmatter
|
||||||
frontmatter, readme_content = extract_frontmatter(readme, str(readme_file))
|
frontmatter, readme_content = extract_frontmatter(readme, str(readme_file))
|
||||||
@@ -268,7 +266,7 @@ def produce_clan_modules_docs() -> None:
|
|||||||
|
|
||||||
modules_index += build_option_card(module_name, frontmatter)
|
modules_index += build_option_card(module_name, frontmatter)
|
||||||
|
|
||||||
with open(Path(options_file) / "share/doc/nixos/options.json") as f:
|
with (Path(options_file) / "share/doc/nixos/options.json").open() as f:
|
||||||
options: dict[str, dict[str, Any]] = json.load(f)
|
options: dict[str, dict[str, Any]] = json.load(f)
|
||||||
print(f"Rendering options for {module_name}...")
|
print(f"Rendering options for {module_name}...")
|
||||||
output = module_header(module_name)
|
output = module_header(module_name)
|
||||||
@@ -278,7 +276,7 @@ def produce_clan_modules_docs() -> None:
|
|||||||
output += f"{readme_content}\n"
|
output += f"{readme_content}\n"
|
||||||
|
|
||||||
# get_roles(str) -> list[str] | None
|
# get_roles(str) -> list[str] | None
|
||||||
roles = get_roles(str(Path(CLAN_CORE_PATH) / "clanModules" / module_name))
|
roles = get_roles(CLAN_CORE_PATH / "clanModules" / module_name)
|
||||||
if roles:
|
if roles:
|
||||||
output += render_roles(roles, module_name)
|
output += render_roles(roles, module_name)
|
||||||
|
|
||||||
@@ -293,14 +291,14 @@ def produce_clan_modules_docs() -> None:
|
|||||||
parents=True,
|
parents=True,
|
||||||
exist_ok=True,
|
exist_ok=True,
|
||||||
)
|
)
|
||||||
with open(outfile, "w") as of:
|
with outfile.open("w") as of:
|
||||||
of.write(output)
|
of.write(output)
|
||||||
|
|
||||||
modules_index += "</div>"
|
modules_index += "</div>"
|
||||||
modules_index += "\n"
|
modules_index += "\n"
|
||||||
modules_outfile = Path(OUT) / "clanModules/index.md"
|
modules_outfile = Path(OUT) / "clanModules/index.md"
|
||||||
|
|
||||||
with open(modules_outfile, "w") as of:
|
with modules_outfile.open("w") as of:
|
||||||
of.write(modules_index)
|
of.write(modules_index)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ def _init_proc(
|
|||||||
os.setsid()
|
os.setsid()
|
||||||
|
|
||||||
# Open stdout and stderr
|
# Open stdout and stderr
|
||||||
with open(out_file, "w") as out_fd:
|
with out_file.open("w") as out_fd:
|
||||||
os.dup2(out_fd.fileno(), sys.stdout.fileno())
|
os.dup2(out_fd.fileno(), sys.stdout.fileno())
|
||||||
os.dup2(out_fd.fileno(), sys.stderr.fileno())
|
os.dup2(out_fd.fileno(), sys.stderr.fileno())
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from clan_cli.inventory import (
|
from clan_cli.inventory import (
|
||||||
AdminConfig,
|
AdminConfig,
|
||||||
ServiceAdmin,
|
ServiceAdmin,
|
||||||
@@ -25,7 +27,7 @@ def get_admin_service(base_url: str) -> ServiceAdmin | None:
|
|||||||
@API.register
|
@API.register
|
||||||
def set_admin_service(
|
def set_admin_service(
|
||||||
base_url: str,
|
base_url: str,
|
||||||
allowed_keys: dict[str, str],
|
allowed_keys: dict[str, Path],
|
||||||
instance_name: str = "admin",
|
instance_name: str = "admin",
|
||||||
extra_machines: list[str] | None = None,
|
extra_machines: list[str] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
@@ -43,10 +45,10 @@ def set_admin_service(
|
|||||||
|
|
||||||
keys = {}
|
keys = {}
|
||||||
for name, keyfile in allowed_keys.items():
|
for name, keyfile in allowed_keys.items():
|
||||||
if not keyfile.startswith("/"):
|
if not keyfile.is_absolute():
|
||||||
msg = f"Keyfile '{keyfile}' must be an absolute path"
|
msg = f"Keyfile '{keyfile}' must be an absolute path"
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
with open(keyfile) as f:
|
with keyfile.open() as f:
|
||||||
pubkey = f.read()
|
pubkey = f.read()
|
||||||
keys[name] = pubkey
|
keys[name] = pubkey
|
||||||
|
|
||||||
|
|||||||
@@ -69,8 +69,8 @@ def extract_frontmatter(readme_content: str, err_scope: str) -> tuple[Frontmatte
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_roles(module_path: str) -> None | list[str]:
|
def get_roles(module_path: Path) -> None | list[str]:
|
||||||
roles_dir = Path(module_path) / "roles"
|
roles_dir = module_path / "roles"
|
||||||
if not roles_dir.exists() or not roles_dir.is_dir():
|
if not roles_dir.exists() or not roles_dir.is_dir():
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -117,14 +117,14 @@ def list_modules(base_path: str) -> dict[str, ModuleInfo]:
|
|||||||
"""
|
"""
|
||||||
modules = get_modules(base_path)
|
modules = get_modules(base_path)
|
||||||
return {
|
return {
|
||||||
module_name: get_module_info(module_name, module_path)
|
module_name: get_module_info(module_name, Path(module_path))
|
||||||
for module_name, module_path in modules.items()
|
for module_name, module_path in modules.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def get_module_info(
|
def get_module_info(
|
||||||
module_name: str,
|
module_name: str,
|
||||||
module_path: str,
|
module_path: Path,
|
||||||
) -> ModuleInfo:
|
) -> ModuleInfo:
|
||||||
"""
|
"""
|
||||||
Retrieves information about a module
|
Retrieves information about a module
|
||||||
@@ -136,7 +136,7 @@ def get_module_info(
|
|||||||
location=f"show_module_info {module_name}",
|
location=f"show_module_info {module_name}",
|
||||||
description="Module does not exist",
|
description="Module does not exist",
|
||||||
)
|
)
|
||||||
module_readme = Path(module_path) / "README.md"
|
module_readme = module_path / "README.md"
|
||||||
if not module_readme.exists():
|
if not module_readme.exists():
|
||||||
msg = "Module not found"
|
msg = "Module not found"
|
||||||
raise ClanError(
|
raise ClanError(
|
||||||
@@ -144,7 +144,7 @@ def get_module_info(
|
|||||||
location=f"show_module_info {module_name}",
|
location=f"show_module_info {module_name}",
|
||||||
description="Module does not exist or doesn't have any README.md file",
|
description="Module does not exist or doesn't have any README.md file",
|
||||||
)
|
)
|
||||||
with open(module_readme) as f:
|
with module_readme.open() as f:
|
||||||
readme = f.read()
|
readme = f.read()
|
||||||
frontmatter, readme_content = extract_frontmatter(
|
frontmatter, readme_content = extract_frontmatter(
|
||||||
readme, f"{module_path}/README.md"
|
readme, f"{module_path}/README.md"
|
||||||
|
|||||||
@@ -114,7 +114,7 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"path", type=Path, help="Path to the clan directory", default=Path(".")
|
"path", type=Path, help="Path to the clan directory", default=Path()
|
||||||
)
|
)
|
||||||
|
|
||||||
def create_flake_command(args: argparse.Namespace) -> None:
|
def create_flake_command(args: argparse.Namespace) -> None:
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ def get_or_set_option(args: argparse.Namespace) -> None:
|
|||||||
args.flake, machine_name=args.machine, show_trace=args.show_trace
|
args.flake, machine_name=args.machine, show_trace=args.show_trace
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
with open(args.options_file) as f:
|
with args.options_file.open() as f:
|
||||||
options = json.load(f)
|
options = json.load(f)
|
||||||
# compute settings json file location
|
# compute settings json file location
|
||||||
if args.settings_file is None:
|
if args.settings_file is None:
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ def list_state_folders(machine: str, service: None | str = None) -> None:
|
|||||||
if (clan_dir_result := get_clan_flake_toplevel_or_env()) is not None:
|
if (clan_dir_result := get_clan_flake_toplevel_or_env()) is not None:
|
||||||
flake = clan_dir_result
|
flake = clan_dir_result
|
||||||
else:
|
else:
|
||||||
flake = Path(".")
|
flake = Path()
|
||||||
cmd = nix_eval(
|
cmd = nix_eval(
|
||||||
[
|
[
|
||||||
f"{flake}#nixosConfigurations.{machine}.config.clanCore.state",
|
f"{flake}#nixosConfigurations.{machine}.config.clanCore.state",
|
||||||
|
|||||||
@@ -20,8 +20,7 @@ def parse_args() -> argparse.Namespace:
|
|||||||
|
|
||||||
def get_current_shell() -> str | None:
|
def get_current_shell() -> str | None:
|
||||||
if selected_shell_file.exists():
|
if selected_shell_file.exists():
|
||||||
with open(selected_shell_file) as f:
|
return selected_shell_file.read_text().strip()
|
||||||
return f.read().strip()
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
@@ -57,7 +56,7 @@ def select_shell(shell: str) -> None:
|
|||||||
print(f"{shell} devshell already selected. No changes made.")
|
print(f"{shell} devshell already selected. No changes made.")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
else:
|
else:
|
||||||
with open(selected_shell_file, "w") as f:
|
with selected_shell_file.open("w") as f:
|
||||||
f.write(shell)
|
f.write(shell)
|
||||||
print(f"{shell} devshell selected")
|
print(f"{shell} devshell selected")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user