mid-kid
4 weeks ago
commit
3ea838bdbb
1 changed files with 269 additions and 0 deletions
@ -0,0 +1,269 @@ |
|||
#!/usr/bin/env python3 |
|||
from http.server import * |
|||
from http import HTTPStatus |
|||
from urllib.parse import urlparse, parse_qs |
|||
from base64 import b64encode |
|||
from os.path import getmtime |
|||
import json |
|||
|
|||
server_domain = "127.0.0.1" |
|||
server_url = "http://%s:25564" % server_domain |
|||
|
|||
userdata = { |
|||
"9a10d47934294527b3a0a32ab41d207e": { |
|||
"name": "Sneppe" |
|||
}, |
|||
"4ad1f41992193cb986741243f14d81e8": { |
|||
"name": "mid-kid" |
|||
} |
|||
} |
|||
connections = {} |
|||
|
|||
class HTTPRequestHandler(BaseHTTPRequestHandler): |
|||
protocol_version = "HTTP/1.1" |
|||
|
|||
def send_ok(self, data): |
|||
self.send_response(HTTPStatus.OK) |
|||
self.send_header("Content-Length", len(data)) |
|||
self.end_headers() |
|||
self.wfile.write(data) |
|||
|
|||
def do_GET(self): |
|||
if self.headers.get("Host") == "s3.amazonaws.com": |
|||
endpoints = { |
|||
"/MinecraftResources/": self._404 |
|||
} |
|||
elif self.headers.get("Host") == "www.minecraft.net": |
|||
endpoints = { |
|||
"/game/joinserver.jsp?": self.lgy_joinserver, |
|||
"/game/checkserver.jsp?": self.lgy_checkserver, |
|||
"/skin/": self.serve_skin |
|||
} |
|||
elif self.headers.get("Host") == "session.minecraft.net": |
|||
endpoints = { |
|||
"/game/joinserver.jsp?": self.lgy_joinserver, |
|||
"/game/checkserver.jsp?": self.lgy_checkserver |
|||
} |
|||
elif self.headers.get("Host") == "skins.minecraft.net": |
|||
endpoints = { |
|||
"/MinecraftSkins/": self.serve_skin, |
|||
"/MinecraftCloaks/": self.serve_cape |
|||
} |
|||
else: |
|||
endpoints = { |
|||
"/": self.root, |
|||
"/skin/": self.serve_skin, |
|||
"/cape/": self.serve_cape, |
|||
"/ygg": self.ygg, |
|||
"/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver, |
|||
"/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile |
|||
} |
|||
|
|||
for url, func in endpoints.items(): |
|||
if url.endswith("?") and self.path.startswith(url): |
|||
params = parse_qs(self.path[len(url):]) |
|||
return func(params) |
|||
if url.endswith("/") and self.path.startswith(url) and url != "/": |
|||
parse = urlparse(self.path[len(url):]) |
|||
path = parse.path |
|||
params = parse_qs(parse.query) |
|||
return func(path, params) |
|||
if url == self.path: |
|||
return func() |
|||
|
|||
body = "".encode() |
|||
self.log_message("self.headers.get('Host'): %s", self.headers.get("Host")) |
|||
self.log_message("self.command: %s", self.command) |
|||
self.log_message("self.path: %s", self.path) |
|||
self.log_message("body: %s", body) |
|||
self._404() |
|||
|
|||
def do_POST(self): |
|||
body = self.rfile.read(int(self.headers.get("Content-Length"))) |
|||
|
|||
endpoints = { |
|||
"/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver |
|||
} |
|||
|
|||
for url, func in endpoints.items(): |
|||
if url == self.path: |
|||
return func(body) |
|||
|
|||
self.log_message("self.headers.get('Host'): %s", self.headers.get("Host")) |
|||
self.log_message("self.command: %s", self.command) |
|||
self.log_message("self.path: %s", self.path) |
|||
self.log_message("body: %s", body) |
|||
self._404() |
|||
|
|||
def _404(self, *args): |
|||
self.send_response(HTTPStatus.NOT_FOUND) |
|||
self.end_headers() |
|||
|
|||
def open_data(self, path, file): |
|||
return open(path + "/" + file.replace("/", "_"), "rb") |
|||
|
|||
def serve_data(self, path, file): |
|||
data = None |
|||
try: |
|||
with self.open_data(path, file) as f: |
|||
data = f.read() |
|||
except FileNotFoundError: |
|||
self.send_response(HTTPStatus.NOT_FOUND) |
|||
self.end_headers() |
|||
else: |
|||
self.send_ok(data) |
|||
|
|||
def serve_skin(self, path, params): |
|||
return self.serve_data("skin", path) |
|||
|
|||
def serve_cape(self, path, params): |
|||
if path.endswith(".cape.png"): |
|||
path = path[:-9] + ".png" |
|||
return self.serve_data("cape", path) |
|||
|
|||
def check_data(self, path, file): |
|||
try: |
|||
return int(getmtime(path + "/" + file.replace("/", "_"))) |
|||
except FileNotFoundError: |
|||
return None |
|||
|
|||
def check_skin(self, name): |
|||
return self.check_data("skin", name + ".png") |
|||
|
|||
def check_cape(self, name): |
|||
return self.check_data("cape", name + ".png") |
|||
|
|||
def lgy_joinserver(self, params): |
|||
user = params["user"][0] |
|||
assert params["sessionId"][0].startswith("token:") |
|||
token, profileId = params["sessionId"][0].split(":")[1:] |
|||
serverId = params["serverId"][0] |
|||
|
|||
connections[serverId] = profileId |
|||
|
|||
if True: |
|||
self.send_ok(b"OK") |
|||
else: |
|||
# Displayed directly to the user |
|||
self.send_ok(b"Bad login") |
|||
|
|||
def ygg_joinserver(self, body): |
|||
data = json.loads(body.decode()) |
|||
token = data["accessToken"] |
|||
profileId = data["selectedProfile"] |
|||
serverId = data["serverId"] |
|||
|
|||
connections[serverId] = profileId |
|||
|
|||
if True: |
|||
self.send_response(HTTPStatus.NO_CONTENT) |
|||
self.end_headers() |
|||
else: |
|||
resp = b'{"error":"ForbiddenOperationException"}' |
|||
self.send_response(HTTPStatus.FORBIDDEN) |
|||
self.send_header("Content-Length", len(resp)) |
|||
self.end_headers() |
|||
self.wfile.write(resp) |
|||
|
|||
def lgy_checkserver(self, params): |
|||
user = params["user"][0] |
|||
serverId = params["serverId"][0] |
|||
|
|||
if not serverId in connections: |
|||
self.send_ok(b"NO") |
|||
|
|||
self.send_ok(b"YES") |
|||
|
|||
def ygg_checkserver(self, params): |
|||
serverId = params["serverId"][0] |
|||
user = params["username"][0] |
|||
|
|||
if not serverId in connections: |
|||
self.send_response(HTTPStatus.NO_CONTENT) |
|||
self.end_headers() |
|||
|
|||
profileId = connections[serverId] |
|||
|
|||
data = self.get_profile(profileId) |
|||
if data is None: |
|||
data = { |
|||
"id": profileId, |
|||
} |
|||
|
|||
self.send_ok(json.dumps(data).encode()) |
|||
|
|||
def get_profile(self, profileId): |
|||
if profileId not in userdata: |
|||
return None |
|||
|
|||
user = userdata[profileId] |
|||
|
|||
properties = [] |
|||
|
|||
check_skin = self.check_skin(user["name"]) |
|||
check_cape = self.check_cape(user["name"]) |
|||
|
|||
textures = {} |
|||
textures_stamp = 0 |
|||
if check_skin is not None: |
|||
textures_stamp = max(textures_stamp, check_skin) |
|||
textures["SKIN"] = {"url": server_url + "/skin/%s.png" % user["name"]} |
|||
if check_cape is not None: |
|||
textures_stamp = max(textures_stamp, check_cape) |
|||
textures["CAPE"] = {"url": server_url + "/cape/%s.cape.png" % user["name"]} |
|||
|
|||
if textures: |
|||
prop = { |
|||
"timestamp": textures_stamp, |
|||
"profileId": profileId, |
|||
"profileName": user["name"], |
|||
"textures": textures |
|||
} |
|||
properties.append({ |
|||
"name": "textures", |
|||
"value": b64encode(json.dumps(prop).encode()).decode() |
|||
}) |
|||
|
|||
data = { |
|||
"id": profileId, |
|||
"name": user["name"], |
|||
"properties": properties, |
|||
"profileActions": [] |
|||
} |
|||
|
|||
return data |
|||
|
|||
def ygg_profile(self, path, params): |
|||
profileId = path |
|||
|
|||
data = self.get_profile(profileId) |
|||
if data is None: |
|||
self.send_response(HTTPStatus.NO_CONTENT) |
|||
self.end_headers() |
|||
self.send_ok(json.dumps(data).encode()) |
|||
|
|||
def ygg(self): |
|||
settings = { |
|||
"meta": { |
|||
"serverName": "server", |
|||
"feature.no_mojang_namespace": True, |
|||
"feature.legacy_skin_api": True |
|||
}, |
|||
"skinDomains": [server_domain] |
|||
} |
|||
body = json.dumps(settings).encode() |
|||
self.send_ok(body) |
|||
|
|||
def root(self): |
|||
self.send_response(HTTPStatus.NO_CONTENT) |
|||
self.send_header("X-Authlib-Injector-API-Location", server_url + "/ygg") |
|||
self.end_headers() |
|||
|
|||
def run(server_class=ThreadingHTTPServer, handler_class=HTTPRequestHandler): |
|||
server_address = ('', 25564) |
|||
httpd = server_class(server_address, handler_class) |
|||
httpd.serve_forever() |
|||
try: |
|||
run() |
|||
except KeyboardInterrupt: |
|||
pass |
Loading…
Reference in new issue