#!/usr/bin/env python3
"""HTTP server answering Minecraft-style session, skin and cape requests.

Requests are routed by their ``Host`` header and path.  Several historical
Minecraft host names are recognised alongside a default set of endpoints
that includes an authlib-injector style ``/ygg`` API.  Skins and capes are
read from the ``skin`` and ``cape`` directories relative to the working
directory; tokens and users are checked through the project-local
``Authentication`` object.
"""

from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from threading import Lock
from urllib.parse import urlparse, parse_qs
from configparser import ConfigParser
from base64 import b64encode
from os.path import getmtime
import json

from auth import Authentication

userdata = {}

# Pending joins shared by all handler threads: serverId -> (profileId, token).
# An entry is written by a "join" request and consumed by the next "check"
# request for the same serverId.
_g_connections = {}
_g_connections_lock = Lock()


class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler implementing the join/check, skin, cape and /ygg endpoints."""

    # HTTP/1.1 enables persistent connections, so every response that may
    # carry a body has to announce its length.
    protocol_version = "HTTP/1.1"

    def setup(self, *args, **kwargs):
        """Run the base setup, then copy the server-wide config and auth objects."""
        super().setup(*args, **kwargs)
        self.config = self.server.config["server"]
        self.auth = self.server.auth

    def send_ok(self, data):
        """Send a 200 response whose body is the bytes object *data*."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _send_empty(self, status):
        """Send a body-less response with an explicit zero Content-Length.

        Without the header a client on a keep-alive connection cannot tell
        where the response ends and keeps waiting for a body.
        """
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _log_unhandled(self, body):
        """Log the details of a request that matched no endpoint."""
        self.log_message("self.headers.get('Host'): %s", self.headers.get("Host"))
        self.log_message("self.command: %s", self.command)
        self.log_message("self.path: %s", self.path)
        self.log_message("body: %s", body)

    def do_GET(self):
        """Dispatch a GET request by ``Host`` header and path.

        Endpoint keys select how the handler is called:

        * key ending in ``?``: prefix match; the rest of the path is parsed
          as a query string and the handler receives ``(params)``;
        * key ending in ``/`` (other than ``/`` itself): prefix match; the
          handler receives ``(path, params)`` for the remainder;
        * anything else: exact match; the handler receives no arguments.

        Requests that match nothing are logged and answered with 404.
        """
        host = self.headers.get("Host")
        if host == "s3.amazonaws.com":
            endpoints = {
                "/MinecraftResources/": self._404
            }
        elif host == "www.minecraft.net":
            endpoints = {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver,
                "/skin/": self.serve_skin
            }
        elif host == "session.minecraft.net":
            endpoints = {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver
            }
        elif host == "skins.minecraft.net":
            endpoints = {
                "/MinecraftSkins/": self.serve_skin,
                "/MinecraftCloaks/": self.serve_cape
            }
        else:
            endpoints = {
                "/": self.root,
                "/skin/": self.serve_skin,
                "/cape/": self.serve_cape,
                "/ygg": self.ygg,
                "/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver,
                "/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile
            }

        for url, func in endpoints.items():
            if url.endswith("?") and self.path.startswith(url):
                params = parse_qs(self.path[len(url):])
                return func(params)
            if url.endswith("/") and self.path.startswith(url) and url != "/":
                parse = urlparse(self.path[len(url):])
                return func(parse.path, parse_qs(parse.query))
            if url == self.path:
                return func()

        self._log_unhandled(b"")
        self._404()

    def do_POST(self):
        """Read the request body and dispatch a POST request by exact path.

        A missing ``Content-Length`` is treated as an empty body.  A value
        that is not a non-negative integer is answered with 400 and the
        connection is closed, because the body framing is then unknown.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        if length < 0:
            self.close_connection = True
            self._send_empty(HTTPStatus.BAD_REQUEST)
            return
        body = self.rfile.read(length)

        endpoints = {
            "/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver
        }

        handler = endpoints.get(self.path)
        if handler is not None:
            return handler(body)

        self._log_unhandled(body)
        self._404()

    def _404(self, *args):
        """Send an empty 404; extra arguments are ignored so it can act as any endpoint."""
        self._send_empty(HTTPStatus.NOT_FOUND)

    def open_data(self, path, file):
        """Open ``path/file`` for binary reading.

        Slashes in *file* are replaced with underscores, so the name cannot
        address anything outside the *path* directory through ``/``.
        """
        return open(path + "/" + file.replace("/", "_"), "rb")

    def serve_data(self, path, file):
        """Send the content of ``path/file``, or 404 when the file does not exist."""
        try:
            with self.open_data(path, file) as f:
                data = f.read()
        except FileNotFoundError:
            self._404()
        else:
            self.send_ok(data)

    def serve_skin(self, path, params):
        """Serve the skin file named by *path*; *params* is unused."""
        return self.serve_data("skin", path)

    def serve_cape(self, path, params):
        """Serve the cape file named by *path*; *params* is unused.

        A ``.cape.png`` suffix is mapped onto the stored ``.png`` name.
        """
        suffix = ".cape.png"
        if path.endswith(suffix):
            path = path[:-len(suffix)] + ".png"
        return self.serve_data("cape", path)

    def check_data(self, path, file):
        """Return the integer mtime of ``path/file``, or None if it does not exist."""
        try:
            return int(getmtime(path + "/" + file.replace("/", "_")))
        except FileNotFoundError:
            return None

    def check_skin(self, name):
        """Return the mtime of the skin ``name.png``, or None if absent."""
        return self.check_data("skin", name + ".png")

    def check_cape(self, name):
        """Return the mtime of the cape ``name.png``, or None if absent."""
        return self.check_data("cape", name + ".png")

    def lgy_joinserver(self, params):
        """Handle a ``joinserver.jsp`` request.

        ``sessionId`` must have the form ``token:<token>:<profileId>``.  A
        malformed value or a failed token check is answered with
        ``Bad login``; otherwise the join is recorded under ``serverId`` and
        ``OK`` is sent.

        Raises:
            KeyError: if ``sessionId`` or ``serverId`` is missing from *params*.
        """
        parts = params["sessionId"][0].split(":")
        serverId = params["serverId"][0]

        # Explicit validation instead of ``assert``, which is stripped when
        # Python runs with -O and would let malformed input through.
        if len(parts) != 3 or parts[0] != "token":
            self.send_ok(b"Bad login")
            return
        _, token, profileId = parts

        if not self.auth.check_token(profileId, token):
            # Displayed directly to the user
            self.send_ok(b"Bad login")
            return

        with _g_connections_lock:
            _g_connections[serverId] = (profileId, token)
        self.send_ok(b"OK")

    def ygg_joinserver(self, body):
        """Handle a ``/ygg`` session-server join request.

        *body* is a JSON document providing ``accessToken``,
        ``selectedProfile`` and ``serverId``.  A failed token check yields
        403 with a JSON error body; success records the join and yields 204.
        """
        data = json.loads(body.decode())
        token = data["accessToken"]
        profileId = data["selectedProfile"]
        serverId = data["serverId"]

        if not self.auth.check_token(profileId, token):
            resp = b'{"error":"ForbiddenOperationException"}'
            self.send_response(HTTPStatus.FORBIDDEN)
            self.send_header("Content-Length", str(len(resp)))
            self.end_headers()
            self.wfile.write(resp)
            return

        with _g_connections_lock:
            _g_connections[serverId] = (profileId, token)
        self.send_response(HTTPStatus.NO_CONTENT)
        self.end_headers()

    def lgy_checkserver(self, params):
        """Handle a ``checkserver.jsp`` request.

        Consumes the join recorded for ``serverId`` and answers ``YES`` when
        ``auth.check_user`` accepts it for ``user``; answers ``NO`` when no
        join is pending or the check fails.
        """
        user = params["user"][0]
        serverId = params["serverId"][0]

        # Remove the entry atomically; the socket write happens after the
        # lock is released so a slow client cannot stall other threads.
        with _g_connections_lock:
            pending = _g_connections.pop(serverId, None)
        if pending is None:
            self.send_ok(b"NO")
            return

        profileId, token = pending
        if not self.auth.check_user(profileId, token, user):
            self.send_ok(b"NO")
            return
        self.send_ok(b"YES")

    def ygg_checkserver(self, params):
        """Handle a ``/ygg`` session-server ``hasJoined`` request.

        Consumes the join recorded for ``serverId``.  Answers 204 when no
        join is pending or ``auth.check_user`` rejects it; otherwise sends
        the profile as JSON, falling back to ``{"id": profileId}`` when no
        profile data is available.
        """
        serverId = params["serverId"][0]
        user = params["username"][0]

        with _g_connections_lock:
            pending = _g_connections.pop(serverId, None)
        if pending is None:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            return

        profileId, token = pending
        if not self.auth.check_user(profileId, token, user):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            return

        data = self.get_profile(profileId)
        if data is None:
            data = {
                "id": profileId,
            }
        self.send_ok(json.dumps(data).encode())

    def get_profile(self, profileId):
        """Build the profile dictionary for *profileId*.

        Returns None when ``auth.get_user`` yields no user.  When a skin or
        cape file exists for the user, a base64-encoded ``textures``
        property is included whose timestamp is the newest file mtime.
        """
        user = self.auth.get_user(profileId)
        if user is None:
            return None

        skin_stamp = self.check_skin(user)
        cape_stamp = self.check_cape(user)
        url = self.config["server_url"]

        textures = {}
        textures_stamp = 0
        if skin_stamp is not None:
            textures_stamp = max(textures_stamp, skin_stamp)
            textures["SKIN"] = {"url": url + "/skin/%s.png" % user}
        if cape_stamp is not None:
            textures_stamp = max(textures_stamp, cape_stamp)
            textures["CAPE"] = {"url": url + "/cape/%s.cape.png" % user}

        properties = []
        if textures:
            prop = {
                "timestamp": textures_stamp,
                "profileId": profileId,
                "profileName": user,
                "textures": textures
            }
            properties.append({
                "name": "textures",
                "value": b64encode(json.dumps(prop).encode()).decode()
            })

        return {
            "id": profileId,
            "name": user,
            "properties": properties,
            "profileActions": []
        }

    def ygg_profile(self, path, params):
        """Send the profile named by *path* as JSON, or 204 when it is unknown."""
        data = self.get_profile(path)
        if data is None:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            # Stop here: falling through would emit a second response
            # ("null") on the same connection.
            return
        self.send_ok(json.dumps(data).encode())

    def ygg(self):
        """Send the ``/ygg`` metadata document built from the server configuration."""
        settings = {
            "meta": {
                "serverName": "authlib-injector",
                "feature.no_mojang_namespace": True,
                "feature.legacy_skin_api": True
            },
            "skinDomains": [self.config["server_dom"]]
        }
        self.send_ok(json.dumps(settings).encode())

    def root(self):
        """Answer ``/`` with 204 and a header pointing at the ``/ygg`` API location."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self.send_header("X-Authlib-Injector-API-Location", self.config["server_url"] + "/ygg")
        self.end_headers()


if __name__ == "__main__":
    server_address = ("", 25564)

    config = ConfigParser()
    config.read("config.ini")

    auth = Authentication("users.db")

    httpd = ThreadingHTTPServer(server_address, HTTPRequestHandler)
    httpd.config = config
    httpd.auth = auth

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket on shutdown.
        httpd.server_close()