diff --git a/clanModules/mumble/mumble-populate-channels.py b/clanModules/mumble/mumble-populate-channels.py index fe2a99188..fe46448ae 100644 --- a/clanModules/mumble/mumble-populate-channels.py +++ b/clanModules/mumble/mumble-populate-channels.py @@ -1,35 +1,34 @@ import argparse import json -import os import sqlite3 +from pathlib import Path -def ensure_config(path: str, db_path: str) -> None: +def ensure_config(path: Path, db_path: Path) -> None: # Default JSON structure if the file doesn't exist default_json = { "misc": { "audio_wizard_has_been_shown": True, - "database_location": db_path, + "database_location": str(db_path), "viewed_server_ping_consent_message": True, }, "settings_version": 1, } # Check if the file exists - if os.path.exists(path): - with open(path) as file: - data = json.load(file) + if path.exists(): + data = json.loads(path.read_text()) else: data = default_json # Create the file with default JSON structure - with open(path, "w") as file: + with path.open("w") as file: json.dump(data, file, indent=4) # TODO: make sure to only update the diff updated_data = {**default_json, **data} # Write the modified JSON object back to the file - with open(path, "w") as file: + with path.open("w") as file: json.dump(updated_data, file, indent=4) @@ -206,7 +205,7 @@ if __name__ == "__main__": parser.add_argument("--servers") parser.add_argument("--username") parser.add_argument("--db-location") - parser.add_argument("--ensure-config") + parser.add_argument("--ensure-config", type=Path) args = parser.parse_args() print(args) diff --git a/docs/nix/flake-module.nix b/docs/nix/flake-module.nix index d2d82d6f4..89aef7e69 100644 --- a/docs/nix/flake-module.nix +++ b/docs/nix/flake-module.nix @@ -23,7 +23,7 @@ # Simply evaluated options (JSON) renderOptions = - pkgs.runCommand "renderOptions.py" + pkgs.runCommand "render-options" { # TODO: ruff does not splice properly in nativeBuildInputs depsBuildBuild = [ pkgs.ruff ]; @@ -34,12 +34,12 @@ ]; } '' - install ${./scripts/renderOptions.py} $out - patchShebangs --build $out + install -D -m755 ${./render_options}/__init__.py $out/bin/render-options + patchShebangs --build $out/bin/render-options - ruff format --check --diff $out - ruff check --line-length 88 $out - mypy --strict $out + ruff format --check --diff $out/bin/render-options + ruff check --line-length 88 $out/bin/render-options + mypy --strict $out/bin/render-options ''; asciinema-player-js = pkgs.fetchurl { @@ -70,7 +70,7 @@ mkdir $out # The python script will place mkDocs files in the output directory - python3 ${renderOptions} + exec python3 ${renderOptions}/bin/render-options ''; in { diff --git a/docs/nix/scripts/renderOptions.py b/docs/nix/render_options/__init__.py similarity index 93% rename from docs/nix/scripts/renderOptions.py rename to docs/nix/render_options/__init__.py index efc1ff7f4..3abcadd5a 100644 --- a/docs/nix/scripts/renderOptions.py +++ b/docs/nix/render_options/__init__.py @@ -31,8 +31,8 @@ from typing import Any from clan_cli.api.modules import Frontmatter, extract_frontmatter, get_roles # Get environment variables -CLAN_CORE_PATH = os.getenv("CLAN_CORE_PATH") -CLAN_CORE_DOCS = os.getenv("CLAN_CORE_DOCS") +CLAN_CORE_PATH = Path(os.environ["CLAN_CORE_PATH"]) +CLAN_CORE_DOCS = Path(os.environ["CLAN_CORE_DOCS"]) CLAN_MODULES = os.environ.get("CLAN_MODULES") OUT = os.environ.get("out") @@ -161,7 +161,7 @@ def produce_clan_core_docs() -> None: # A mapping of output file to content core_outputs: dict[str, str] = {} - with open(CLAN_CORE_DOCS) as f: + with CLAN_CORE_DOCS.open() as f: options: dict[str, dict[str, Any]] = json.load(f) module_name = "clan-core" for option_name, info in options.items(): @@ -189,7 +189,7 @@ def produce_clan_core_docs() -> None: for outfile, output in core_outputs.items(): (Path(OUT) / outfile).parent.mkdir(parents=True, exist_ok=True) - with open(Path(OUT) / outfile, "w") as of: + with (Path(OUT) / outfile).open("w") as of: of.write(output) @@ -227,9 +227,7 @@ clan_modules_descr = """Clan modules are [NixOS modules](https://wiki.nixos.org/ def produce_clan_modules_docs() -> None: if not CLAN_MODULES: - msg = ( - f"Environment variables are not set correctly: $CLAN_MODULES={CLAN_MODULES}" - ) + msg = f"Environment variables are not set correctly: $out={CLAN_MODULES}" raise ValueError(msg) if not CLAN_CORE_PATH: @@ -240,7 +238,7 @@ def produce_clan_modules_docs() -> None: msg = f"Environment variables are not set correctly: $out={OUT}" raise ValueError(msg) - with open(CLAN_MODULES) as f: + with Path(CLAN_MODULES).open() as f: links: dict[str, str] = json.load(f) # with open(CLAN_MODULES_READMES) as readme: @@ -258,9 +256,9 @@ def produce_clan_modules_docs() -> None: modules_index += '
\n\n' for module_name, options_file in links.items(): - readme_file = Path(CLAN_CORE_PATH) / "clanModules" / module_name / "README.md" + readme_file = CLAN_CORE_PATH / "clanModules" / module_name / "README.md" print(module_name, readme_file) - with open(readme_file) as f: + with readme_file.open() as f: readme = f.read() frontmatter: Frontmatter frontmatter, readme_content = extract_frontmatter(readme, str(readme_file)) @@ -268,7 +266,7 @@ def produce_clan_modules_docs() -> None: modules_index += build_option_card(module_name, frontmatter) - with open(Path(options_file) / "share/doc/nixos/options.json") as f: + with (Path(options_file) / "share/doc/nixos/options.json").open() as f: options: dict[str, dict[str, Any]] = json.load(f) print(f"Rendering options for {module_name}...") output = module_header(module_name) @@ -278,7 +276,7 @@ def produce_clan_modules_docs() -> None: output += f"{readme_content}\n" # get_roles(str) -> list[str] | None - roles = get_roles(str(Path(CLAN_CORE_PATH) / "clanModules" / module_name)) + roles = get_roles(CLAN_CORE_PATH / "clanModules" / module_name) if roles: output += render_roles(roles, module_name) @@ -293,14 +291,14 @@ def produce_clan_modules_docs() -> None: parents=True, exist_ok=True, ) - with open(outfile, "w") as of: + with outfile.open("w") as of: of.write(output) modules_index += "
" modules_index += "\n" modules_outfile = Path(OUT) / "clanModules/index.md" - with open(modules_outfile, "w") as of: + with modules_outfile.open("w") as of: of.write(modules_index) diff --git a/pkgs/clan-app/clan_app/components/executor.py b/pkgs/clan-app/clan_app/components/executor.py index b97013c50..137ff8ff4 100644 --- a/pkgs/clan-app/clan_app/components/executor.py +++ b/pkgs/clan-app/clan_app/components/executor.py @@ -64,7 +64,7 @@ def _init_proc( os.setsid() # Open stdout and stderr - with open(out_file, "w") as out_fd: + with out_file.open("w") as out_fd: os.dup2(out_fd.fileno(), sys.stdout.fileno()) os.dup2(out_fd.fileno(), sys.stderr.fileno()) diff --git a/pkgs/clan-cli/clan_cli/api/admin.py b/pkgs/clan-cli/clan_cli/api/admin.py index 124127c22..bb076fd97 100644 --- a/pkgs/clan-cli/clan_cli/api/admin.py +++ b/pkgs/clan-cli/clan_cli/api/admin.py @@ -1,3 +1,5 @@ +from pathlib import Path + from clan_cli.inventory import ( AdminConfig, ServiceAdmin, @@ -25,7 +27,7 @@ def get_admin_service(base_url: str) -> ServiceAdmin | None: @API.register def set_admin_service( base_url: str, - allowed_keys: dict[str, str], + allowed_keys: dict[str, Path], instance_name: str = "admin", extra_machines: list[str] | None = None, ) -> None: @@ -43,10 +45,10 @@ def set_admin_service( keys = {} for name, keyfile in allowed_keys.items(): - if not keyfile.startswith("/"): + if not keyfile.is_absolute(): msg = f"Keyfile '{keyfile}' must be an absolute path" raise ValueError(msg) - with open(keyfile) as f: + with keyfile.open() as f: pubkey = f.read() keys[name] = pubkey diff --git a/pkgs/clan-cli/clan_cli/api/modules.py b/pkgs/clan-cli/clan_cli/api/modules.py index 7960c952a..b5e363db5 100644 --- a/pkgs/clan-cli/clan_cli/api/modules.py +++ b/pkgs/clan-cli/clan_cli/api/modules.py @@ -69,8 +69,8 @@ def extract_frontmatter(readme_content: str, err_scope: str) -> tuple[Frontmatte ) -def get_roles(module_path: str) -> None | list[str]: - roles_dir = Path(module_path) / "roles" +def get_roles(module_path: Path) -> None | list[str]: + roles_dir = module_path / "roles" if not roles_dir.exists() or not roles_dir.is_dir(): return None @@ -117,14 +117,14 @@ def list_modules(base_path: str) -> dict[str, ModuleInfo]: """ modules = get_modules(base_path) return { - module_name: get_module_info(module_name, module_path) + module_name: get_module_info(module_name, Path(module_path)) for module_name, module_path in modules.items() } def get_module_info( module_name: str, - module_path: str, + module_path: Path, ) -> ModuleInfo: """ Retrieves information about a module @@ -136,7 +136,7 @@ def get_module_info( location=f"show_module_info {module_name}", description="Module does not exist", ) - module_readme = Path(module_path) / "README.md" + module_readme = module_path / "README.md" if not module_readme.exists(): msg = "Module not found" raise ClanError( @@ -144,7 +144,7 @@ def get_module_info( location=f"show_module_info {module_name}", description="Module does not exist or doesn't have any README.md file", ) - with open(module_readme) as f: + with module_readme.open() as f: readme = f.read() frontmatter, readme_content = extract_frontmatter( readme, f"{module_path}/README.md" diff --git a/pkgs/clan-cli/clan_cli/clan/create.py b/pkgs/clan-cli/clan_cli/clan/create.py index 65cdb0559..84c7d0132 100644 --- a/pkgs/clan-cli/clan_cli/clan/create.py +++ b/pkgs/clan-cli/clan_cli/clan/create.py @@ -114,7 +114,7 @@ def register_create_parser(parser: argparse.ArgumentParser) -> None: ) parser.add_argument( - "path", type=Path, help="Path to the clan directory", default=Path(".") + "path", type=Path, help="Path to the clan directory", default=Path() ) def create_flake_command(args: argparse.Namespace) -> None: diff --git a/pkgs/clan-cli/clan_cli/config/__init__.py b/pkgs/clan-cli/clan_cli/config/__init__.py index 13489425e..b2189304d 100644 --- a/pkgs/clan-cli/clan_cli/config/__init__.py +++ b/pkgs/clan-cli/clan_cli/config/__init__.py @@ -172,7 +172,7 @@ def get_or_set_option(args: argparse.Namespace) -> None: args.flake, machine_name=args.machine, show_trace=args.show_trace ) else: - with open(args.options_file) as f: + with args.options_file.open() as f: options = json.load(f) # compute settings json file location if args.settings_file is None: diff --git a/pkgs/clan-cli/clan_cli/state/list.py b/pkgs/clan-cli/clan_cli/state/list.py index d07b1c20f..ba6a1d684 100644 --- a/pkgs/clan-cli/clan_cli/state/list.py +++ b/pkgs/clan-cli/clan_cli/state/list.py @@ -21,7 +21,7 @@ def list_state_folders(machine: str, service: None | str = None) -> None: if (clan_dir_result := get_clan_flake_toplevel_or_env()) is not None: flake = clan_dir_result else: - flake = Path(".") + flake = Path() cmd = nix_eval( [ f"{flake}#nixosConfigurations.{machine}.config.clanCore.state", diff --git a/pkgs/moonlight-sunshine-accept/.envrc b/pkgs/moonlight-sunshine-accept/.envrc new file mode 100644 index 000000000..1799d252a --- /dev/null +++ b/pkgs/moonlight-sunshine-accept/.envrc @@ -0,0 +1,2 @@ +# shellcheck shell=bash +use flake .#moonlight-sunshine-accept diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/init_certificates.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/init_certificates.py index f7ce69b43..c368e9bde 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/init_certificates.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/init_certificates.py @@ -1,6 +1,5 @@ import argparse import datetime -import os from datetime import timedelta from pathlib import Path @@ -54,7 +53,7 @@ def private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes: return pem_private_key -def init_credentials() -> (str, str): +def init_credentials() -> tuple[bytes, bytes]: private_key = generate_private_key() certificate = generate_certificate(private_key) private_key_pem = private_key_to_pem(private_key) @@ -63,15 +62,11 @@ def init_credentials() -> (str, str): def write_credentials(_args: argparse.Namespace) -> None: pem_certificate, pem_private_key = init_credentials() - credentials_path = os.getcwd() + "credentials" + credentials_path = Path.cwd() / "credentials" Path(credentials_path).mkdir(parents=True, exist_ok=True) - cacaert_path = os.path.join(credentials_path, "cacert.pem") - with open(cacaert_path, mode="wb") as file: - file.write(pem_certificate) - cakey_path = os.path.join(credentials_path, "cakey.pem") - with open(cakey_path, mode="wb") as file: - file.write(pem_private_key) + (credentials_path / "cacert.pem").write_bytes(pem_certificate) + (credentials_path / "cakey.pem").write_bytes(pem_private_key) print("Finished writing moonlight credentials") diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/join.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/join.py index 0ac104338..45a6c4b0a 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/join.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/join.py @@ -33,8 +33,7 @@ def send_join_request_api(host: str, port: int) -> bool: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: s.connect((host, port)) - json_body = {"type": "api", "pin": pin} - json_body = json.dumps(json_body) + json_body = json.dumps({"type": "api", "pin": pin}) request = ( f"POST / HTTP/1.1\r\n" f"Content-Type: application/json\r\n" @@ -62,14 +61,15 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: s.connect((host, port)) encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode("utf-8") - json_body = {"type": "native", "uuid": uuid, "cert": encoded_cert} - json_body = json.dumps(json_body) + json_body_str = json.dumps( + {"type": "native", "uuid": uuid, "cert": encoded_cert} + ) request = ( f"POST / HTTP/1.1\r\n" f"Content-Type: application/json\r\n" - f"Content-Length: {len(json_body)}\r\n" + f"Content-Length: {len(json_body_str)}\r\n" f"Connection: close\r\n\r\n" - f"{json_body}" + f"{json_body_str}" ) try: s.sendall(request.encode("utf-8")) @@ -78,7 +78,7 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool: lines = response.split("\n") body = "\n".join(lines[2:])[2:] print(body) - return body + return True except Exception as e: print(f"An error occurred: {e}") # TODO: fix @@ -98,6 +98,7 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool: print(f"Failed to decode JSON: {e}") pos = e.pos print(f"Failed to decode JSON: unexpected character {response[pos]}") + return False def join(args: argparse.Namespace) -> None: @@ -113,6 +114,7 @@ def join(args: argparse.Namespace) -> None: # TODO: If cert is not provided parse from config # cert = args.cert cert = get_moonlight_certificate() + assert port is not None if send_join_request(host, port, cert): print(f"Successfully joined sunshine host: {host}") else: diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/run.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/run.py index 2cf12df7e..dc315f14e 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/run.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/run.py @@ -4,8 +4,8 @@ import threading class MoonlightPairing: - def __init__(self) -> "MoonlightPairing": - self.process = None + def __init__(self) -> None: + self.process: subprocess.Popen | None = None self.output = "" self.found = threading.Event() @@ -45,6 +45,8 @@ class MoonlightPairing: self.process.wait() def stream_output(self, target_string: str) -> None: + assert self.process is not None + assert self.process.stdout is not None for line in iter(self.process.stdout.readline, b""): line = line.decode() self.output += line diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/state.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/state.py index 11a8ee3d3..c4bb1b5e2 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/state.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/state.py @@ -3,21 +3,20 @@ import os import random import string from configparser import ConfigParser, DuplicateSectionError, NoOptionError +from pathlib import Path -def moonlight_config_dir() -> str: - return os.path.join( - os.path.expanduser("~"), ".config", "Moonlight Game Streaming Project" - ) +def moonlight_config_dir() -> Path: + return Path.home() / ".config" / "Moonlight Game Streaming Project" -def moonlight_state_file() -> str: - return os.path.join(moonlight_config_dir(), "Moonlight.conf") +def moonlight_state_file() -> Path: + return moonlight_config_dir() / "Moonlight.conf" def load_state() -> ConfigParser | None: try: - with open(moonlight_state_file()) as file: + with moonlight_state_file().open() as file: config = ConfigParser() config.read_file(file) print(config.sections()) @@ -65,8 +64,8 @@ def init_state(certificate: str, key: str) -> None: config.write(file) -def write_state(data: ConfigParser) -> bool: - with open(moonlight_state_file(), "w") as file: +def write_state(data: ConfigParser) -> None: + with moonlight_state_file().open("w") as file: data.write(file) @@ -84,22 +83,24 @@ def add_sunshine_host_to_parser( new_host = no_hosts + 1 - config.set("hosts", f"{new_host}\srvcert", convert_string_to_bytearray(certificate)) + config.set( + "hosts", f"{new_host}\\srvcert", convert_string_to_bytearray(certificate) + ) config.set("hosts", "size", str(new_host)) config.set("hosts", f"{new_host}\\uuid", uuid) - config.set("hosts", f"{new_host}\hostname", hostname) + config.set("hosts", f"{new_host}\\hostname", hostname) config.set("hosts", f"{new_host}\\nvidiasv", "false") - config.set("hosts", f"{new_host}\customname", "false") - config.set("hosts", f"{new_host}\manualaddress", manual_host) - config.set("hosts", f"{new_host}\manualport", "47989") + config.set("hosts", f"{new_host}\\customname", "false") + config.set("hosts", f"{new_host}\\manualaddress", manual_host) + config.set("hosts", f"{new_host}\\manualport", "47989") config.set("hosts", f"{new_host}\\remoteport", "0") config.set("hosts", f"{new_host}\\remoteaddress", "") - config.set("hosts", f"{new_host}\localaddress", "") - config.set("hosts", f"{new_host}\localport", "0") - config.set("hosts", f"{new_host}\ipv6port", "0") - config.set("hosts", f"{new_host}\ipv6address", "") + config.set("hosts", f"{new_host}\\localaddress", "") + config.set("hosts", f"{new_host}\\localport", "0") + config.set("hosts", f"{new_host}\\ipv6port", "0") + config.set("hosts", f"{new_host}\\ipv6address", "") config.set( - "hosts", f"{new_host}\mac", convert_string_to_bytearray("\\xceop\\x8d\\xfc{") + "hosts", f"{new_host}\\mac", convert_string_to_bytearray("\\xceop\\x8d\\xfc{") ) add_app(config, "Desktop", new_host, 1, 881448767) add_app(config, "Low Res Desktop", new_host, 2, 303580669) @@ -114,7 +115,7 @@ def add_sunshine_host_to_parser( def add_app( config: ConfigParser, name: str, host_id: int, app_id: int, app_no: int ) -> None: - identifier = f"{host_id}\\apps\{app_id}\\" + identifier = f"{host_id}\\apps\\{app_id}\\" config.set("hosts", f"{identifier}appcollector", "false") config.set("hosts", f"{identifier}directlaunch", "false") config.set("hosts", f"{identifier}hdr", "false") diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/uri.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/uri.py index 840247a3f..0b4e4aac3 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/uri.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/moonlight/uri.py @@ -1,7 +1,7 @@ from urllib.parse import urlparse -def parse_moonlight_uri(uri: str) -> (str, str): +def parse_moonlight_uri(uri: str) -> tuple[str, int | None]: print(uri) if uri.startswith("moonlight:"): # Fixes a bug where moonlight:// is not parsed correctly @@ -13,5 +13,8 @@ def parse_moonlight_uri(uri: str) -> (str, str): msg = f"Invalid moonlight URI: {uri}" raise ValueError(msg) hostname = parsed.hostname + if hostname is None: + msg = f"Invalid moonlight URI: {uri}" + raise ValueError port = parsed.port return (hostname, port) diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/api.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/api.py index 206f0d6fb..1df45e1dc 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/api.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/api.py @@ -3,7 +3,7 @@ import http.client import json -def get_context() -> http.client.ssl.SSLContext: +def get_context() -> http.client.ssl.SSLContext: # type: ignore # context = http.client.ssl.create_default_context() # # context.load_cert_chain( # # certfile="/var/lib/sunshine/sunshine.cert", keyfile="/var/lib/sunshine/sunshine.key" @@ -12,7 +12,7 @@ def get_context() -> http.client.ssl.SSLContext: # certfile="/home/kenji/.config/sunshine/credentials/cacert.pem", # keyfile="/home/kenji/.config/sunshine/credentials/cakey.pem", # ) - return http.client.ssl._create_unverified_context() # noqa: SLF001 + return http.client.ssl._create_unverified_context() # type: ignore # noqa: SLF001 def pair(pin: str) -> str: @@ -44,7 +44,7 @@ def restart() -> None: conn = http.client.HTTPSConnection( "localhost", 47990, - context=http.client.ssl._create_unverified_context(), # noqa: SLF001 + context=http.client.ssl._create_unverified_context(), # type: ignore # noqa: SLF001 ) user_and_pass = base64.b64encode(b"sunshine:sunshine").decode("ascii") headers = { @@ -52,11 +52,8 @@ def restart() -> None: "Authorization": f"Basic {user_and_pass}", } - # Define the parameters - params = {} - # Make the request - conn.request("POST", "/api/restart", params, headers) + conn.request("POST", "/api/restart", {}, headers) # Get and print the response # There wont be a response, because it is restarted diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/config.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/config.py index 55a088e12..83e79bce2 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/config.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/config.py @@ -1,5 +1,6 @@ import configparser -import os +from dataclasses import dataclass +from pathlib import Path # address_family = both # channels = 5 @@ -11,26 +12,29 @@ import os PSEUDO_SECTION = "DEFAULT" +@dataclass class Config: + config: configparser.ConfigParser + config_location: Path _instance = None - def __new__(cls, config_location: str | None = None) -> "Config": + def __new__(cls, config_location: Path | None = None) -> "Config": if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.config = configparser.ConfigParser() config = config_location or cls._instance.default_sunshine_config_file() cls._instance.config_location = config - with open(config) as f: + with config.open() as f: config_string = f"[{PSEUDO_SECTION}]\n" + f.read() print(config_string) cls._instance.config.read_string(config_string) return cls._instance - def default_sunshine_config_dir(self) -> str: - return os.path.join(os.path.expanduser("~"), ".config", "sunshine") + def default_sunshine_config_dir(self) -> Path: + return Path.home() / ".config" / "sunshine" - def default_sunshine_config_file(self) -> str: - return os.path.join(self.default_sunshine_config_dir(), "sunshine.conf") + def default_sunshine_config_file(self) -> Path: + return self.default_sunshine_config_dir() / "sunshine.conf" def get_private_key(self) -> str: return self.config.get(PSEUDO_SECTION, "pkey") diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/init_certificates.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/init_certificates.py index 2eeeb3bf7..a422482db 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/init_certificates.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/init_certificates.py @@ -50,7 +50,7 @@ def private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes: return pem_private_key -def init_credentials() -> (str, str): +def init_credentials() -> tuple[bytes, bytes]: private_key = generate_private_key() certificate = generate_certificate(private_key) private_key_pem = private_key_to_pem(private_key) @@ -64,14 +64,12 @@ def uniqueid() -> str: def write_credentials(_args: argparse.Namespace) -> None: print("Writing sunshine credentials") pem_certificate, pem_private_key = init_credentials() - Path("credentials").mkdir(parents=True, exist_ok=True) - with open("credentials/cacert.pem", mode="wb") as file: - file.write(pem_certificate) - with open("credentials/cakey.pem", mode="wb") as file: - file.write(pem_private_key) + credentials_dir = Path("credentials") + credentials_dir.mkdir(parents=True, exist_ok=True) + (credentials_dir / "cacert.pem").write_bytes(pem_certificate) + (credentials_dir / "cakey.pem").write_bytes(pem_private_key) print("Generating sunshine UUID") - with open("uuid", mode="w") as file: - file.write(uniqueid()) + Path("uuid").write_text(uniqueid()) def register_initialization_parser(parser: argparse.ArgumentParser) -> None: diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/listen.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/listen.py index 8bb50b482..5fe24adaa 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/listen.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/listen.py @@ -32,10 +32,9 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool: try: request = data.decode("utf-8") - body = request.split("\n")[-1] - print(body) - body = json.loads(f"{body}") - print(body) + raw_body = request.split("\n")[-1] + print(raw_body) + body = json.loads(raw_body) pair_type = body.get("type", "") @@ -43,7 +42,7 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool: print("Api request") status = pair(body.get("pin", "")) status = json.dumps(status) - response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{status}\r\n" + response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{status}\r\n" client_socket.sendall(response.encode("utf-8")) if pair_type == "native": @@ -57,12 +56,12 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool: encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode( "utf-8" ) - json_body = {} - json_body["uuid"] = uuid - json_body["cert"] = encoded_cert - json_body["hostname"] = socket.gethostname() - json_body = json.dumps(json_body) - response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{json_body}\r\n" + json_data = {} + json_data["uuid"] = uuid + json_data["cert"] = encoded_cert + json_data["hostname"] = socket.gethostname() + json_body = json.dumps(json_data) + response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{json_body}\r\n" client_socket.sendall(response.encode("utf-8")) # add_moonlight_client(decoded_cert, state_file, rec_uuid) diff --git a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/state.py b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/state.py index 0be4dc809..0f96d755b 100644 --- a/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/state.py +++ b/pkgs/moonlight-sunshine-accept/moonlight_sunshine_accept/sunshine/state.py @@ -1,33 +1,31 @@ import json -import os +from pathlib import Path from typing import Any -def default_sunshine_config_dir() -> str: - return os.path.join(os.path.expanduser("~"), ".config", "sunshine") +def default_sunshine_config_dir() -> Path: + return Path.home() / ".config" / "sunshine" -def default_sunshine_state_file() -> str: - return os.path.join(default_sunshine_config_dir(), "sunshine_state.json") +def default_sunshine_state_file() -> Path: + return default_sunshine_config_dir() / "sunshine_state.json" -def load_state(sunshine_state_path: str) -> dict[str, Any] | None: +def load_state(sunshine_state_path: Path) -> str | None: sunshine_state_path = sunshine_state_path or default_sunshine_state_file() print(f"Loading sunshine state from {sunshine_state_path}") try: - with open(sunshine_state_path) as file: - config = file.read() - return config + return json.loads(sunshine_state_path.read_text()) except FileNotFoundError: print("Sunshine state file not found.") return None # this needs to be created before sunshine is first run -def init_state(uuid: str, sunshine_state_path: str) -> None: +def init_state(uuid: str, sunshine_state_path: Path) -> None: print("Initializing sunshine state.") - data = {} + data: dict[str, Any] = {} data["root"] = {} data["root"]["uniqueid"] = uuid data["root"]["devices"] = [] @@ -36,9 +34,9 @@ def init_state(uuid: str, sunshine_state_path: str) -> None: write_state(data, sunshine_state_path) -def write_state(data: dict[str, Any], sunshine_state_path: str) -> None: +def write_state(data: dict[str, Any], sunshine_state_path: Path) -> None: sunshine_state_path = sunshine_state_path or default_sunshine_state_file() - with open(sunshine_state_path, "w") as file: + with sunshine_state_path.open("w") as file: json.dump(data, file, indent=4) @@ -48,11 +46,13 @@ def pseudo_uuid() -> str: # TODO: finish this function -def add_moonlight_client(certificate: str, sunshine_state_path: str, uuid: str) -> None: +def add_moonlight_client( + certificate: str, sunshine_state_path: Path, uuid: str +) -> None: print("Adding moonlight client to sunshine state.") - state = load_state(sunshine_state_path) - if state: - state = json.loads(state) + raw_state = load_state(sunshine_state_path) + if raw_state: + state = json.loads(raw_state) if not state["root"]["devices"]: state["root"]["devices"].append( diff --git a/pkgs/scripts/select-shell.py b/pkgs/scripts/select-shell.py index cc8484f3e..c4e25b915 100644 --- a/pkgs/scripts/select-shell.py +++ b/pkgs/scripts/select-shell.py @@ -20,8 +20,7 @@ def parse_args() -> argparse.Namespace: def get_current_shell() -> str | None: if selected_shell_file.exists(): - with open(selected_shell_file) as f: - return f.read().strip() + return selected_shell_file.read_text().strip() return None @@ -57,7 +56,7 @@ def select_shell(shell: str) -> None: print(f"{shell} devshell already selected. No changes made.") sys.exit(0) else: - with open(selected_shell_file, "w") as f: + with selected_shell_file.open("w") as f: f.write(shell) print(f"{shell} devshell selected")