revert Merge pull request 'Remove clanModules/*' (#4202) from remove-modules into main Reviewed-on: https://git.clan.lol/clan/clan-core/pulls/4202 See: https://git.clan.lol/clan/clan-core/issues/4365 Not all modules are migrated. If they are not migrated, we need to write migration docs and please display the link to the migration docs
248 lines
6.7 KiB
Python
248 lines
6.7 KiB
Python
import argparse
|
|
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
|
|
def ensure_config(path: Path, db_path: Path) -> None:
|
|
# Default JSON structure if the file doesn't exist
|
|
default_json = {
|
|
"misc": {
|
|
"audio_wizard_has_been_shown": True,
|
|
"database_location": str(db_path),
|
|
"viewed_server_ping_consent_message": True,
|
|
},
|
|
"settings_version": 1,
|
|
}
|
|
|
|
# Check if the file exists
|
|
if path.exists():
|
|
data = json.loads(path.read_text())
|
|
else:
|
|
data = default_json
|
|
# Create the file with default JSON structure
|
|
with path.open("w") as file:
|
|
json.dump(data, file, indent=4)
|
|
|
|
# TODO: make sure to only update the diff
|
|
updated_data = {**default_json, **data}
|
|
|
|
# Write the modified JSON object back to the file
|
|
with path.open("w") as file:
|
|
json.dump(updated_data, file, indent=4)
|
|
|
|
|
|
def initialize_database(db_location: str) -> None:
|
|
"""
|
|
Initializes the database. If the database or the servers table does not exist, it creates them.
|
|
|
|
:param db_location: The path to the SQLite database
|
|
"""
|
|
conn = sqlite3.connect(db_location)
|
|
try:
|
|
cursor = conn.cursor()
|
|
|
|
# Create the servers table if it doesn't exist
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS servers (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
hostname TEXT NOT NULL,
|
|
port INTEGER NOT NULL,
|
|
username TEXT NOT NULL,
|
|
password TEXT NOT NULL,
|
|
url TEXT
|
|
)
|
|
""")
|
|
|
|
# Commit the changes
|
|
conn.commit()
|
|
|
|
except sqlite3.Error as e:
|
|
print(f"An error occurred while initializing the database: {e}")
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def initialize_certificates(
|
|
db_location: str, hostname: str, port: str, digest: str
|
|
) -> None:
|
|
# Connect to the SQLite database
|
|
conn = sqlite3.connect(db_location)
|
|
|
|
try:
|
|
# Create a cursor object
|
|
cursor = conn.cursor()
|
|
|
|
# TODO: check if cert already there
|
|
# if server_check(cursor, name, hostname):
|
|
# print(
|
|
# f"Server with name '{name}' and hostname '{hostname}' already exists."
|
|
# )
|
|
# return
|
|
|
|
# SQL command to insert data into the servers table
|
|
insert_query = """
|
|
INSERT INTO cert (hostname, port, digest)
|
|
VALUES (?, ?, ?)
|
|
"""
|
|
|
|
# Data to be inserted
|
|
data = (hostname, port, digest)
|
|
|
|
# Execute the insert command with the provided data
|
|
cursor.execute(insert_query, data)
|
|
|
|
# Commit the changes
|
|
conn.commit()
|
|
|
|
print("Data has been successfully inserted.")
|
|
except sqlite3.Error as e:
|
|
print(f"An error occurred: {e}")
|
|
finally:
|
|
# Close the connection
|
|
conn.close()
|
|
|
|
|
|
def calculate_digest(cert: str) -> str:
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
|
|
cert = cert.strip()
|
|
cert = cert.encode("utf-8")
|
|
cert = x509.load_pem_x509_certificate(cert, default_backend())
|
|
digest = cert.fingerprint(hashes.SHA1()).hex()
|
|
return digest
|
|
|
|
|
|
def server_check(cursor: str, name: str, hostname: str) -> bool:
|
|
"""
|
|
Check if a server with the given name and hostname already exists.
|
|
|
|
:param cursor: The database cursor
|
|
:param name: The name of the server
|
|
:param hostname: The hostname of the server
|
|
:return: True if the server exists, False otherwise
|
|
"""
|
|
check_query = """
|
|
SELECT 1 FROM servers WHERE name = ? AND hostname = ?
|
|
"""
|
|
cursor.execute(check_query, (name, hostname))
|
|
return cursor.fetchone() is not None
|
|
|
|
|
|
def insert_server(
|
|
name: str,
|
|
hostname: str,
|
|
port: str,
|
|
username: str,
|
|
password: str,
|
|
url: str,
|
|
db_location: str,
|
|
) -> None:
|
|
"""
|
|
Inserts a new server record into the servers table.
|
|
|
|
:param name: The name of the server
|
|
:param hostname: The hostname of the server
|
|
:param port: The port number
|
|
:param username: The username
|
|
:param password: The password
|
|
:param url: The URL
|
|
"""
|
|
# Connect to the SQLite database
|
|
conn = sqlite3.connect(db_location)
|
|
|
|
try:
|
|
# Create a cursor object
|
|
cursor = conn.cursor()
|
|
|
|
if server_check(cursor, name, hostname):
|
|
print(
|
|
f"Server with name '{name}' and hostname '{hostname}' already exists."
|
|
)
|
|
return
|
|
|
|
# SQL command to insert data into the servers table
|
|
insert_query = """
|
|
INSERT INTO servers (name, hostname, port, username, password, url)
|
|
VALUES (?, ?, ?, ?, ?, ?)
|
|
"""
|
|
|
|
# Data to be inserted
|
|
data = (name, hostname, port, username, password, url)
|
|
|
|
# Execute the insert command with the provided data
|
|
cursor.execute(insert_query, data)
|
|
|
|
# Commit the changes
|
|
conn.commit()
|
|
|
|
print("Data has been successfully inserted.")
|
|
except sqlite3.Error as e:
|
|
print(f"An error occurred: {e}")
|
|
finally:
|
|
# Close the connection
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
port = 64738
|
|
password = ""
|
|
url = None
|
|
|
|
parser = argparse.ArgumentParser(
|
|
prog="initialize_mumble",
|
|
)
|
|
|
|
subparser = parser.add_subparsers(dest="certificates")
|
|
# cert_parser = subparser.add_parser("certificates")
|
|
|
|
parser.add_argument("--cert")
|
|
parser.add_argument("--digest")
|
|
parser.add_argument("--machines")
|
|
parser.add_argument("--servers")
|
|
parser.add_argument("--username")
|
|
parser.add_argument("--db-location")
|
|
parser.add_argument("--ensure-config", type=Path)
|
|
args = parser.parse_args()
|
|
|
|
print(args)
|
|
|
|
if args.ensure_config:
|
|
ensure_config(args.ensure_config, args.db_location)
|
|
print("Initialized config")
|
|
exit(0)
|
|
|
|
if args.servers:
|
|
print(args.servers)
|
|
servers = json.loads(f"{args.servers}")
|
|
db_location = args.db_location
|
|
for server in servers:
|
|
digest = calculate_digest(server.get("value"))
|
|
name = server.get("name")
|
|
initialize_certificates(db_location, name, port, digest)
|
|
print("Initialized certificates")
|
|
exit(0)
|
|
|
|
initialize_database(args.db_location)
|
|
|
|
# Insert the server into the database
|
|
print(args.machines)
|
|
machines = json.loads(f"{args.machines}")
|
|
print(machines)
|
|
print(list(machines))
|
|
|
|
for machine in list(machines):
|
|
print(f"Inserting {machine}.")
|
|
insert_server(
|
|
machine,
|
|
machine,
|
|
port,
|
|
args.username,
|
|
password,
|
|
url,
|
|
args.db_location,
|
|
)
|