#!/usr/bin/env python3
"""Small HTTP server answering Minecraft-style session, profile and skin requests.

Requests are routed on the ``Host`` header: a handful of well-known host names
get the legacy ``joinserver``/``checkserver``/skin endpoints, and every other
host gets the endpoints rooted at ``/ygg`` (advertised to clients through the
``X-Authlib-Injector-API-Location`` header sent by ``/``).

No credentials are verified: every well-formed join request is accepted.
Skins and capes are read from the ``skin`` and ``cape`` directories relative
to the current working directory.
"""
from http.server import *
from http import HTTPStatus
from urllib.parse import urlparse, parse_qs
from base64 import b64encode
from os.path import getmtime
import json

server_domain = "127.0.0.1"
server_url = "http://%s:25564" % server_domain

# Known profiles, keyed by profile id.
userdata = {
    "9a10d47934294527b3a0a32ab41d207e": {
        "name": "Sneppe"
    },
    "4ad1f41992193cb986741243f14d81e8": {
        "name": "mid-kid"
    }
}

# serverId -> profileId, written by the join endpoints and read by the
# check endpoints.
connections = {}


class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler implementing the legacy and ``/ygg`` endpoints."""

    protocol_version = "HTTP/1.1"

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    def send_ok(self, data):
        """Send a ``200 OK`` response whose body is the bytes ``data``."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _404(self, *args):
        """Send an empty ``404 Not Found``; extra endpoint arguments are ignored."""
        self.send_response(HTTPStatus.NOT_FOUND)
        # Connections are persistent (HTTP/1.1), so the client needs an
        # explicit length to know the response has no body.
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _no_content(self):
        """Send an empty ``204 No Content`` response."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self.end_headers()

    def _forbidden(self):
        """Send the ``403`` JSON error used for rejected ``/ygg`` join requests."""
        resp = b'{"error":"ForbiddenOperationException"}'
        self.send_response(HTTPStatus.FORBIDDEN)
        self.send_header("Content-Length", str(len(resp)))
        self.end_headers()
        self.wfile.write(resp)

    def _log_unhandled(self, body):
        """Log the details of a request that matched no endpoint."""
        self.log_message("self.headers.get('Host'): %s", self.headers.get("Host"))
        self.log_message("self.command: %s", self.command)
        self.log_message("self.path: %s", self.path)
        self.log_message("body: %s", body)

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _get_endpoints(self):
        """Return the GET endpoint table that applies to the request's Host.

        Keys ending in ``?`` take the parsed query, keys ending in ``/`` take
        the remaining path plus the parsed query, and any other key must match
        the request path exactly and takes no arguments.
        """
        host = self.headers.get("Host")
        if host == "s3.amazonaws.com":
            return {
                "/MinecraftResources/": self._404
            }
        if host == "www.minecraft.net":
            return {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver,
                "/skin/": self.serve_skin
            }
        if host == "session.minecraft.net":
            return {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver
            }
        if host == "skins.minecraft.net":
            return {
                "/MinecraftSkins/": self.serve_skin,
                "/MinecraftCloaks/": self.serve_cape
            }
        return {
            "/": self.root,
            "/skin/": self.serve_skin,
            "/cape/": self.serve_cape,
            "/ygg": self.ygg,
            "/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver,
            "/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile
        }

    def do_GET(self):
        """Dispatch a GET request; unmatched requests are logged and get a 404."""
        for url, func in self._get_endpoints().items():
            if url.endswith("?") and self.path.startswith(url):
                params = parse_qs(self.path[len(url):])
                return func(params)
            if url.endswith("/") and self.path.startswith(url) and url != "/":
                parse = urlparse(self.path[len(url):])
                return func(parse.path, parse_qs(parse.query))
            if url == self.path:
                return func()

        self._log_unhandled(b"")
        self._404()

    def do_POST(self):
        """Dispatch a POST request; unmatched requests are logged and get a 404."""
        # A missing, malformed or negative Content-Length is treated as an
        # empty body instead of raising (or blocking on read-until-EOF).
        try:
            length = max(0, int(self.headers.get("Content-Length", 0)))
        except (TypeError, ValueError):
            length = 0
        body = self.rfile.read(length)

        endpoints = {
            "/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver
        }
        for url, func in endpoints.items():
            if url == self.path:
                return func(body)

        self._log_unhandled(body)
        self._404()

    # ------------------------------------------------------------------
    # Skin / cape files
    # ------------------------------------------------------------------

    def open_data(self, path, file):
        """Open ``file`` inside directory ``path`` for binary reading.

        Slashes in ``file`` are replaced by underscores so the name cannot
        address a sub-directory of ``path``.
        """
        return open(path + "/" + file.replace("/", "_"), "rb")

    def serve_data(self, path, file):
        """Send the content of ``file`` from directory ``path``, or a 404."""
        try:
            with self.open_data(path, file) as f:
                data = f.read()
        except OSError:
            # Covers a missing file as well as names that resolve to a
            # directory (e.g. an empty name) or an unreadable file.
            self._404()
        else:
            self.send_ok(data)

    def serve_skin(self, path, params):
        """Serve a skin file from the ``skin`` directory."""
        return self.serve_data("skin", path)

    def serve_cape(self, path, params):
        """Serve a cape from the ``cape`` directory.

        A requested ``NAME.cape.png`` is looked up as ``NAME.png``.
        """
        suffix = ".cape.png"
        if path.endswith(suffix):
            path = path[:-len(suffix)] + ".png"
        return self.serve_data("cape", path)

    def check_data(self, path, file):
        """Return the integer mtime of ``file`` in ``path``, or None if unavailable."""
        try:
            return int(getmtime(path + "/" + file.replace("/", "_")))
        except OSError:
            return None

    def check_skin(self, name):
        """Return the mtime of ``name``'s skin file, or None."""
        return self.check_data("skin", name + ".png")

    def check_cape(self, name):
        """Return the mtime of ``name``'s cape file, or None."""
        return self.check_data("cape", name + ".png")

    # ------------------------------------------------------------------
    # Join / check endpoints
    # ------------------------------------------------------------------

    def lgy_joinserver(self, params):
        """Record a legacy join: ``sessionId`` must be ``token:<token>:<profileId>``.

        Responds ``OK`` on success and ``Bad login`` when the parameters are
        missing or malformed. The token itself is not verified.
        """
        try:
            session_id = params["sessionId"][0]
            server_id = params["serverId"][0]
            prefix, _token, profile_id = session_id.split(":")
        except (KeyError, ValueError):
            prefix = None
        if prefix != "token":
            # Displayed directly to the user
            self.send_ok(b"Bad login")
            return

        connections[server_id] = profile_id
        self.send_ok(b"OK")

    def ygg_joinserver(self, body):
        """Record a join from a JSON body with ``selectedProfile`` and ``serverId``.

        Responds 204 on success and 403 when the body cannot be parsed or
        lacks the required keys. The access token is not verified.
        """
        try:
            data = json.loads(body.decode())
            profile_id = data["selectedProfile"]
            server_id = data["serverId"]
            connections[server_id] = profile_id
        except (ValueError, KeyError, TypeError):
            self._forbidden()
            return

        self._no_content()

    def lgy_checkserver(self, params):
        """Answer ``YES`` if ``serverId`` has a recorded join, otherwise ``NO``."""
        server_id = params.get("serverId", [None])[0]
        if server_id not in connections:
            self.send_ok(b"NO")
            return
        self.send_ok(b"YES")

    def ygg_checkserver(self, params):
        """Send the profile that joined ``serverId``, or 204 if none did.

        For a profile id that is not in ``userdata`` the response contains
        only the ``id`` field.
        """
        server_id = params.get("serverId", [None])[0]
        if server_id not in connections:
            self._no_content()
            return

        profile_id = connections[server_id]
        data = self.get_profile(profile_id)
        if data is None:
            data = {
                "id": profile_id,
            }
        self.send_ok(json.dumps(data).encode())

    # ------------------------------------------------------------------
    # Profiles and metadata
    # ------------------------------------------------------------------

    def get_profile(self, profileId):
        """Build the profile document for ``profileId``.

        Returns None when the id is not in ``userdata``. A ``textures``
        property (base64-encoded JSON) is included only when a skin or cape
        file exists for the user; its timestamp is the newest file mtime.
        """
        if profileId not in userdata:
            return None
        name = userdata[profileId]["name"]

        textures = {}
        textures_stamp = 0
        skin_mtime = self.check_skin(name)
        if skin_mtime is not None:
            textures_stamp = max(textures_stamp, skin_mtime)
            textures["SKIN"] = {"url": server_url + "/skin/%s.png" % name}
        cape_mtime = self.check_cape(name)
        if cape_mtime is not None:
            textures_stamp = max(textures_stamp, cape_mtime)
            textures["CAPE"] = {"url": server_url + "/cape/%s.cape.png" % name}

        properties = []
        if textures:
            prop = {
                "timestamp": textures_stamp,
                "profileId": profileId,
                "profileName": name,
                "textures": textures
            }
            properties.append({
                "name": "textures",
                "value": b64encode(json.dumps(prop).encode()).decode()
            })

        return {
            "id": profileId,
            "name": name,
            "properties": properties,
            "profileActions": []
        }

    def ygg_profile(self, path, params):
        """Send the profile whose id is ``path``, or 204 if it is unknown."""
        data = self.get_profile(path)
        if data is None:
            self._no_content()
            return
        self.send_ok(json.dumps(data).encode())

    def ygg(self):
        """Send the API metadata document served at ``/ygg``."""
        settings = {
            "meta": {
                "serverName": "authlib-injector",
                "feature.no_mojang_namespace": True,
                "feature.legacy_skin_api": True
            },
            "skinDomains": [server_domain]
        }
        self.send_ok(json.dumps(settings).encode())

    def root(self):
        """Answer ``/`` with a 204 that points clients at the ``/ygg`` API root."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self.send_header("X-Authlib-Injector-API-Location", server_url + "/ygg")
        self.end_headers()


def run(server_class=ThreadingHTTPServer, handler_class=HTTPRequestHandler):
    """Serve forever on port 25564 on all interfaces."""
    server_address = ('', 25564)
    httpd = server_class(server_address, handler_class)
    httpd.serve_forever()


if __name__ == "__main__":
    # Only start the server when executed as a script, so the module can be
    # imported without blocking.
    try:
        run()
    except KeyboardInterrupt:
        pass