commit 3ea838bdbbb14f1ebee4278c9ff19140e431d995 Author: mid-kid Date: Fri Oct 25 23:40:32 2024 +0200 Add prototype authentication server diff --git a/auth/server.py b/auth/server.py new file mode 100755 index 0000000..9918938 --- /dev/null +++ b/auth/server.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python3 +from http.server import * +from http import HTTPStatus +from urllib.parse import urlparse, parse_qs +from base64 import b64encode +from os.path import getmtime +import json + +server_domain = "127.0.0.1" +server_url = "http://%s:25564" % server_domain + +userdata = { + "9a10d47934294527b3a0a32ab41d207e": { + "name": "Sneppe" + }, + "4ad1f41992193cb986741243f14d81e8": { + "name": "mid-kid" + } +} +connections = {} + +class HTTPRequestHandler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def send_ok(self, data): + self.send_response(HTTPStatus.OK) + self.send_header("Content-Length", len(data)) + self.end_headers() + self.wfile.write(data) + + def do_GET(self): + if self.headers.get("Host") == "s3.amazonaws.com": + endpoints = { + "/MinecraftResources/": self._404 + } + elif self.headers.get("Host") == "www.minecraft.net": + endpoints = { + "/game/joinserver.jsp?": self.lgy_joinserver, + "/game/checkserver.jsp?": self.lgy_checkserver, + "/skin/": self.serve_skin + } + elif self.headers.get("Host") == "session.minecraft.net": + endpoints = { + "/game/joinserver.jsp?": self.lgy_joinserver, + "/game/checkserver.jsp?": self.lgy_checkserver + } + elif self.headers.get("Host") == "skins.minecraft.net": + endpoints = { + "/MinecraftSkins/": self.serve_skin, + "/MinecraftCloaks/": self.serve_cape + } + else: + endpoints = { + "/": self.root, + "/skin/": self.serve_skin, + "/cape/": self.serve_cape, + "/ygg": self.ygg, + "/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver, + "/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile + } + + for url, func in endpoints.items(): + if url.endswith("?") and self.path.startswith(url): + params = parse_qs(self.path[len(url):]) + return func(params) + if url.endswith("/") and self.path.startswith(url) and url != "/": + parse = urlparse(self.path[len(url):]) + path = parse.path + params = parse_qs(parse.query) + return func(path, params) + if url == self.path: + return func() + + body = "".encode() + self.log_message("self.headers.get('Host'): %s", self.headers.get("Host")) + self.log_message("self.command: %s", self.command) + self.log_message("self.path: %s", self.path) + self.log_message("body: %s", body) + self._404() + + def do_POST(self): + body = self.rfile.read(int(self.headers.get("Content-Length"))) + + endpoints = { + "/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver + } + + for url, func in endpoints.items(): + if url == self.path: + return func(body) + + self.log_message("self.headers.get('Host'): %s", self.headers.get("Host")) + self.log_message("self.command: %s", self.command) + self.log_message("self.path: %s", self.path) + self.log_message("body: %s", body) + self._404() + + def _404(self, *args): + self.send_response(HTTPStatus.NOT_FOUND) + self.end_headers() + + def open_data(self, path, file): + return open(path + "/" + file.replace("/", "_"), "rb") + + def serve_data(self, path, file): + data = None + try: + with self.open_data(path, file) as f: + data = f.read() + except FileNotFoundError: + self.send_response(HTTPStatus.NOT_FOUND) + self.end_headers() + else: + self.send_ok(data) + + def serve_skin(self, path, params): + return self.serve_data("skin", path) + + def serve_cape(self, path, params): + if path.endswith(".cape.png"): + path = path[:-9] + ".png" + return self.serve_data("cape", path) + + def check_data(self, path, file): + try: + return int(getmtime(path + "/" + file.replace("/", "_"))) + except FileNotFoundError: + return None + + def check_skin(self, name): + return self.check_data("skin", name + ".png") + + def check_cape(self, name): + return self.check_data("cape", name + ".png") + + def lgy_joinserver(self, params): + user = params["user"][0] + assert params["sessionId"][0].startswith("token:") + token, profileId = params["sessionId"][0].split(":")[1:] + serverId = params["serverId"][0] + + connections[serverId] = profileId + + if True: + self.send_ok(b"OK") + else: + # Displayed directly to the user + self.send_ok(b"Bad login") + + def ygg_joinserver(self, body): + data = json.loads(body.decode()) + token = data["accessToken"] + profileId = data["selectedProfile"] + serverId = data["serverId"] + + connections[serverId] = profileId + + if True: + self.send_response(HTTPStatus.NO_CONTENT) + self.end_headers() + else: + resp = b'{"error":"ForbiddenOperationException"}' + self.send_response(HTTPStatus.FORBIDDEN) + self.send_header("Content-Length", len(resp)) + self.end_headers() + self.wfile.write(resp) + + def lgy_checkserver(self, params): + user = params["user"][0] + serverId = params["serverId"][0] + + if not serverId in connections: + self.send_ok(b"NO") + + self.send_ok(b"YES") + + def ygg_checkserver(self, params): + serverId = params["serverId"][0] + user = params["username"][0] + + if not serverId in connections: + self.send_response(HTTPStatus.NO_CONTENT) + self.end_headers() + + profileId = connections[serverId] + + data = self.get_profile(profileId) + if data is None: + data = { + "id": profileId, + } + + self.send_ok(json.dumps(data).encode()) + + def get_profile(self, profileId): + if profileId not in userdata: + return None + + user = userdata[profileId] + + properties = [] + + check_skin = self.check_skin(user["name"]) + check_cape = self.check_cape(user["name"]) + + textures = {} + textures_stamp = 0 + if check_skin is not None: + textures_stamp = max(textures_stamp, check_skin) + textures["SKIN"] = {"url": server_url + "/skin/%s.png" % user["name"]} + if check_cape is not None: + textures_stamp = max(textures_stamp, check_cape) + textures["CAPE"] = {"url": server_url + "/cape/%s.cape.png" % user["name"]} + + if textures: + prop = { + "timestamp": textures_stamp, + "profileId": profileId, + "profileName": user["name"], + "textures": textures + } + properties.append({ + "name": "textures", + "value": b64encode(json.dumps(prop).encode()).decode() + }) + + data = { + "id": profileId, + "name": user["name"], + "properties": properties, + "profileActions": [] + } + + return data + + def ygg_profile(self, path, params): + profileId = path + + data = self.get_profile(profileId) + if data is None: + self.send_response(HTTPStatus.NO_CONTENT) + self.end_headers() + self.send_ok(json.dumps(data).encode()) + + def ygg(self): + settings = { + "meta": { + "serverName": "server", + "feature.no_mojang_namespace": True, + "feature.legacy_skin_api": True + }, + "skinDomains": [server_domain] + } + body = json.dumps(settings).encode() + self.send_ok(body) + + def root(self): + self.send_response(HTTPStatus.NO_CONTENT) + self.send_header("X-Authlib-Injector-API-Location", server_url + "/ygg") + self.end_headers() + +def run(server_class=ThreadingHTTPServer, handler_class=HTTPRequestHandler): + server_address = ('', 25564) + httpd = server_class(server_address, handler_class) + httpd.serve_forever() +try: + run() +except KeyboardInterrupt: + pass