You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
294 lines
9.3 KiB
294 lines
9.3 KiB
#!/usr/bin/env python3
|
|
from http import HTTPStatus
|
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
|
from threading import Lock
|
|
from urllib.parse import urlparse, parse_qs
|
|
from configparser import ConfigParser
|
|
from base64 import b64encode
|
|
from os.path import getmtime
|
|
import json
|
|
|
|
from auth import Authentication
|
|
|
|
_g_connections = {}
|
|
_g_connections_lock = Lock()
|
|
|
|
class HTTPRequestHandler(BaseHTTPRequestHandler):
|
|
protocol_version = "HTTP/1.1"
|
|
|
|
def setup(self, *args, **kwargs):
|
|
super().setup(*args, **kwargs)
|
|
self.config = self.server.config["server"]
|
|
self.auth = self.server.auth
|
|
|
|
def send_ok(self, data):
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Length", len(data))
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
def do_GET(self):
|
|
if self.headers.get("Host") == "s3.amazonaws.com":
|
|
endpoints = {
|
|
"/MinecraftResources/": self._404
|
|
}
|
|
elif self.headers.get("Host") == "www.minecraft.net":
|
|
endpoints = {
|
|
"/game/joinserver.jsp?": self.lgy_joinserver,
|
|
"/game/checkserver.jsp?": self.lgy_checkserver,
|
|
"/skin/": self.serve_skin
|
|
}
|
|
elif self.headers.get("Host") == "session.minecraft.net":
|
|
endpoints = {
|
|
"/game/joinserver.jsp?": self.lgy_joinserver,
|
|
"/game/checkserver.jsp?": self.lgy_checkserver
|
|
}
|
|
elif self.headers.get("Host") == "skins.minecraft.net":
|
|
endpoints = {
|
|
"/MinecraftSkins/": self.serve_skin,
|
|
"/MinecraftCloaks/": self.serve_cape
|
|
}
|
|
else:
|
|
endpoints = {
|
|
"/": self.root,
|
|
"/skin/": self.serve_skin,
|
|
"/cape/": self.serve_cape,
|
|
"/ygg": self.ygg,
|
|
"/ygg/sessionserver/session/minecraft/hasJoined?": self.ygg_checkserver,
|
|
"/ygg/sessionserver/session/minecraft/profile/": self.ygg_profile
|
|
}
|
|
|
|
for url, func in endpoints.items():
|
|
if url.endswith("?") and self.path.startswith(url):
|
|
params = parse_qs(self.path[len(url):])
|
|
return func(params)
|
|
if url.endswith("/") and self.path.startswith(url) and url != "/":
|
|
parse = urlparse(self.path[len(url):])
|
|
path = parse.path
|
|
params = parse_qs(parse.query)
|
|
return func(path, params)
|
|
if url == self.path:
|
|
return func()
|
|
|
|
body = "".encode()
|
|
self.log_message("self.headers.get('Host'): %s", self.headers.get("Host"))
|
|
self.log_message("self.command: %s", self.command)
|
|
self.log_message("self.path: %s", self.path)
|
|
self.log_message("body: %s", body)
|
|
self._404()
|
|
|
|
def do_POST(self):
|
|
body = self.rfile.read(int(self.headers.get("Content-Length")))
|
|
|
|
endpoints = {
|
|
"/ygg/sessionserver/session/minecraft/join": self.ygg_joinserver
|
|
}
|
|
|
|
for url, func in endpoints.items():
|
|
if url == self.path:
|
|
return func(body)
|
|
|
|
self.log_message("self.headers.get('Host'): %s", self.headers.get("Host"))
|
|
self.log_message("self.command: %s", self.command)
|
|
self.log_message("self.path: %s", self.path)
|
|
self.log_message("body: %s", body)
|
|
self._404()
|
|
|
|
def _404(self, *args):
|
|
self.send_response(HTTPStatus.NOT_FOUND)
|
|
self.end_headers()
|
|
|
|
def open_data(self, path, file):
|
|
return open(path + "/" + file.replace("/", "_"), "rb")
|
|
|
|
def serve_data(self, path, file):
|
|
data = None
|
|
try:
|
|
with self.open_data(path, file) as f:
|
|
data = f.read()
|
|
except FileNotFoundError:
|
|
self.send_response(HTTPStatus.NOT_FOUND)
|
|
self.end_headers()
|
|
else:
|
|
self.send_ok(data)
|
|
|
|
def serve_skin(self, path, params):
|
|
return self.serve_data("skin", path)
|
|
|
|
def serve_cape(self, path, params):
|
|
if path.endswith(".cape.png"):
|
|
path = path[:-9] + ".png"
|
|
return self.serve_data("cape", path)
|
|
|
|
def check_data(self, path, file):
|
|
try:
|
|
return int(getmtime(path + "/" + file.replace("/", "_")))
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def check_skin(self, name):
|
|
return self.check_data("skin", name + ".png")
|
|
|
|
def check_cape(self, name):
|
|
return self.check_data("cape", name + ".png")
|
|
|
|
def lgy_joinserver(self, params):
|
|
user = params["user"][0]
|
|
assert params["sessionId"][0].startswith("token:")
|
|
token, profileId = params["sessionId"][0].split(":")[1:]
|
|
serverId = params["serverId"][0]
|
|
|
|
token = token.split(".")[-1]
|
|
|
|
if not self.auth.check_token(profileId, token):
|
|
# Displayed directly to the user
|
|
self.send_ok(b"Bad login")
|
|
return
|
|
|
|
with _g_connections_lock:
|
|
_g_connections[serverId] = (profileId, token)
|
|
self.send_ok(b"OK")
|
|
|
|
def ygg_joinserver(self, body):
|
|
data = json.loads(body.decode())
|
|
token = data["accessToken"]
|
|
profileId = data["selectedProfile"]
|
|
serverId = data["serverId"]
|
|
|
|
token = token.split(".")[-1]
|
|
|
|
if not self.auth.check_token(profileId, token):
|
|
resp = b'{"error":"ForbiddenOperationException"}'
|
|
self.send_response(HTTPStatus.FORBIDDEN)
|
|
self.send_header("Content-Length", len(resp))
|
|
self.end_headers()
|
|
self.wfile.write(resp)
|
|
return
|
|
|
|
with _g_connections_lock:
|
|
_g_connections[serverId] = (profileId, token)
|
|
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
|
|
def lgy_checkserver(self, params):
|
|
user = params["user"][0]
|
|
serverId = params["serverId"][0]
|
|
|
|
with _g_connections_lock:
|
|
if not serverId in _g_connections:
|
|
self.send_ok(b"NO")
|
|
return
|
|
profileId, token = _g_connections[serverId]
|
|
del _g_connections[serverId]
|
|
|
|
if not self.auth.check_user(profileId, token, user):
|
|
self.send_ok(b"NO")
|
|
return
|
|
|
|
self.send_ok(b"YES")
|
|
|
|
def ygg_checkserver(self, params):
|
|
serverId = params["serverId"][0]
|
|
user = params["username"][0]
|
|
|
|
with _g_connections_lock:
|
|
if not serverId in _g_connections:
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
return
|
|
profileId, token = _g_connections[serverId]
|
|
del _g_connections[serverId]
|
|
|
|
if not self.auth.check_user(profileId, token, user):
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
return
|
|
|
|
data = self.get_profile(profileId)
|
|
if data is None:
|
|
data = {
|
|
"id": profileId,
|
|
}
|
|
|
|
self.send_ok(json.dumps(data).encode())
|
|
|
|
def get_profile(self, profileId):
|
|
user = self.auth.get_user(profileId)
|
|
if user is None:
|
|
return None
|
|
|
|
properties = []
|
|
|
|
check_skin = self.check_skin(user)
|
|
check_cape = self.check_cape(user)
|
|
|
|
url = self.config["server_url"]
|
|
textures = {}
|
|
textures_stamp = 0
|
|
if check_skin is not None:
|
|
textures_stamp = max(textures_stamp, check_skin)
|
|
textures["SKIN"] = {"url": url + "/skin/%s.png" % user}
|
|
if check_cape is not None:
|
|
textures_stamp = max(textures_stamp, check_cape)
|
|
textures["CAPE"] = {"url": url + "/cape/%s.cape.png" % user}
|
|
|
|
if textures:
|
|
prop = {
|
|
"timestamp": textures_stamp,
|
|
"profileId": profileId,
|
|
"profileName": user,
|
|
"textures": textures
|
|
}
|
|
properties.append({
|
|
"name": "textures",
|
|
"value": b64encode(json.dumps(prop).encode()).decode()
|
|
})
|
|
|
|
data = {
|
|
"id": profileId,
|
|
"name": user,
|
|
"properties": properties,
|
|
"profileActions": []
|
|
}
|
|
|
|
return data
|
|
|
|
def ygg_profile(self, path, params):
|
|
profileId = path
|
|
|
|
data = self.get_profile(profileId)
|
|
if data is None:
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.end_headers()
|
|
self.send_ok(json.dumps(data).encode())
|
|
|
|
def ygg(self):
|
|
settings = {
|
|
"meta": {
|
|
"serverName": "authlib-injector",
|
|
"feature.no_mojang_namespace": True,
|
|
"feature.legacy_skin_api": True
|
|
},
|
|
"skinDomains": [self.config["server_dom"]]
|
|
}
|
|
body = json.dumps(settings).encode()
|
|
self.send_ok(body)
|
|
|
|
def root(self):
|
|
self.send_response(HTTPStatus.NO_CONTENT)
|
|
self.send_header("X-Authlib-Injector-API-Location", self.config["server_url"] + "/ygg")
|
|
self.end_headers()
|
|
|
|
if __name__ == "__main__":
|
|
server_address = ("", 25564)
|
|
config = ConfigParser()
|
|
config.read("config.ini")
|
|
auth = Authentication("users.db")
|
|
httpd = ThreadingHTTPServer(server_address, HTTPRequestHandler)
|
|
httpd.config = config
|
|
httpd.auth = auth
|
|
try:
|
|
httpd.serve_forever()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
|