moonlight-sunshine-accept: use pathlib and fix types
This commit is contained in:
2
pkgs/moonlight-sunshine-accept/.envrc
Normal file
2
pkgs/moonlight-sunshine-accept/.envrc
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
# shellcheck shell=bash
|
||||||
|
use flake .#moonlight-sunshine-accept
|
||||||
@@ -1,6 +1,5 @@
|
|||||||
import argparse
|
import argparse
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
@@ -54,7 +53,7 @@ def private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes:
|
|||||||
return pem_private_key
|
return pem_private_key
|
||||||
|
|
||||||
|
|
||||||
def init_credentials() -> (str, str):
|
def init_credentials() -> tuple[bytes, bytes]:
|
||||||
private_key = generate_private_key()
|
private_key = generate_private_key()
|
||||||
certificate = generate_certificate(private_key)
|
certificate = generate_certificate(private_key)
|
||||||
private_key_pem = private_key_to_pem(private_key)
|
private_key_pem = private_key_to_pem(private_key)
|
||||||
@@ -63,15 +62,11 @@ def init_credentials() -> (str, str):
|
|||||||
|
|
||||||
def write_credentials(_args: argparse.Namespace) -> None:
|
def write_credentials(_args: argparse.Namespace) -> None:
|
||||||
pem_certificate, pem_private_key = init_credentials()
|
pem_certificate, pem_private_key = init_credentials()
|
||||||
credentials_path = os.getcwd() + "credentials"
|
credentials_path = Path.cwd() / "credentials"
|
||||||
Path(credentials_path).mkdir(parents=True, exist_ok=True)
|
Path(credentials_path).mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
cacaert_path = os.path.join(credentials_path, "cacert.pem")
|
(credentials_path / "cacert.pem").write_bytes(pem_certificate)
|
||||||
with open(cacaert_path, mode="wb") as file:
|
(credentials_path / "cakey.pem").write_bytes(pem_private_key)
|
||||||
file.write(pem_certificate)
|
|
||||||
cakey_path = os.path.join(credentials_path, "cakey.pem")
|
|
||||||
with open(cakey_path, mode="wb") as file:
|
|
||||||
file.write(pem_private_key)
|
|
||||||
print("Finished writing moonlight credentials")
|
print("Finished writing moonlight credentials")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -33,8 +33,7 @@ def send_join_request_api(host: str, port: int) -> bool:
|
|||||||
|
|
||||||
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
|
||||||
s.connect((host, port))
|
s.connect((host, port))
|
||||||
json_body = {"type": "api", "pin": pin}
|
json_body = json.dumps({"type": "api", "pin": pin})
|
||||||
json_body = json.dumps(json_body)
|
|
||||||
request = (
|
request = (
|
||||||
f"POST / HTTP/1.1\r\n"
|
f"POST / HTTP/1.1\r\n"
|
||||||
f"Content-Type: application/json\r\n"
|
f"Content-Type: application/json\r\n"
|
||||||
@@ -62,14 +61,15 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
|||||||
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
|
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
|
||||||
s.connect((host, port))
|
s.connect((host, port))
|
||||||
encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode("utf-8")
|
encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode("utf-8")
|
||||||
json_body = {"type": "native", "uuid": uuid, "cert": encoded_cert}
|
json_body_str = json.dumps(
|
||||||
json_body = json.dumps(json_body)
|
{"type": "native", "uuid": uuid, "cert": encoded_cert}
|
||||||
|
)
|
||||||
request = (
|
request = (
|
||||||
f"POST / HTTP/1.1\r\n"
|
f"POST / HTTP/1.1\r\n"
|
||||||
f"Content-Type: application/json\r\n"
|
f"Content-Type: application/json\r\n"
|
||||||
f"Content-Length: {len(json_body)}\r\n"
|
f"Content-Length: {len(json_body_str)}\r\n"
|
||||||
f"Connection: close\r\n\r\n"
|
f"Connection: close\r\n\r\n"
|
||||||
f"{json_body}"
|
f"{json_body_str}"
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
s.sendall(request.encode("utf-8"))
|
s.sendall(request.encode("utf-8"))
|
||||||
@@ -78,7 +78,7 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
|||||||
lines = response.split("\n")
|
lines = response.split("\n")
|
||||||
body = "\n".join(lines[2:])[2:]
|
body = "\n".join(lines[2:])[2:]
|
||||||
print(body)
|
print(body)
|
||||||
return body
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"An error occurred: {e}")
|
print(f"An error occurred: {e}")
|
||||||
# TODO: fix
|
# TODO: fix
|
||||||
@@ -98,6 +98,7 @@ def send_join_request_native(host: str, port: int, cert: str) -> bool:
|
|||||||
print(f"Failed to decode JSON: {e}")
|
print(f"Failed to decode JSON: {e}")
|
||||||
pos = e.pos
|
pos = e.pos
|
||||||
print(f"Failed to decode JSON: unexpected character {response[pos]}")
|
print(f"Failed to decode JSON: unexpected character {response[pos]}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def join(args: argparse.Namespace) -> None:
|
def join(args: argparse.Namespace) -> None:
|
||||||
@@ -113,6 +114,7 @@ def join(args: argparse.Namespace) -> None:
|
|||||||
# TODO: If cert is not provided parse from config
|
# TODO: If cert is not provided parse from config
|
||||||
# cert = args.cert
|
# cert = args.cert
|
||||||
cert = get_moonlight_certificate()
|
cert = get_moonlight_certificate()
|
||||||
|
assert port is not None
|
||||||
if send_join_request(host, port, cert):
|
if send_join_request(host, port, cert):
|
||||||
print(f"Successfully joined sunshine host: {host}")
|
print(f"Successfully joined sunshine host: {host}")
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ import threading
|
|||||||
|
|
||||||
|
|
||||||
class MoonlightPairing:
|
class MoonlightPairing:
|
||||||
def __init__(self) -> "MoonlightPairing":
|
def __init__(self) -> None:
|
||||||
self.process = None
|
self.process: subprocess.Popen | None = None
|
||||||
self.output = ""
|
self.output = ""
|
||||||
self.found = threading.Event()
|
self.found = threading.Event()
|
||||||
|
|
||||||
@@ -45,6 +45,8 @@ class MoonlightPairing:
|
|||||||
self.process.wait()
|
self.process.wait()
|
||||||
|
|
||||||
def stream_output(self, target_string: str) -> None:
|
def stream_output(self, target_string: str) -> None:
|
||||||
|
assert self.process is not None
|
||||||
|
assert self.process.stdout is not None
|
||||||
for line in iter(self.process.stdout.readline, b""):
|
for line in iter(self.process.stdout.readline, b""):
|
||||||
line = line.decode()
|
line = line.decode()
|
||||||
self.output += line
|
self.output += line
|
||||||
|
|||||||
@@ -3,21 +3,20 @@ import os
|
|||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
from configparser import ConfigParser, DuplicateSectionError, NoOptionError
|
from configparser import ConfigParser, DuplicateSectionError, NoOptionError
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
def moonlight_config_dir() -> str:
|
def moonlight_config_dir() -> Path:
|
||||||
return os.path.join(
|
return Path.home() / ".config" / "Moonlight Game Streaming Project"
|
||||||
os.path.expanduser("~"), ".config", "Moonlight Game Streaming Project"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def moonlight_state_file() -> str:
|
def moonlight_state_file() -> Path:
|
||||||
return os.path.join(moonlight_config_dir(), "Moonlight.conf")
|
return moonlight_config_dir() / "Moonlight.conf"
|
||||||
|
|
||||||
|
|
||||||
def load_state() -> ConfigParser | None:
|
def load_state() -> ConfigParser | None:
|
||||||
try:
|
try:
|
||||||
with open(moonlight_state_file()) as file:
|
with moonlight_state_file().open() as file:
|
||||||
config = ConfigParser()
|
config = ConfigParser()
|
||||||
config.read_file(file)
|
config.read_file(file)
|
||||||
print(config.sections())
|
print(config.sections())
|
||||||
@@ -65,8 +64,8 @@ def init_state(certificate: str, key: str) -> None:
|
|||||||
config.write(file)
|
config.write(file)
|
||||||
|
|
||||||
|
|
||||||
def write_state(data: ConfigParser) -> bool:
|
def write_state(data: ConfigParser) -> None:
|
||||||
with open(moonlight_state_file(), "w") as file:
|
with moonlight_state_file().open("w") as file:
|
||||||
data.write(file)
|
data.write(file)
|
||||||
|
|
||||||
|
|
||||||
@@ -84,22 +83,24 @@ def add_sunshine_host_to_parser(
|
|||||||
|
|
||||||
new_host = no_hosts + 1
|
new_host = no_hosts + 1
|
||||||
|
|
||||||
config.set("hosts", f"{new_host}\srvcert", convert_string_to_bytearray(certificate))
|
config.set(
|
||||||
|
"hosts", f"{new_host}\\srvcert", convert_string_to_bytearray(certificate)
|
||||||
|
)
|
||||||
config.set("hosts", "size", str(new_host))
|
config.set("hosts", "size", str(new_host))
|
||||||
config.set("hosts", f"{new_host}\\uuid", uuid)
|
config.set("hosts", f"{new_host}\\uuid", uuid)
|
||||||
config.set("hosts", f"{new_host}\hostname", hostname)
|
config.set("hosts", f"{new_host}\\hostname", hostname)
|
||||||
config.set("hosts", f"{new_host}\\nvidiasv", "false")
|
config.set("hosts", f"{new_host}\\nvidiasv", "false")
|
||||||
config.set("hosts", f"{new_host}\customname", "false")
|
config.set("hosts", f"{new_host}\\customname", "false")
|
||||||
config.set("hosts", f"{new_host}\manualaddress", manual_host)
|
config.set("hosts", f"{new_host}\\manualaddress", manual_host)
|
||||||
config.set("hosts", f"{new_host}\manualport", "47989")
|
config.set("hosts", f"{new_host}\\manualport", "47989")
|
||||||
config.set("hosts", f"{new_host}\\remoteport", "0")
|
config.set("hosts", f"{new_host}\\remoteport", "0")
|
||||||
config.set("hosts", f"{new_host}\\remoteaddress", "")
|
config.set("hosts", f"{new_host}\\remoteaddress", "")
|
||||||
config.set("hosts", f"{new_host}\localaddress", "")
|
config.set("hosts", f"{new_host}\\localaddress", "")
|
||||||
config.set("hosts", f"{new_host}\localport", "0")
|
config.set("hosts", f"{new_host}\\localport", "0")
|
||||||
config.set("hosts", f"{new_host}\ipv6port", "0")
|
config.set("hosts", f"{new_host}\\ipv6port", "0")
|
||||||
config.set("hosts", f"{new_host}\ipv6address", "")
|
config.set("hosts", f"{new_host}\\ipv6address", "")
|
||||||
config.set(
|
config.set(
|
||||||
"hosts", f"{new_host}\mac", convert_string_to_bytearray("\\xceop\\x8d\\xfc{")
|
"hosts", f"{new_host}\\mac", convert_string_to_bytearray("\\xceop\\x8d\\xfc{")
|
||||||
)
|
)
|
||||||
add_app(config, "Desktop", new_host, 1, 881448767)
|
add_app(config, "Desktop", new_host, 1, 881448767)
|
||||||
add_app(config, "Low Res Desktop", new_host, 2, 303580669)
|
add_app(config, "Low Res Desktop", new_host, 2, 303580669)
|
||||||
@@ -114,7 +115,7 @@ def add_sunshine_host_to_parser(
|
|||||||
def add_app(
|
def add_app(
|
||||||
config: ConfigParser, name: str, host_id: int, app_id: int, app_no: int
|
config: ConfigParser, name: str, host_id: int, app_id: int, app_no: int
|
||||||
) -> None:
|
) -> None:
|
||||||
identifier = f"{host_id}\\apps\{app_id}\\"
|
identifier = f"{host_id}\\apps\\{app_id}\\"
|
||||||
config.set("hosts", f"{identifier}appcollector", "false")
|
config.set("hosts", f"{identifier}appcollector", "false")
|
||||||
config.set("hosts", f"{identifier}directlaunch", "false")
|
config.set("hosts", f"{identifier}directlaunch", "false")
|
||||||
config.set("hosts", f"{identifier}hdr", "false")
|
config.set("hosts", f"{identifier}hdr", "false")
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
|
||||||
def parse_moonlight_uri(uri: str) -> (str, str):
|
def parse_moonlight_uri(uri: str) -> tuple[str, int | None]:
|
||||||
print(uri)
|
print(uri)
|
||||||
if uri.startswith("moonlight:"):
|
if uri.startswith("moonlight:"):
|
||||||
# Fixes a bug where moonlight:// is not parsed correctly
|
# Fixes a bug where moonlight:// is not parsed correctly
|
||||||
@@ -13,5 +13,8 @@ def parse_moonlight_uri(uri: str) -> (str, str):
|
|||||||
msg = f"Invalid moonlight URI: {uri}"
|
msg = f"Invalid moonlight URI: {uri}"
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
hostname = parsed.hostname
|
hostname = parsed.hostname
|
||||||
|
if hostname is None:
|
||||||
|
msg = f"Invalid moonlight URI: {uri}"
|
||||||
|
raise ValueError
|
||||||
port = parsed.port
|
port = parsed.port
|
||||||
return (hostname, port)
|
return (hostname, port)
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import http.client
|
|||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
def get_context() -> http.client.ssl.SSLContext:
|
def get_context() -> http.client.ssl.SSLContext: # type: ignore
|
||||||
# context = http.client.ssl.create_default_context()
|
# context = http.client.ssl.create_default_context()
|
||||||
# # context.load_cert_chain(
|
# # context.load_cert_chain(
|
||||||
# # certfile="/var/lib/sunshine/sunshine.cert", keyfile="/var/lib/sunshine/sunshine.key"
|
# # certfile="/var/lib/sunshine/sunshine.cert", keyfile="/var/lib/sunshine/sunshine.key"
|
||||||
@@ -12,7 +12,7 @@ def get_context() -> http.client.ssl.SSLContext:
|
|||||||
# certfile="/home/kenji/.config/sunshine/credentials/cacert.pem",
|
# certfile="/home/kenji/.config/sunshine/credentials/cacert.pem",
|
||||||
# keyfile="/home/kenji/.config/sunshine/credentials/cakey.pem",
|
# keyfile="/home/kenji/.config/sunshine/credentials/cakey.pem",
|
||||||
# )
|
# )
|
||||||
return http.client.ssl._create_unverified_context() # noqa: SLF001
|
return http.client.ssl._create_unverified_context() # type: ignore # noqa: SLF001
|
||||||
|
|
||||||
|
|
||||||
def pair(pin: str) -> str:
|
def pair(pin: str) -> str:
|
||||||
@@ -44,7 +44,7 @@ def restart() -> None:
|
|||||||
conn = http.client.HTTPSConnection(
|
conn = http.client.HTTPSConnection(
|
||||||
"localhost",
|
"localhost",
|
||||||
47990,
|
47990,
|
||||||
context=http.client.ssl._create_unverified_context(), # noqa: SLF001
|
context=http.client.ssl._create_unverified_context(), # type: ignore # noqa: SLF001
|
||||||
)
|
)
|
||||||
user_and_pass = base64.b64encode(b"sunshine:sunshine").decode("ascii")
|
user_and_pass = base64.b64encode(b"sunshine:sunshine").decode("ascii")
|
||||||
headers = {
|
headers = {
|
||||||
@@ -52,11 +52,8 @@ def restart() -> None:
|
|||||||
"Authorization": f"Basic {user_and_pass}",
|
"Authorization": f"Basic {user_and_pass}",
|
||||||
}
|
}
|
||||||
|
|
||||||
# Define the parameters
|
|
||||||
params = {}
|
|
||||||
|
|
||||||
# Make the request
|
# Make the request
|
||||||
conn.request("POST", "/api/restart", params, headers)
|
conn.request("POST", "/api/restart", {}, headers)
|
||||||
|
|
||||||
# Get and print the response
|
# Get and print the response
|
||||||
# There wont be a response, because it is restarted
|
# There wont be a response, because it is restarted
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import configparser
|
import configparser
|
||||||
import os
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
# address_family = both
|
# address_family = both
|
||||||
# channels = 5
|
# channels = 5
|
||||||
@@ -11,26 +12,29 @@ import os
|
|||||||
PSEUDO_SECTION = "DEFAULT"
|
PSEUDO_SECTION = "DEFAULT"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
class Config:
|
class Config:
|
||||||
|
config: configparser.ConfigParser
|
||||||
|
config_location: Path
|
||||||
_instance = None
|
_instance = None
|
||||||
|
|
||||||
def __new__(cls, config_location: str | None = None) -> "Config":
|
def __new__(cls, config_location: Path | None = None) -> "Config":
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
cls._instance = super().__new__(cls)
|
cls._instance = super().__new__(cls)
|
||||||
cls._instance.config = configparser.ConfigParser()
|
cls._instance.config = configparser.ConfigParser()
|
||||||
config = config_location or cls._instance.default_sunshine_config_file()
|
config = config_location or cls._instance.default_sunshine_config_file()
|
||||||
cls._instance.config_location = config
|
cls._instance.config_location = config
|
||||||
with open(config) as f:
|
with config.open() as f:
|
||||||
config_string = f"[{PSEUDO_SECTION}]\n" + f.read()
|
config_string = f"[{PSEUDO_SECTION}]\n" + f.read()
|
||||||
print(config_string)
|
print(config_string)
|
||||||
cls._instance.config.read_string(config_string)
|
cls._instance.config.read_string(config_string)
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
def default_sunshine_config_dir(self) -> str:
|
def default_sunshine_config_dir(self) -> Path:
|
||||||
return os.path.join(os.path.expanduser("~"), ".config", "sunshine")
|
return Path.home() / ".config" / "sunshine"
|
||||||
|
|
||||||
def default_sunshine_config_file(self) -> str:
|
def default_sunshine_config_file(self) -> Path:
|
||||||
return os.path.join(self.default_sunshine_config_dir(), "sunshine.conf")
|
return self.default_sunshine_config_dir() / "sunshine.conf"
|
||||||
|
|
||||||
def get_private_key(self) -> str:
|
def get_private_key(self) -> str:
|
||||||
return self.config.get(PSEUDO_SECTION, "pkey")
|
return self.config.get(PSEUDO_SECTION, "pkey")
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ def private_key_to_pem(private_key: rsa.RSAPrivateKey) -> bytes:
|
|||||||
return pem_private_key
|
return pem_private_key
|
||||||
|
|
||||||
|
|
||||||
def init_credentials() -> (str, str):
|
def init_credentials() -> tuple[bytes, bytes]:
|
||||||
private_key = generate_private_key()
|
private_key = generate_private_key()
|
||||||
certificate = generate_certificate(private_key)
|
certificate = generate_certificate(private_key)
|
||||||
private_key_pem = private_key_to_pem(private_key)
|
private_key_pem = private_key_to_pem(private_key)
|
||||||
@@ -64,14 +64,12 @@ def uniqueid() -> str:
|
|||||||
def write_credentials(_args: argparse.Namespace) -> None:
|
def write_credentials(_args: argparse.Namespace) -> None:
|
||||||
print("Writing sunshine credentials")
|
print("Writing sunshine credentials")
|
||||||
pem_certificate, pem_private_key = init_credentials()
|
pem_certificate, pem_private_key = init_credentials()
|
||||||
Path("credentials").mkdir(parents=True, exist_ok=True)
|
credentials_dir = Path("credentials")
|
||||||
with open("credentials/cacert.pem", mode="wb") as file:
|
credentials_dir.mkdir(parents=True, exist_ok=True)
|
||||||
file.write(pem_certificate)
|
(credentials_dir / "cacert.pem").write_bytes(pem_certificate)
|
||||||
with open("credentials/cakey.pem", mode="wb") as file:
|
(credentials_dir / "cakey.pem").write_bytes(pem_private_key)
|
||||||
file.write(pem_private_key)
|
|
||||||
print("Generating sunshine UUID")
|
print("Generating sunshine UUID")
|
||||||
with open("uuid", mode="w") as file:
|
Path("uuid").write_text(uniqueid())
|
||||||
file.write(uniqueid())
|
|
||||||
|
|
||||||
|
|
||||||
def register_initialization_parser(parser: argparse.ArgumentParser) -> None:
|
def register_initialization_parser(parser: argparse.ArgumentParser) -> None:
|
||||||
|
|||||||
@@ -32,10 +32,9 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
request = data.decode("utf-8")
|
request = data.decode("utf-8")
|
||||||
body = request.split("\n")[-1]
|
raw_body = request.split("\n")[-1]
|
||||||
print(body)
|
print(raw_body)
|
||||||
body = json.loads(f"{body}")
|
body = json.loads(raw_body)
|
||||||
print(body)
|
|
||||||
|
|
||||||
pair_type = body.get("type", "")
|
pair_type = body.get("type", "")
|
||||||
|
|
||||||
@@ -43,7 +42,7 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
|||||||
print("Api request")
|
print("Api request")
|
||||||
status = pair(body.get("pin", ""))
|
status = pair(body.get("pin", ""))
|
||||||
status = json.dumps(status)
|
status = json.dumps(status)
|
||||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{status}\r\n"
|
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{status}\r\n"
|
||||||
client_socket.sendall(response.encode("utf-8"))
|
client_socket.sendall(response.encode("utf-8"))
|
||||||
|
|
||||||
if pair_type == "native":
|
if pair_type == "native":
|
||||||
@@ -57,12 +56,12 @@ def listen(port: int, cert: str, uuid: str, state_file: str) -> bool:
|
|||||||
encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode(
|
encoded_cert = base64.urlsafe_b64encode(cert.encode("utf-8")).decode(
|
||||||
"utf-8"
|
"utf-8"
|
||||||
)
|
)
|
||||||
json_body = {}
|
json_data = {}
|
||||||
json_body["uuid"] = uuid
|
json_data["uuid"] = uuid
|
||||||
json_body["cert"] = encoded_cert
|
json_data["cert"] = encoded_cert
|
||||||
json_body["hostname"] = socket.gethostname()
|
json_data["hostname"] = socket.gethostname()
|
||||||
json_body = json.dumps(json_body)
|
json_body = json.dumps(json_data)
|
||||||
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\{json_body}\r\n"
|
response = f"HTTP/1.1 200 OK\r\nContent-Type:application/json\r\n\r\\{json_body}\r\n"
|
||||||
client_socket.sendall(response.encode("utf-8"))
|
client_socket.sendall(response.encode("utf-8"))
|
||||||
# add_moonlight_client(decoded_cert, state_file, rec_uuid)
|
# add_moonlight_client(decoded_cert, state_file, rec_uuid)
|
||||||
|
|
||||||
|
|||||||
@@ -1,33 +1,31 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
def default_sunshine_config_dir() -> str:
|
def default_sunshine_config_dir() -> Path:
|
||||||
return os.path.join(os.path.expanduser("~"), ".config", "sunshine")
|
return Path.home() / ".config" / "sunshine"
|
||||||
|
|
||||||
|
|
||||||
def default_sunshine_state_file() -> str:
|
def default_sunshine_state_file() -> Path:
|
||||||
return os.path.join(default_sunshine_config_dir(), "sunshine_state.json")
|
return default_sunshine_config_dir() / "sunshine_state.json"
|
||||||
|
|
||||||
|
|
||||||
def load_state(sunshine_state_path: str) -> dict[str, Any] | None:
|
def load_state(sunshine_state_path: Path) -> str | None:
|
||||||
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
||||||
print(f"Loading sunshine state from {sunshine_state_path}")
|
print(f"Loading sunshine state from {sunshine_state_path}")
|
||||||
try:
|
try:
|
||||||
with open(sunshine_state_path) as file:
|
return json.loads(sunshine_state_path.read_text())
|
||||||
config = file.read()
|
|
||||||
return config
|
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
print("Sunshine state file not found.")
|
print("Sunshine state file not found.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
# this needs to be created before sunshine is first run
|
# this needs to be created before sunshine is first run
|
||||||
def init_state(uuid: str, sunshine_state_path: str) -> None:
|
def init_state(uuid: str, sunshine_state_path: Path) -> None:
|
||||||
print("Initializing sunshine state.")
|
print("Initializing sunshine state.")
|
||||||
|
|
||||||
data = {}
|
data: dict[str, Any] = {}
|
||||||
data["root"] = {}
|
data["root"] = {}
|
||||||
data["root"]["uniqueid"] = uuid
|
data["root"]["uniqueid"] = uuid
|
||||||
data["root"]["devices"] = []
|
data["root"]["devices"] = []
|
||||||
@@ -36,9 +34,9 @@ def init_state(uuid: str, sunshine_state_path: str) -> None:
|
|||||||
write_state(data, sunshine_state_path)
|
write_state(data, sunshine_state_path)
|
||||||
|
|
||||||
|
|
||||||
def write_state(data: dict[str, Any], sunshine_state_path: str) -> None:
|
def write_state(data: dict[str, Any], sunshine_state_path: Path) -> None:
|
||||||
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
sunshine_state_path = sunshine_state_path or default_sunshine_state_file()
|
||||||
with open(sunshine_state_path, "w") as file:
|
with sunshine_state_path.open("w") as file:
|
||||||
json.dump(data, file, indent=4)
|
json.dump(data, file, indent=4)
|
||||||
|
|
||||||
|
|
||||||
@@ -48,11 +46,13 @@ def pseudo_uuid() -> str:
|
|||||||
|
|
||||||
|
|
||||||
# TODO: finish this function
|
# TODO: finish this function
|
||||||
def add_moonlight_client(certificate: str, sunshine_state_path: str, uuid: str) -> None:
|
def add_moonlight_client(
|
||||||
|
certificate: str, sunshine_state_path: Path, uuid: str
|
||||||
|
) -> None:
|
||||||
print("Adding moonlight client to sunshine state.")
|
print("Adding moonlight client to sunshine state.")
|
||||||
state = load_state(sunshine_state_path)
|
raw_state = load_state(sunshine_state_path)
|
||||||
if state:
|
if raw_state:
|
||||||
state = json.loads(state)
|
state = json.loads(raw_state)
|
||||||
|
|
||||||
if not state["root"]["devices"]:
|
if not state["root"]["devices"]:
|
||||||
state["root"]["devices"].append(
|
state["root"]["devices"].append(
|
||||||
|
|||||||
Reference in New Issue
Block a user