moonlight-sunshine-accept: use pathlib and fix types
This commit is contained in:
@@ -3,7 +3,7 @@ import http.client
|
||||
import json
|
||||
|
||||
|
||||
def get_context() -> http.client.ssl.SSLContext:
|
||||
def get_context() -> http.client.ssl.SSLContext: # type: ignore
|
||||
# context = http.client.ssl.create_default_context()
|
||||
# # context.load_cert_chain(
|
||||
# # certfile="/var/lib/sunshine/sunshine.cert", keyfile="/var/lib/sunshine/sunshine.key"
|
||||
@@ -12,7 +12,7 @@ def get_context() -> http.client.ssl.SSLContext:
|
||||
# certfile="/home/kenji/.config/sunshine/credentials/cacert.pem",
|
||||
# keyfile="/home/kenji/.config/sunshine/credentials/cakey.pem",
|
||||
# )
|
||||
return http.client.ssl._create_unverified_context() # noqa: SLF001
|
||||
return http.client.ssl._create_unverified_context() # type: ignore # noqa: SLF001
|
||||
|
||||
|
||||
def pair(pin: str) -> str:
|
||||
@@ -44,7 +44,7 @@ def restart() -> None:
|
||||
conn = http.client.HTTPSConnection(
|
||||
"localhost",
|
||||
47990,
|
||||
context=http.client.ssl._create_unverified_context(), # noqa: SLF001
|
||||
context=http.client.ssl._create_unverified_context(), # type: ignore # noqa: SLF001
|
||||
)
|
||||
user_and_pass = base64.b64encode(b"sunshine:sunshine").decode("ascii")
|
||||
headers = {
|
||||
@@ -52,11 +52,8 @@ def restart() -> None:
|
||||
"Authorization": f"Basic {user_and_pass}",
|
||||
}
|
||||
|
||||
# Define the parameters
|
||||
params = {}
|
||||
|
||||
# Make the request
|
||||
conn.request("POST", "/api/restart", params, headers)
|
||||
conn.request("POST", "/api/restart", {}, headers)
|
||||
|
||||
# Get and print the response
|
||||
# There wont be a response, because it is restarted
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import configparser
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
# address_family = both
|
||||
# channels = 5
|
||||
@@ -11,26 +12,29 @@ import os
|
||||
PSEUDO_SECTION = "DEFAULT"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
config: configparser.ConfigParser
|
||||
config_location: Path
|
||||
_instance = None
|
||||
|
||||
def __new__(cls, config_location: str | None = None) -> "Config":
|
||||
def __new__(cls, config_location: Path | None = None) -> "Config":
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance.config = configparser.ConfigParser()
|
||||
config = config_location or cls._instance.default_sunshine_config_file()
|
||||
cls._instance.config_location = config
|
||||
with open(config) as f:
|
||||
with config.open() as f:
|
||||
config_string = f"[{PSEUDO_SECTION}]\n" + f.read()
|
||||
print(config_string)
|
||||
cls._instance.config.read_string(config_string)
|
||||
return cls._instance
|
||||
|
||||
def default_sunshine_config_dir(self) -> str:
|
||||
return os.path.join(os.path.expanduser("~"), ".config", "sunshine")
|
||||
def default_sunshine_config_dir(self) -> Path:
|
||||
return Path.home() / ".config" / "sunshine"
|
||||
|
||||
def default_sunshine_config_file(self) -> str:
|
||||
return os.path.join(self.default_sunshine_config_dir(), "sunshine.conf")
|
||||
def default_sunshine_config_file(self) -> Path:
|
||||
return self.default_sunshine_config_dir() / "sunshine.conf"
|
||||
|
||||
def get_private_key(self) -> str:
|
||||
return self.config.get(PSEUDO_SECTION, "pkey")
|
||||
|
||||
@@ -50,7 +50,7 @@ def private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes:
|
||||
return pem_private_key
|
||||
|
||||
|
||||
def init_credentials() -> (str, str):
|
||||
def init_credentials() -> tuple[bytes, bytes]:
|
||||
private_key = generate_private_key()
|
||||
certificate = generate_certificate(private_key)
|
||||
private_key_pem = private_key_to_pem(private_key)
|
||||
@@ -64,14 +64,12 @@ def uniqueid() -> str:
|
||||
def write_credentials(_args: argparse.Namespace) -> None:
|
||||
print("Writing sunshine credentials")
|
||||
pem_certificate, pem_private_key = init_credentials()
|
||||
Path("credentials").mkdir(parents=True, exist_ok=True)
|
||||
with open("credentials/cacert.pem", mode="wb") as file:
|
||||
file.write(pem_certificate)
|
||||
with open("credentials/cakey.pem", mode="wb") as file:
|
||||
file.write(pem_private_key)
|
||||
credentials_dir = Path("credentials")
|
||||
credentials_dir.mkdir(parents=True, exist_ok=True)
|
||||
(credentials_dir / "cacert.pem").write_bytes(pem_certificate)
|
||||
(credentials_dir / "cakey.pem").write_bytes(pem_private_key)
|
||||
print("Generating sunshine UUID")
|
||||
with open("uuid", mode="w") as file:
|
||||
file.write(uniqueid())
|
||||
Path("uuid").write_text(uniqueid())
|
||||
|
||||
|
||||
def register_initialization_parser(parser: argparse.ArgumentParser) -> None:
|
||||
|
||||
@@ -32,10 +32,9 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
||||
|
||||
try:
|
||||
request = data.decode("utf-8")
|
||||
body = request.split("\n")[-1]
|
||||
print(body)
|
||||
body = json.loads(f"{body}")
|
||||
print(body)
|
||||
raw_body = request.split("\n")[-1]
|
||||
print(raw_body)
|
||||
body = json.loads(raw_body)
|
||||
|
||||
pair_type = body.get("type", "")
|
||||
|
||||
@@ -43,7 +42,7 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
||||
print("Api request")
|
||||
status = pair(body.get("pin", ""))
|
||||
status = json.dumps(status)
|
||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{status}\r\n"
|
||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{status}\r\n"
|
||||
client_socket.sendall(response.encode("utf-8"))
|
||||
|
||||
if pair_type == "native":
|
||||
@@ -57,12 +56,12 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
||||
encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode(
|
||||
"utf-8"
|
||||
)
|
||||
json_body = {}
|
||||
json_body["uuid"] = uuid
|
||||
json_body["cert"] = encoded_cert
|
||||
json_body["hostname"] = socket.gethostname()
|
||||
json_body = json.dumps(json_body)
|
||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{json_body}\r\n"
|
||||
json_data = {}
|
||||
json_data["uuid"] = uuid
|
||||
json_data["cert"] = encoded_cert
|
||||
json_data["hostname"] = socket.gethostname()
|
||||
json_body = json.dumps(json_data)
|
||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{json_body}\r\n"
|
||||
client_socket.sendall(response.encode("utf-8"))
|
||||
# add_moonlight_client(decoded_cert, state_file, rec_uuid)
|
||||
|
||||
|
||||
@@ -1,33 +1,31 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
def default_sunshine_config_dir() -> str:
|
||||
return os.path.join(os.path.expanduser("~"), ".config", "sunshine")
|
||||
def default_sunshine_config_dir() -> Path:
|
||||
return Path.home() / ".config" / "sunshine"
|
||||
|
||||
|
||||
def default_sunshine_state_file() -> str:
|
||||
return os.path.join(default_sunshine_config_dir(), "sunshine_state.json")
|
||||
def default_sunshine_state_file() -> Path:
|
||||
return default_sunshine_config_dir() / "sunshine_state.json"
|
||||
|
||||
|
||||
def load_state(sunshine_state_path: str) -> dict[str, Any] | None:
|
||||
def load_state(sunshine_state_path: Path) -> str | None:
|
||||
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
||||
print(f"Loading sunshine state from {sunshine_state_path}")
|
||||
try:
|
||||
with open(sunshine_state_path) as file:
|
||||
config = file.read()
|
||||
return config
|
||||
return json.loads(sunshine_state_path.read_text())
|
||||
except FileNotFoundError:
|
||||
print("Sunshine state file not found.")
|
||||
return None
|
||||
|
||||
|
||||
# this needs to be created before sunshine is first run
|
||||
def init_state(uuid: str, sunshine_state_path: str) -> None:
|
||||
def init_state(uuid: str, sunshine_state_path: Path) -> None:
|
||||
print("Initializing sunshine state.")
|
||||
|
||||
data = {}
|
||||
data: dict[str, Any] = {}
|
||||
data["root"] = {}
|
||||
data["root"]["uniqueid"] = uuid
|
||||
data["root"]["devices"] = []
|
||||
@@ -36,9 +34,9 @@ def init_state(uuid: str, sunshine_state_path: str) -> None:
|
||||
write_state(data, sunshine_state_path)
|
||||
|
||||
|
||||
def write_state(data: dict[str, Any], sunshine_state_path: str) -> None:
|
||||
def write_state(data: dict[str, Any], sunshine_state_path: Path) -> None:
|
||||
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
||||
with open(sunshine_state_path, "w") as file:
|
||||
with sunshine_state_path.open("w") as file:
|
||||
json.dump(data, file, indent=4)
|
||||
|
||||
|
||||
@@ -48,11 +46,13 @@ def pseudo_uuid() -> str:
|
||||
|
||||
|
||||
# TODO: finish this function
|
||||
def add_moonlight_client(certificate: str, sunshine_state_path: str, uuid: str) -> None:
|
||||
def add_moonlight_client(
|
||||
certificate: str, sunshine_state_path: Path, uuid: str
|
||||
) -> None:
|
||||
print("Adding moonlight client to sunshine state.")
|
||||
state = load_state(sunshine_state_path)
|
||||
if state:
|
||||
state = json.loads(state)
|
||||
raw_state = load_state(sunshine_state_path)
|
||||
if raw_state:
|
||||
state = json.loads(raw_state)
|
||||
|
||||
if not state["root"]["devices"]:
|
||||
state["root"]["devices"].append(
|
||||
|
||||
Reference in New Issue
Block a user