Minecraft through the ages
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

290 lines
9.2 KiB

#!/usr/bin/env python3
from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from threading import Lock
from urllib.parse import urlparse, parse_qs
from configparser import ConfigParser
from base64 import b64encode
from os.path import getmtime
import json
from auth import Authentication
# Module-level dictionary; NOTE(review): nothing in the visible part of this
# file reads or writes it -- confirm whether other modules rely on it.
userdata = dict()

# Guards every access to _g_connections; the server handles requests on
# multiple threads (ThreadingHTTPServer), so the table is shared state.
_g_connections_lock = Lock()

# Pending joins: maps a serverId to the (profileId, token) pair recorded by a
# "join" request, until the matching "check"/"hasJoined" request consumes it.
_g_connections = dict()
class HTTPRequestHandler(BaseHTTPRequestHandler):
    """Serve legacy and Yggdrasil-style Minecraft session/skin endpoints.

    The set of GET endpoints depends on the request's ``Host`` header
    (``s3.amazonaws.com``, ``www.minecraft.net``, ``session.minecraft.net``,
    ``skins.minecraft.net``, or anything else for the ``/ygg`` API).

    The owning server object must expose two attributes:

    * ``config`` -- mapping with a ``"server"`` section that provides
      ``server_url`` and ``server_dom``.
    * ``auth`` -- object providing ``check_token(profileId, token)``,
      ``check_user(profileId, token, user)`` and ``get_user(profileId)``.
      NOTE(review): their exact semantics live in the ``auth`` module, which
      is not visible here.
    """

    # With HTTP/1.1 the connection is kept alive between requests, so every
    # response that may carry a body needs an accurate Content-Length.
    protocol_version = "HTTP/1.1"

    def setup(self, *args, **kwargs):
        """Bind per-request shortcuts to the server-wide config and auth."""
        super().setup(*args, **kwargs)
        self.config = self.server.config["server"]
        # The handler methods use ``self.auth``; it is only stored on the
        # server object, so it has to be copied here.
        self.auth = self.server.auth

    def send_ok(self, data):
        """Send a complete ``200 OK`` response whose body is *data* (bytes)."""
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _log_unhandled(self, body):
        """Log the details of a request that matched no endpoint."""
        self.log_message("self.headers.get('Host'): %s", self.headers.get("Host"))
        self.log_message("self.command: %s", self.command)
        self.log_message("self.path: %s", self.path)
        self.log_message("body: %s", body)

    def do_GET(self):
        """Dispatch a GET request to the endpoint table selected by ``Host``.

        Endpoint keys come in three flavours:

        * ending in ``"?"`` -- prefix match; the remainder of the path is
          parsed as a query string and passed as ``func(params)``;
        * ending in ``"/"`` (except the bare root) -- prefix match; the
          remainder is split into path and query, ``func(path, params)``;
        * anything else -- exact match, ``func()``.

        Unmatched requests are logged and answered with 404.  Handlers index
        required query parameters directly, so a missing parameter raises
        ``KeyError`` out of this method.
        """
        host = self.headers.get("Host")
        if host == "s3.amazonaws.com":
            endpoints = {
                "/MinecraftResources/": self._404,
            }
        elif host == "www.minecraft.net":
            endpoints = {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver,
                "/skin/": self.serve_skin,
            }
        elif host == "session.minecraft.net":
            endpoints = {
                "/game/joinserver.jsp?": self.lgy_joinserver,
                "/game/checkserver.jsp?": self.lgy_checkserver,
            }
        elif host == "skins.minecraft.net":
            endpoints = {
                "/MinecraftSkins/": self.serve_skin,
                "/MinecraftCloaks/": self.serve_cape,
            }
        else:
            endpoints = {
                "/": self.root,
                "/skin/": self.serve_skin,
                "/cape/": self.serve_cape,
                "/ygg": self.ygg,
                "/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver,
                "/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile,
            }

        for url, func in endpoints.items():
            if url.endswith("?") and self.path.startswith(url):
                return func(parse_qs(self.path[len(url):]))
            if url.endswith("/") and url != "/" and self.path.startswith(url):
                remainder = urlparse(self.path[len(url):])
                return func(remainder.path, parse_qs(remainder.query))
            if url == self.path:
                return func()

        self._log_unhandled(b"")
        self._404()

    def do_POST(self):
        """Dispatch a POST request by exact path, passing the raw body bytes.

        A missing ``Content-Length`` is treated as an empty body.  A malformed
        or negative value is answered with ``400 Bad Request`` and the
        connection is closed, because the body boundary is unknown.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            length = -1
        if length < 0:
            self.send_response(HTTPStatus.BAD_REQUEST)
            # "Connection: close" also makes the base class drop the
            # connection after this response.
            self.send_header("Connection", "close")
            self.send_header("Content-Length", "0")
            self.end_headers()
            return

        body = self.rfile.read(length)
        endpoints = {
            "/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver,
        }
        func = endpoints.get(self.path)
        if func is not None:
            return func(body)

        self._log_unhandled(body)
        self._404()

    def _404(self, *args):
        """Send an empty ``404 Not Found``; extra positional args are ignored."""
        self.send_response(HTTPStatus.NOT_FOUND)
        # Explicit zero length so keep-alive clients do not wait for a body.
        self.send_header("Content-Length", "0")
        self.end_headers()

    def open_data(self, path, file):
        """Open ``<path>/<file>`` for binary reading.

        Slashes in *file* are replaced with underscores, so the name always
        refers to a direct child of *path*.
        """
        return open(path + "/" + file.replace("/", "_"), "rb")

    def serve_data(self, path, file):
        """Send the contents of ``<path>/<file>``, or 404 if it is not a file."""
        try:
            with self.open_data(path, file) as f:
                data = f.read()
        except (FileNotFoundError, IsADirectoryError):
            # IsADirectoryError covers an empty file name (e.g. "GET /skin/").
            self._404()
        else:
            self.send_ok(data)

    def serve_skin(self, path, params):
        """Serve a skin image from the ``skin`` directory; *params* is unused."""
        return self.serve_data("skin", path)

    def serve_cape(self, path, params):
        """Serve a cape image from the ``cape`` directory; *params* is unused.

        A name ending in ``.cape.png`` is mapped to the same name ending in
        ``.png`` before the lookup.
        """
        suffix = ".cape.png"
        if path.endswith(suffix):
            path = path[:-len(suffix)] + ".png"
        return self.serve_data("cape", path)

    def check_data(self, path, file):
        """Return the integer mtime of ``<path>/<file>``, or None if missing."""
        try:
            return int(getmtime(path + "/" + file.replace("/", "_")))
        except FileNotFoundError:
            return None

    def check_skin(self, name):
        """Return the mtime of ``skin/<name>.png``, or None if it is missing."""
        return self.check_data("skin", name + ".png")

    def check_cape(self, name):
        """Return the mtime of ``cape/<name>.png``, or None if it is missing."""
        return self.check_data("cape", name + ".png")

    def lgy_joinserver(self, params):
        """Handle the legacy ``joinserver.jsp`` request.

        *params* is the parsed query string.  ``sessionId`` must have the form
        ``token:<token>:<profileId>``; a malformed value or a token rejected
        by ``auth.check_token`` yields the body ``Bad login``.  On success the
        ``(profileId, token)`` pair is recorded under ``serverId`` and ``OK``
        is returned.
        """
        parts = params["sessionId"][0].split(":")
        if len(parts) != 3 or parts[0] != "token":
            # Displayed directly to the user
            self.send_ok(b"Bad login")
            return
        _, token, profileId = parts
        serverId = params["serverId"][0]
        if not self.auth.check_token(profileId, token):
            # Displayed directly to the user
            self.send_ok(b"Bad login")
            return
        with _g_connections_lock:
            _g_connections[serverId] = (profileId, token)
        self.send_ok(b"OK")

    def ygg_joinserver(self, body):
        """Handle the Yggdrasil ``join`` request.

        *body* is a JSON document (bytes) with ``accessToken``,
        ``selectedProfile`` and ``serverId``.  Responds 403 with a JSON error
        when ``auth.check_token`` rejects the token; otherwise records the
        pending join and responds 204.
        """
        data = json.loads(body.decode())
        token = data["accessToken"]
        profileId = data["selectedProfile"]
        serverId = data["serverId"]
        if not self.auth.check_token(profileId, token):
            resp = b'{"error":"ForbiddenOperationException"}'
            self.send_response(HTTPStatus.FORBIDDEN)
            self.send_header("Content-Length", str(len(resp)))
            self.end_headers()
            self.wfile.write(resp)
            return
        with _g_connections_lock:
            _g_connections[serverId] = (profileId, token)
        self.send_response(HTTPStatus.NO_CONTENT)
        self.end_headers()

    def lgy_checkserver(self, params):
        """Handle the legacy ``checkserver.jsp`` request.

        Consumes the pending join stored under ``serverId`` and answers
        ``YES`` when ``auth.check_user`` accepts it for ``user``; answers
        ``NO`` when there is no pending join or the check fails.
        """
        user = params["user"][0]
        serverId = params["serverId"][0]
        with _g_connections_lock:
            # A pending join is single-use: remove it while looking it up.
            pending = _g_connections.pop(serverId, None)
        if pending is None:
            self.send_ok(b"NO")
            return
        profileId, token = pending
        if not self.auth.check_user(profileId, token, user):
            self.send_ok(b"NO")
            return
        self.send_ok(b"YES")

    def ygg_checkserver(self, params):
        """Handle the Yggdrasil ``hasJoined`` request.

        Consumes the pending join stored under ``serverId``.  Responds 204
        when there is none or ``auth.check_user`` rejects it for ``username``;
        otherwise responds 200 with the profile JSON (only ``{"id": ...}``
        when the profile cannot be built).
        """
        serverId = params["serverId"][0]
        user = params["username"][0]
        with _g_connections_lock:
            # A pending join is single-use: remove it while looking it up.
            pending = _g_connections.pop(serverId, None)
        if pending is None:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            return
        profileId, token = pending
        if not self.auth.check_user(profileId, token, user):
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            return
        data = self.get_profile(profileId)
        if data is None:
            data = {
                "id": profileId,
            }
        self.send_ok(json.dumps(data).encode())

    def get_profile(self, profileId):
        """Build the profile dictionary for *profileId*.

        Returns None when ``auth.get_user`` knows no such profile.  Otherwise
        returns a dict with ``id``, ``name``, ``properties`` and
        ``profileActions``.  ``properties`` holds a base64-encoded
        ``textures`` entry when a skin and/or cape file exists for the user;
        its timestamp is the newest mtime among those files.
        """
        user = self.auth.get_user(profileId)
        if user is None:
            return None

        skin_mtime = self.check_skin(user)
        cape_mtime = self.check_cape(user)
        url = self.config["server_url"]

        textures = {}
        textures_stamp = 0
        if skin_mtime is not None:
            textures_stamp = max(textures_stamp, skin_mtime)
            textures["SKIN"] = {"url": f"{url}/skin/{user}.png"}
        if cape_mtime is not None:
            textures_stamp = max(textures_stamp, cape_mtime)
            textures["CAPE"] = {"url": f"{url}/cape/{user}.cape.png"}

        properties = []
        if textures:
            prop = {
                "timestamp": textures_stamp,
                "profileId": profileId,
                "profileName": user,
                "textures": textures,
            }
            properties.append({
                "name": "textures",
                "value": b64encode(json.dumps(prop).encode()).decode(),
            })

        return {
            "id": profileId,
            "name": user,
            "properties": properties,
            "profileActions": [],
        }

    def ygg_profile(self, path, params):
        """Handle the Yggdrasil ``profile/<profileId>`` request.

        Responds 204 for an unknown profile, otherwise 200 with the profile
        JSON.  *params* is unused.
        """
        data = self.get_profile(path)
        if data is None:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
            # Stop here: a second response must not follow the 204.
            return
        self.send_ok(json.dumps(data).encode())

    def ygg(self):
        """Serve the API metadata document at ``/ygg``."""
        settings = {
            "meta": {
                "serverName": "authlib-injector",
                "feature.no_mojang_namespace": True,
                "feature.legacy_skin_api": True,
            },
            "skinDomains": [self.config["server_dom"]],
        }
        self.send_ok(json.dumps(settings).encode())

    def root(self):
        """Answer ``/`` with 204 and a header pointing at the ``/ygg`` API."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self.send_header("X-Authlib-Injector-API-Location", self.config["server_url"] + "/ygg")
        self.end_headers()
if __name__ == "__main__":
    # Script entry point: load configuration and the user database, then
    # serve requests on all interfaces, port 25564, until interrupted.
    server_address = ("", 25564)

    config = ConfigParser()
    config.read("config.ini")
    # ConfigParser.read() silently skips unreadable files; without a
    # [server] section every request would fail in the handler's setup(),
    # so stop at startup with a clear message instead.
    if not config.has_section("server"):
        raise SystemExit("config.ini is missing or has no [server] section")

    auth = Authentication("users.db")

    httpd = ThreadingHTTPServer(server_address, HTTPRequestHandler)
    # The request handler reads these two attributes from its server object.
    httpd.config = config
    httpd.auth = auth
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server.
        pass
    finally:
        # Release the listening socket on every exit path.
        httpd.server_close()