MapleSVG/server.py

546 lines
17 KiB
Python

from asyncio import Event, get_running_loop, sleep
from io import BytesIO
from numbers import Number
from pathlib import Path
from random import choice
from re import compile
from typing import Any
import aiofiles
from aiofiles.os import mkdir
from aiohttp import ClientSession, ContentTypeError
from fastapi import Depends, FastAPI, Response
from fastapi.responses import HTMLResponse
from jinja2 import Environment, FileSystemLoader, select_autoescape
from PIL import UnidentifiedImageError
from PIL.Image import Image
from PIL.Image import open as open_image, new
from PIL.PyAccess import _PyAccess32_4
from starlette.requests import Request
# from starlette.responses import FileResponse
# uvloop is optional: when it is installed, switch asyncio over to it;
# when it is missing, carry on with the default event loop.
try:
    from uvloop import install

    install()
except ImportError:
    pass
# Jinja2 environment shared by every template in this module; template names
# are resolved relative to ./static/.
# NOTE(review): select_autoescape() is called with its defaults, which
# presumably leaves the ".j2" templates loaded from here unescaped — confirm
# against the Jinja2 documentation before rendering untrusted values.
env = Environment(
loader=FileSystemLoader("./static/"),
autoescape=select_autoescape(),
trim_blocks=True,
)
# Matches a string consisting only of ASCII digits.
DIGITS = compile(r"^[0-9]+$")
# Lookahead that captures the number between the last two dots of names such
# as "stand.3.png"; render_page uses it to order animation frames.
VERSION = compile(r"(?=\w\d?\.(\d+)\.png)")
# Root of the downloaded frames: <mob id>/<stance>/<frames>.png
IMAGES = Path("./images/sorted/Mob/")
# Not referenced in the visible part of this file.
PAGES = Path("./pages/")
# Root of the rendered output: <stance>/<mob id>.svg
SVGS = Path("./svgs/")
# stance name -> mob ids (as strings) that already have a rendered SVG on disk.
mob_mapping = {}
# Name of every stance directory found under SVGS, sorted alphabetically.
stance_mapping = []
# A fresh checkout may not have the output folder yet; iterdir() would raise.
if SVGS.is_dir():
    for stance_dir in SVGS.iterdir():
        if not stance_dir.is_dir():
            continue
        for svg_path in stance_dir.glob("./*.svg"):
            # removesuffix drops exactly ".svg"; rstrip(".svg") would strip any
            # trailing run of the characters ".", "s", "v" and "g" instead.
            mob_mapping.setdefault(stance_dir.name, []).append(
                svg_path.name.removesuffix(".svg")
            )
        stance_mapping.append(stance_dir.name)
stance_mapping = sorted(stance_mapping)
# Declared but not referenced anywhere in the visible part of this file.
PAGE_PATHS: list[Path] = []
# Page templates, looked up through the shared Jinja2 environment.
TEMPLATE = env.get_template("./pages/page.j2")
HUGE_FILE = env.get_template("./pages/huge_file.j2")
HOVER = env.get_template("./pages/hover.j2")
# The FastAPI application every route in this module is attached to.
wsgi_app = FastAPI(title="ra.tcp.direct", description="ra.tcp.direct")
# mob id -> stance name -> folder holding that stance's PNG frames.
AVAILABLE = {}
# mob id -> stance name -> path of the SVG already rendered for it.
CURRENT = {}
# mob id as reported by the remote mob list -> Mob record (filled by fill_cache).
MOB_DATA = {}
# mob id -> stance names whose images could not be opened or rendered.
ERROR_PATHS: dict[int, set[str]] = {}
# Set by fill_cache after its first pass; checked there before starting update_lib.
CACHING = Event()
# Index the PNG frames on disk: AVAILABLE[mob id (int)][stance] -> frame folder.
for mob_folder in IMAGES.glob("./*/"):
    # Stray files or non-numeric folder names must not abort the import.
    if not (mob_folder.is_dir() and DIGITS.match(mob_folder.name)):
        continue
    mob_id = int(mob_folder.name)
    for stance_folder in mob_folder.glob("./*/"):
        if not stance_folder.is_dir():
            continue
        stance = stance_folder.name
        AVAILABLE.setdefault(mob_id, {})
        AVAILABLE[mob_id][stance] = stance_folder

# Index the SVGs rendered by earlier runs: CURRENT[mob id (int)][stance] -> file.
# The id is converted to int because every lookup in this module (and the
# entries render_page adds later) use integer mob ids; string keys would never
# be found.
for mob_stance in SVGS.glob("./*/*.svg"):
    svg_name = mob_stance.name.removesuffix(".svg")
    if not DIGITS.match(svg_name):
        continue
    mob_id = int(svg_name)
    stance = mob_stance.parent.name
    CURRENT.setdefault(mob_id, {})
    CURRENT[mob_id][stance] = mob_stance

# Snapshot of the mob ids that had frames on disk at import time.
available_mob_ids: list[int] = list(AVAILABLE)
class SVGPath:
    """Value holder for a colour, a position, a scale and a run length.

    Args:
        color: Four colour channels.
        x: Horizontal start position.
        y: Vertical position.
        scale: Scale factor stored alongside the coordinates.
        len_x: Length stored alongside the coordinates.

    Equality is intentionally loose:

    * compared with a ``dict``, the instance is equal when the dict's
      ``"color"`` entry (default: four zeros) holds the same four channels;
    * compared with anything else, only the first colour channel is compared.
    """

    # Defining __eq__ already makes instances unhashable; spell it out.
    __hash__ = None

    def __init__(
        self, color: list[Number], x: Number, y: Number, scale: Number, len_x: Number
    ):
        self.color = color
        self.x = x
        self.y = y
        self.scale = scale
        self.len_x = len_x

    def __eq__(self, obj):
        if isinstance(obj, dict):
            # Normalise both sides to tuples: a list never equals a tuple, so
            # comparing the stored list with a tuple could never succeed.
            other = obj.get("color", (0, 0, 0, 0))
            return tuple(self.color) == tuple(other)
        return self.color[0] == obj
class Mob:
    """Record describing one mob: its id, display name and known stances.

    Args:
        _id: Mob identifier.
        name: Display name.
        stance: Container of stances for this mob. The object is stored as
            given (not copied), so later additions made by the caller remain
            visible through the record. ``None`` becomes an empty dict.
    """

    __slots__ = "id", "name", "stance"

    def __init__(self, _id=0, name="", stance=None):
        self.id = _id
        self.name = name
        # Always assign the slot: with __slots__, an unassigned attribute
        # raises AttributeError on first read.
        self.stance = {} if stance is None else stance
def gen_path(path_vars: tuple[list[Number], Number, Number, Number, Number]):
    """Unpack a five-item path description.

    Placeholder: the items (colour, x, y, scale, run length) are unpacked and
    nothing else happens, so the function returns ``None``. A sequence that
    does not hold exactly five items raises ``ValueError``.
    """
    (color, x, y, scale, len_x) = path_vars
    return None
def render_html(img: Image, frame_id=None, link_id=None, scale=1):
    """Convert an image into an ``<svg>`` element made of horizontal pixel runs.

    Each row is split into runs of identical pixels and every run becomes one
    filled ``<path>``. Pixels equal to ``(0, 0, 0, 0)`` are skipped.

    Args:
        img: Image whose pixels are 4-tuples (assumes RGBA — callers in this
            file always pass an image created with mode "RGBA").
        frame_id: Index of this frame in an animation, or ``None`` for a still
            image. When given, an ``<animate>`` element with id
            ``l<frame_id>l`` is emitted.
        link_id: Index of the frame whose animation end triggers this one.
            When given, the element starts out hidden.
        scale: Multiplier applied to every coordinate and to the SVG size.

    Returns:
        The SVG markup as a string.
    """
    pixels: _PyAccess32_4 = img.load()  # type: ignore
    width, height = img.size
    nodes = []
    # Every row is visited, including the last one.
    for y in range(height):
        x = 0
        while x < width:
            color = pixels[x, y]
            # Extend the run while the following pixels are identical (tuple
            # equality already covers the alpha channel).
            run_end = x + 1
            while run_end < width and pixels[run_end, y] == color:
                run_end += 1
            if (color[0], color[1], color[2], color[3]) != (0, 0, 0, 0):
                nodes.append(
                    '<path style="fill: '
                    f'rgba({color[0]}, {color[1]}, {color[2]}, {color[3] / 255})" '
                    f'd="M{x*scale} {y*scale} H{run_end*scale} '
                    f'V{(y+1)*scale} L{x*scale} {(y+1)*scale} Z"/>'
                )
            x = run_end
    if frame_id is not None:
        # Frame 0 additionally starts at time zero so the chain gets going.
        begin = f"l{link_id}l.end" if frame_id != 0 else f"0s;l{link_id}l.end"
        animate = (
            '<animate attributeType="XML" attributeName="visibility"\n'
            f'from="visible" to="visible" id="l{frame_id}l" begin="{begin}" dur="0.1s"/>'
        )
    else:
        animate = ""
    visible = 'visibility="hidden"' if link_id is not None else ""
    return f'<svg {visible} height="{height*scale}" width="{width*scale}" >{animate}{"".join(nodes)}</svg>'
async def render_page(
    folder_path: Path, mob_id: None | int = None, stance: None | str = None
):
    """Return the SVG for one mob stance, rendering and caching it if needed.

    If ``SVGS/<stance>/<mob_id>.svg`` already exists its content is returned
    unchanged. Otherwise every ``*.png`` in ``folder_path`` is loaded, ordered
    by the frame number in its file name, aligned to the bottom-right corner
    of a canvas as large as the biggest frame, converted with ``render_html``
    and wrapped in one outer ``<svg>`` document. The result is written to
    disk and recorded in ``CURRENT``.

    Args:
        folder_path: Folder holding the PNG frames of the stance.
        mob_id: Mob id; defaults to the name of the folder's parent.
        stance: Stance name; defaults to the folder's own name.

    Returns:
        The SVG document as a string, or ``""`` when the folder has no PNG
        frames or one of them is not a recognisable image.
    """
    if not mob_id:
        mob_id = int(folder_path.parent.name)
    if not stance:
        stance = folder_path.name
    svg_folder = SVGS / f"{stance}"
    svg_file = svg_folder / f"{mob_id}.svg"
    if svg_file.exists():
        async with aiofiles.open(svg_file, "r") as svg:
            return await svg.read()

    scale = 1
    imgs = {}
    for i, img_file in enumerate(folder_path.glob("./*.png")):
        version = VERSION.search(img_file.name)
        # Fall back to the enumeration index when the name has no frame number.
        order = int(version.group(1)) if version else i
        async with aiofiles.open(img_file, "rb") as img_:
            try:
                imgs[order] = open_image(BytesIO(await img_.read()))
            except UnidentifiedImageError:
                return ""
    if not imgs:
        return ""
    ordered = [imgs[k] for k in sorted(imgs)]
    maxes = (
        max(image.size[0] for image in ordered),
        max(image.size[1] for image in ordered),
    )

    # A single frame is a still image: no animation ids at all. Otherwise each
    # frame is triggered by the end of the previous one, and frame 0 by the
    # last, which closes the loop.
    animated = len(ordered) > 1
    frames = []
    for a, image in enumerate(ordered):
        box = new("RGBA", maxes)
        box.paste(image, (maxes[0] - image.size[0], maxes[1] - image.size[1]))
        frames.append(
            render_html(
                box,
                frame_id=a if animated else None,
                link_id=(a - 1) % len(ordered) if animated else None,
                scale=scale,
            )
        )

    # exist_ok avoids a failure when two requests create the folder at once;
    # parents covers a missing SVGS root.
    svg_folder.mkdir(parents=True, exist_ok=True)
    svg_cont = (
        '<?xml version="1.0" standalone="yes"?>'
        f'<svg xmlns="http://www.w3.org/2000/svg" version="1.1" '
        f'width="{maxes[0]*scale}" height="{maxes[1]*scale}">{"".join(frames)}</svg>'
    )
    async with aiofiles.open(svg_file, "w") as svg:
        await svg.write(svg_cont)
    CURRENT.setdefault(mob_id, {})
    CURRENT[mob_id][stance] = svg_file
    return svg_cont
async def get_rand_page():
    """Pick a random mob stance and return a full HTML page for it.

    A random mob id and one of its stances are drawn until one can be shown:

    * if an SVG for the pair is already recorded in ``CURRENT`` and still on
      disk, it is wrapped in the page template and returned;
    * otherwise the stance's PNG files are read; if together they exceed
      500000 bytes the "huge file" page is returned instead of rendering;
    * otherwise the stance is rendered with ``render_page``; an empty result
      is recorded in ``ERROR_PATHS`` and another pair is drawn.

    Returns:
        Rendered HTML as a string; the function never returns ``None``.
    """
    while True:
        mob_id = int(choice(available_mob_ids))
        stance = choice(list(AVAILABLE[mob_id].keys()))
        page_file = CURRENT.get(mob_id, {}).get(stance, None)
        if page_file and page_file.exists():
            async with aiofiles.open(page_file, "r") as ret_page:
                # Wrap the stored SVG in the page template, exactly like a
                # freshly rendered one.
                return TEMPLATE.render(
                    frame=await ret_page.read(), mob_id=mob_id, stance=stance
                )

        total_bytes = 0
        validated = False
        for f in AVAILABLE[mob_id][stance].glob("*.png"):
            async with aiofiles.open(f, "rb") as img:
                byt = BytesIO(await img.read())
            total_bytes += len(byt.getbuffer())
            # Only the first frame that pushes the running total to 100 bytes
            # or more is checked for being a readable image.
            if validated or total_bytes < 100:
                continue
            try:
                open_image(byt)
                validated = True
            except UnidentifiedImageError:
                ERROR_PATHS.setdefault(mob_id, set())
                ERROR_PATHS[mob_id].add(stance)
        if total_bytes > 500000:
            return HUGE_FILE.render(mob_id=mob_id, stance=stance)

        page = await render_page(
            AVAILABLE[mob_id][stance], mob_id=mob_id, stance=stance
        )
        if page:
            return TEMPLATE.render(frame=page, mob_id=mob_id, stance=stance)
        ERROR_PATHS.setdefault(mob_id, set())
        ERROR_PATHS[mob_id].add(stance)
async def get_specific_mob(mob_id: int = 0, stance: str | None = None):
    """Return the HTML page for one mob, optionally for a given stance.

    Args:
        mob_id: Id of the mob to show.
        stance: Stance to show; when omitted, one of the mob's stances listed
            in ``AVAILABLE`` is chosen at random.

    Returns:
        The rendered page, or the string ``"Does not Exist"`` when the mob or
        stance is unknown.
    """
    stances = AVAILABLE.get(mob_id, {})
    if not stance:
        # choice() raises IndexError on an empty sequence, so an unknown mob
        # id has to be rejected before drawing.
        if not stances:
            return "Does not Exist"
        stance = choice(list(stances))
    possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
    if possibly_page and possibly_page.exists():
        async with aiofiles.open(possibly_page, "r") as cont:
            return TEMPLATE.render(
                frame=await cont.read(), mob_id=mob_id, stance=stance
            )
    possibly_exists = stances.get(stance, None)
    if possibly_exists and possibly_exists.exists():
        return TEMPLATE.render(
            frame=await render_page(possibly_exists, mob_id=mob_id, stance=stance),
            mob_id=mob_id,
            stance=stance,
        )
    return "Does not Exist"
@wsgi_app.get("/", status_code=200)
async def rand_page(page: str = Depends(get_rand_page)):
    """Serve the site root: the HTML produced by the ``get_rand_page`` dependency."""
    response = HTMLResponse(page)
    return response
@wsgi_app.get("/mob/", status_code=200)
async def view_mobs():
    """Serve the first page (up to 500 entries) of the mob browser.

    The template receives five blank "previous" cells and the page numbers
    from 1 up to, but excluding, the smaller of 6 and the full-page count.
    """
    page_count = len(MOB_DATA) // 500
    upper = min(6, page_count)
    first_chunk = dict(tuple(MOB_DATA.items())[0:500])
    blanks = ["&nbsp;"] * 5
    browse = env.get_template("pages/browse.j2")
    html = browse.render(
        mobs=first_chunk,
        total=page_count,
        current=1,
        prev=blanks,
        next=list(range(1, upper)),
    )
    return HTMLResponse(html)
@wsgi_app.get("/mob/page/{page:int}/", status_code=200)
async def view_mob_page(page: int):
    """Serve one page (500 entries) of the mob browser.

    Args:
        page: 1-based page number.

    Returns:
        The rendered browse page, or a 404 response when ``page`` is outside
        ``1..total``. ``total`` is rounded up so the final, partially filled
        page can be reached as well.

    The template receives two five-cell navigation strips: up to five page
    numbers before the current page (left-padded with ``&nbsp;``) and the
    page numbers after it (right-padded with ``&nbsp;``).
    """
    per_page = 500
    # Ceiling division: a trailing partial page still counts as a page.
    total = -(-len(MOB_DATA) // per_page)
    if page < 1 or page > total:
        return HTMLResponse("Invalid Page Number", status_code=404)

    first_prev = max(1, page - 5)
    last_next = min(page + 5, total)
    prev_numbers = [f"{i}" for i in range(first_prev, page)]
    next_numbers = [f"{i}" for i in range(page + 1, last_next)]
    # Pad by the number of missing cells so both strips are always 5 wide.
    prev = ["&nbsp;"] * (5 - len(prev_numbers)) + prev_numbers
    next_ = next_numbers + ["&nbsp;"] * (5 - len(next_numbers))

    mob_page = env.get_template("pages/browse.j2")
    return HTMLResponse(
        mob_page.render(
            mobs=dict(
                list(MOB_DATA.items())[(page - 1) * per_page : page * per_page]
            ),
            total=total,
            prev=prev,
            current=page,
            next=next_,
        )
    )
@wsgi_app.get("/mob/{mob_id:int}/", status_code=200)
async def view_mob(mob_id: int) -> HTMLResponse:
    """Serve the page for one mob; the stance is left to ``get_specific_mob``."""
    return HTMLResponse(await get_specific_mob(mob_id))
@wsgi_app.get("/assets/css/combo.css", status_code=200)
async def get_combo_css() -> HTMLResponse:
    """Serve the combined stylesheet from disk with a ``text/css`` media type."""
    async with aiofiles.open("static/css/combo.css", "r") as sheet:
        css_text = await sheet.read()
        return HTMLResponse(css_text, media_type="text/css")
@wsgi_app.get("/assets/fonts/Symbols-2048-em Nerd Font Complete.woff2", status_code=200)
async def get_nf_font() -> Response:
    """Serve the bundled symbol font as ``font/woff2``."""
    async with aiofiles.open("static/fonts/nf-complete.woff2", "rb") as font_file:
        font_bytes = await font_file.read()
        return Response(font_bytes, media_type="font/woff2")
@wsgi_app.get("/assets/js/site.js", status_code=200)
async def get_site_js() -> Response:
    """Serve the site script from disk.

    The response is labelled ``text/javascript``: an empty media type would
    send a blank ``Content-Type`` header, which browsers may refuse for
    scripts.
    """
    async with aiofiles.open("static/js/site.js", "rb") as f:
        return Response(await f.read(), media_type="text/javascript")
@wsgi_app.get("/mob/{mob_id:int}/{stance}", status_code=200)
async def view_mob_stance(mob_id: int, stance: str) -> HTMLResponse:
    """Serve the page for one mob in the requested stance."""
    return HTMLResponse(await get_specific_mob(mob_id, stance))
# Strong references to tasks started at startup. The event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_STARTUP_TASKS: set = set()


async def startup():
    """Start ``fill_cache`` in the background when the application starts."""
    task = get_running_loop().create_task(fill_cache())
    _STARTUP_TASKS.add(task)
    # Drop the reference once the task is done.
    task.add_done_callback(_STARTUP_TASKS.discard)


wsgi_app.add_event_handler("startup", startup)
@wsgi_app.get("/export/mob/{mob_id:int}_{stance:path}.svg", status_code=200)
async def export_mob_svg(mob_id: int, stance: str):
    """Serve the raw SVG of one mob stance.

    The SVG recorded in ``CURRENT`` is used when it is still on disk;
    otherwise the stance is rendered from the frames listed in ``AVAILABLE``.

    Returns:
        An ``image/svg+xml`` response, or a 404 "No data" page when the mob
        or stance is unknown or nothing could be rendered. Both successful
        paths send the same headers.
    """
    svg_text = ""
    possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
    if possibly_page and possibly_page.exists():
        async with aiofiles.open(possibly_page, "r") as page:
            svg_text = await page.read()
    else:
        possibly_exists = AVAILABLE.get(mob_id, {}).get(stance, None)
        if possibly_exists and possibly_exists.exists():
            # render_page returns "" when the frames cannot be rendered.
            svg_text = await render_page(
                possibly_exists, mob_id=mob_id, stance=stance
            )
    if not svg_text:
        return HTMLResponse("No data", status_code=404)
    return Response(
        svg_text,
        media_type="image/svg+xml",
        headers={"Vary": "Accept-Encoding"},
    )
async def sumshit(arg1: Request, arg2) -> Response:
    """Error handler: answer with a small centred cat page and status 451.

    Both arguments are ignored; the reply is always the same.
    """
    markup = "".join(
        (
            "<body style='display: grid;background: #151515; margin: 0px;",
            "color: #C4C4C4;height: 100%;align-items: center;'>",
            "<pre style='text-align: center; display:inline;'>ᓚᘏᗢ</pre></body>",
        )
    )
    reply_headers = {"encoding": "utf-8", "content-type": "text/html; charset=utf-8"}
    return Response(content=markup, status_code=451, headers=reply_headers)
@wsgi_app.get("/hover", status_code=200)
async def hover():
    """Serve the hover gallery for the "stand" stance.

    An empty gallery is rendered when no SVG has been generated for "stand"
    yet, instead of failing with a KeyError.
    """
    mobs = mob_mapping.get("stand", [])
    return HTMLResponse(HOVER.render(mobs=mobs, stance="stand", stances=stance_mapping))
@wsgi_app.get("/hover/{stance:str}", status_code=200)
async def hover_stance(stance: str):
    """Serve the hover gallery for ``stance``.

    Unknown stances fall back to the mobs of "stand" (or to an empty list
    when "stand" has no SVGs either). The fallback is looked up only when it
    is needed, so a missing "stand" entry cannot break a valid stance.
    """
    mobs = mob_mapping.get(stance)
    if mobs is None:
        mobs = mob_mapping.get("stand", [])
    return HTMLResponse(HOVER.render(mobs=mobs, stance=stance, stances=stance_mapping))
async def update_lib(upd_ids) -> None:
    """Download the frame images of mobs that have none on disk yet.

    For every id in ``upd_ids`` without stances in ``AVAILABLE``, the mob's
    description is fetched from maplestory.io and each entry of its
    ``framebooks`` mapping (stance name -> frame count) is mirrored to
    ``IMAGES/<mob id>/<stance>/<stance>.<n>.png``. Stances listed in
    ``ERROR_PATHS`` are skipped, and files already on disk are not fetched
    again.

    Args:
        upd_ids: Iterable of mob ids (anything ``int()`` accepts).
    """
    # Image requests since the last pause. Kept separate from the frame index
    # so the throttle really counts requests.
    requests_made = 0
    async with ClientSession() as session:
        for raw_id in upd_ids:
            # Normalise once: AVAILABLE and ERROR_PATHS are keyed by int.
            mob_id = int(raw_id)
            if AVAILABLE.get(mob_id, {}):
                continue
            AVAILABLE.setdefault(mob_id, {})
            async with session.request(
                "GET", f"https://maplestory.io/api/GMS/233/mob/{mob_id}/"
            ) as resp2:
                try:
                    jsn: dict[str, Any] = await resp2.json()
                except ContentTypeError:
                    continue
            if not isinstance(jsn, dict):
                continue
            for frm_nm, frm_ct in (jsn.get("framebooks") or {}).items():
                if frm_nm in ERROR_PATHS.get(mob_id, set()):
                    continue
                stnc_fld: Path = IMAGES / f"{mob_id}" / f"{frm_nm}"
                # Creates the mob folder (and the IMAGES root) as needed.
                stnc_fld.mkdir(parents=True, exist_ok=True)
                for frame_no in range(1, frm_ct + 1):
                    img_path: Path = stnc_fld / f"{frm_nm}.{frame_no}.png"
                    if img_path.exists():
                        continue
                    if requests_made > 20:
                        await sleep(10)
                        requests_made = 0
                    requests_made += 1
                    async with session.request(
                        "GET",
                        f"https://maplestory.io/api/GMS/233/mob/{mob_id}/render/{frm_nm}/{frame_no}/",
                    ) as resp:
                        # Never store an error page under a .png name.
                        if resp.status != 200:
                            continue
                        payload = await resp.read()
                    async with aiofiles.open(img_path, "wb") as img:
                        await img.write(payload)
                # Only advertise the stance once it has at least one frame.
                if any(stnc_fld.glob("*.png")):
                    AVAILABLE[mob_id][frm_nm] = stnc_fld
# Strong references to the background download task started by fill_cache, so
# it cannot be garbage-collected while still running.
_UPDATE_TASKS: set = set()


async def fill_cache():
    """Load the remote mob list into ``MOB_DATA`` and start the downloader.

    The mob list is fetched from maplestory.io and one ``Mob`` record per
    entry is stored in ``MOB_DATA``, sharing the mob's ``AVAILABLE`` entry as
    its stance container. On the first successful run (``CACHING`` not yet
    set) a background ``update_lib`` task is started for every listed mob
    that has no frames on disk.

    A reply that is not a JSON list leaves ``MOB_DATA`` untouched.
    """
    async with ClientSession() as session:
        async with session.request(
            "GET", "https://maplestory.io/api/GMS/233/mob/"
        ) as resp:
            try:
                data = await resp.json()
            except ContentTypeError:
                return
    if not isinstance(data, list):
        return
    for mob in data:
        MOB_DATA[mob["id"]] = Mob(
            _id=mob["id"],
            name=mob["name"],
            stance=AVAILABLE.get(int(mob["id"]), {}),
        )
    if not CACHING.is_set():
        CACHING.set()
        # Select the ids that have no stances on disk. Filtering against a set
        # that already contains every listed id always yields nothing.
        missing = {int(m["id"]) for m in data if not AVAILABLE.get(int(m["id"]))}
        if missing:
            task = get_running_loop().create_task(update_lib(missing))
            _UPDATE_TASKS.add(task)
            task.add_done_callback(_UPDATE_TASKS.discard)
# async def app():
# await run_in_threadpool(lambda: run("server:wsgi_app"))
# Register sumshit as the application's handler for HTTP 404 errors.
wsgi_app.add_exception_handler(404, handler=sumshit)