mirror of https://github.com/Rooba/MapleSVG.git
546 lines
17 KiB
Python
546 lines
17 KiB
Python
from asyncio import Event, get_running_loop, sleep
|
|
from io import BytesIO
|
|
from numbers import Number
|
|
from pathlib import Path
|
|
from random import choice
|
|
from re import compile
|
|
from typing import Any
|
|
|
|
import aiofiles
|
|
from aiofiles.os import mkdir
|
|
from aiohttp import ClientSession, ContentTypeError
|
|
from fastapi import Depends, FastAPI, Response
|
|
from fastapi.responses import HTMLResponse
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
|
from PIL import UnidentifiedImageError
|
|
from PIL.Image import Image
|
|
from PIL.Image import open as open_image, new
|
|
from PIL.PyAccess import _PyAccess32_4
|
|
from starlette.requests import Request
|
|
|
|
# from starlette.responses import FileResponse
|
|
|
|
try:
|
|
from uvloop import install
|
|
|
|
install()
|
|
except ImportError:
|
|
...
|
|
|
|
env = Environment(
|
|
loader=FileSystemLoader("./static/"),
|
|
autoescape=select_autoescape(),
|
|
trim_blocks=True,
|
|
)
|
|
DIGITS = compile(r"^[0-9]+$")
|
|
VERSION = compile(r"(?=\w\d?\.(\d+)\.png)")
|
|
|
|
IMAGES = Path("./images/sorted/Mob/")
|
|
PAGES = Path("./pages/")
|
|
SVGS = Path("./svgs/")
|
|
|
|
mob_mapping = {}
|
|
stance_mapping = []
|
|
|
|
for p in SVGS.iterdir():
|
|
if not p.is_dir():
|
|
continue
|
|
|
|
for m in p.glob("./*.svg"):
|
|
mob_mapping.setdefault(p.name, list())
|
|
mob_mapping[p.name].append(m.name.rstrip(".svg"))
|
|
|
|
stance_mapping.append(p.name)
|
|
|
|
stance_mapping = sorted(stance_mapping)
|
|
|
|
PAGE_PATHS: list[Path] = []
|
|
TEMPLATE = env.get_template("./pages/page.j2")
|
|
HUGE_FILE = env.get_template("./pages/huge_file.j2")
|
|
HOVER = env.get_template("./pages/hover.j2")
|
|
wsgi_app = FastAPI(title="ra.tcp.direct", description="ra.tcp.direct")
|
|
AVAILABLE = {}
|
|
CURRENT = {}
|
|
MOB_DATA = {}
|
|
ERROR_PATHS: dict[int, set[str]] = {}
|
|
CACHING = Event()
|
|
|
|
for mob_folder in IMAGES.glob("./*/"):
|
|
mob_id = int(mob_folder.name)
|
|
for stance_folder in mob_folder.glob("./*/"):
|
|
stance = stance_folder.name
|
|
|
|
AVAILABLE.setdefault(mob_id, {})
|
|
AVAILABLE[mob_id][stance] = stance_folder
|
|
|
|
for mob_stance in SVGS.glob("./*/*.svg"):
|
|
mob_id = mob_stance.name.removesuffix(".svg")
|
|
stance = mob_stance.parent.name
|
|
CURRENT.setdefault(mob_id, {})
|
|
CURRENT[mob_id][stance] = mob_stance
|
|
|
|
|
|
available_mob_ids: list[int] = [int(k) for k in AVAILABLE.keys()]
|
|
|
|
|
|
class SVGPath:
|
|
def __init__(
|
|
self, color: list[Number], x: Number, y: Number, scale: Number, len_x: Number
|
|
):
|
|
self.color = color
|
|
self.x = x
|
|
self.y = y
|
|
self.scale = scale
|
|
self.len_x = len_x
|
|
|
|
def __eq__(self, obj):
|
|
if isinstance(obj, dict):
|
|
c1, c2, c3, c4 = obj.get("color", tuple(bytearray(4)))
|
|
if self.color == (c1, c2, c3, c4):
|
|
return True
|
|
return self.color[0] == obj
|
|
|
|
|
|
class Mob:
|
|
__slots__ = "id", "name", "stance"
|
|
|
|
def __init__(self, _id=0, name="", stance=None):
|
|
self.id = _id
|
|
self.name = name
|
|
if stance:
|
|
if len(stance) > 0:
|
|
self.stance = stance
|
|
|
|
|
|
def gen_path(path_vars: tuple[list[Number], Number, Number, Number, Number]):
|
|
color, x, y, scale, len_x = path_vars
|
|
|
|
|
|
def render_html(img: Image, frame_id=None, link_id=None, scale=1):
|
|
pixels: _PyAccess32_4 = img.load() # type: ignore
|
|
width, height = img.size
|
|
nodes = []
|
|
x, y = 0, 0
|
|
|
|
def count_x(x_start, y):
|
|
i = x_start
|
|
start_val = pixels[x_start, y]
|
|
|
|
for xx in range(x_start + 1, width):
|
|
if (pixels[xx, y] == start_val) and (
|
|
pixels[xx, y][3] / 255 == start_val[3] / 255
|
|
):
|
|
i += 1
|
|
else:
|
|
break
|
|
|
|
return i - x_start
|
|
|
|
while y < height:
|
|
if y >= height - 1:
|
|
break
|
|
len_x = count_x(x, y)
|
|
color = pixels[x, y]
|
|
if (color[0], color[1], color[2], color[3]) != (0, 0, 0, 0.0):
|
|
nodes.append(
|
|
'<path style="fill: '
|
|
f'rgba({color[0]}, {color[1]}, {color[2]}, {color[3] / 255})" '
|
|
f'd="M{x*scale} {y*scale} H{(x+(len_x)+1)*scale} '
|
|
f'V{(y+1)*scale} L{x*scale} {(y+1)*scale} Z"/>'
|
|
)
|
|
|
|
x += len_x + 1
|
|
if x >= width:
|
|
x = 0
|
|
y += 1
|
|
|
|
if frame_id is not None:
|
|
begin = f"l{link_id}l.end" if frame_id != 0 else f"0s;l{link_id}l.end"
|
|
animate = f"""<animate attributeType="XML" attributeName="visibility"
|
|
from="visible" to="visible" id="l{frame_id}l" begin="{begin}" dur="0.1s"/>"""
|
|
else:
|
|
animate = ""
|
|
|
|
visible = 'visibility="hidden"' if link_id is not None else ""
|
|
return f'<svg {visible} height="{height*scale}" width="{width*scale}" >{animate}{"".join(nodes)}</svg>'
|
|
|
|
|
|
async def render_page(
|
|
folder_path: Path, mob_id: None | int = None, stance: None | str = None
|
|
):
|
|
if not mob_id:
|
|
mob_id = int(folder_path.parent.name)
|
|
if not stance:
|
|
stance = folder_path.name
|
|
|
|
if (svg_file := (SVGS / f"{stance}" / f"{mob_id}.svg")).exists():
|
|
async with aiofiles.open(svg_file, "r") as svg:
|
|
return await svg.read()
|
|
|
|
frames = []
|
|
imgs = {}
|
|
widths = []
|
|
heights = []
|
|
scale = 1
|
|
for i, img_file in enumerate(folder_path.glob("./*.png")):
|
|
version = VERSION.search(img_file.name)
|
|
if version:
|
|
v = int(version.group(1))
|
|
else:
|
|
v = i
|
|
async with aiofiles.open(img_file, "rb") as img_:
|
|
try:
|
|
img = open_image(BytesIO(await img_.read()))
|
|
except UnidentifiedImageError:
|
|
return ""
|
|
|
|
widths.append(img.size[0])
|
|
heights.append(img.size[1])
|
|
imgs[v] = img
|
|
|
|
keys = sorted(imgs.keys(), key=lambda x: x)
|
|
imgs = [imgs[k] for k in keys]
|
|
|
|
if not len(widths) or not len(heights):
|
|
return ""
|
|
|
|
maxes = (max(widths), max(heights))
|
|
|
|
for a, image in enumerate(imgs):
|
|
box = new("RGBA", maxes)
|
|
box.paste(image, (maxes[0] - image.size[0], maxes[1] - image.size[1]))
|
|
img_bytes = BytesIO()
|
|
box.save(img_bytes, "PNG")
|
|
if a < len(imgs) or a == 0:
|
|
lid = a - 1
|
|
else:
|
|
lid = 0
|
|
if a == 0:
|
|
lid = len(imgs) - 1
|
|
frame_id = a if len(imgs) > 1 else None
|
|
else:
|
|
frame_id = a
|
|
|
|
frames.append(
|
|
render_html(
|
|
box,
|
|
frame_id=frame_id,
|
|
link_id=lid if len(imgs) > 1 else None,
|
|
scale=scale,
|
|
)
|
|
)
|
|
|
|
if len(frames) < 1:
|
|
return ""
|
|
|
|
svg_folder = SVGS / f"{stance}"
|
|
if not svg_folder.exists():
|
|
await mkdir(svg_folder)
|
|
|
|
svg_file = svg_folder / f"{mob_id}.svg"
|
|
svg_cont = (
|
|
'<?xml version="1.0" standalone="yes"?>'
|
|
f'<svg xmlns="http://www.w3.org/2000/svg" version="1.1" '
|
|
f'width="{maxes[0]*scale}" height="{maxes[1]*scale}">{"".join(frames)}</svg>'
|
|
)
|
|
async with aiofiles.open(svg_file, "w") as svg:
|
|
await svg.write(svg_cont)
|
|
|
|
CURRENT.setdefault(mob_id, {})
|
|
CURRENT[mob_id][stance] = svg_file
|
|
return svg_cont
|
|
|
|
|
|
async def get_rand_page():
|
|
confirmed = False
|
|
page_file = None
|
|
while not confirmed:
|
|
mob_id = int(choice(available_mob_ids))
|
|
stance = choice(list(AVAILABLE[mob_id].keys()))
|
|
page_file = CURRENT.get(mob_id, {}).get(stance, None)
|
|
huge_file = False
|
|
if not page_file:
|
|
total_bytes = 0
|
|
confirmed = False
|
|
for f in AVAILABLE[mob_id][stance].glob("*.png"):
|
|
try:
|
|
async with aiofiles.open(f, "rb") as img:
|
|
byt = BytesIO(await img.read())
|
|
total_bytes += len(byt.getbuffer())
|
|
if total_bytes > 500000:
|
|
huge_file = True
|
|
|
|
if total_bytes < 100:
|
|
continue
|
|
|
|
if not confirmed:
|
|
open_image(byt)
|
|
confirmed = True
|
|
|
|
except UnidentifiedImageError:
|
|
ERROR_PATHS.setdefault(mob_id, set())
|
|
ERROR_PATHS[mob_id].add(stance)
|
|
continue
|
|
|
|
if huge_file:
|
|
return HUGE_FILE.render(mob_id=mob_id, stance=stance)
|
|
|
|
page = await render_page(
|
|
AVAILABLE[mob_id][stance], mob_id=mob_id, stance=stance
|
|
)
|
|
|
|
if page:
|
|
return TEMPLATE.render(frame=page, mob_id=mob_id, stance=stance)
|
|
else:
|
|
ERROR_PATHS.setdefault(mob_id, set())
|
|
ERROR_PATHS[mob_id].add(stance)
|
|
continue
|
|
|
|
break
|
|
|
|
if page_file:
|
|
async with aiofiles.open(page_file, "r") as ret_page:
|
|
return await ret_page.read()
|
|
|
|
|
|
async def get_specific_mob(mob_id: int = 0, stance: str | None = None):
|
|
if not stance:
|
|
stance = choice(list(AVAILABLE.get(mob_id, [])))
|
|
|
|
possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
|
|
if possibly_page and possibly_page.exists():
|
|
async with aiofiles.open(possibly_page, "r") as cont:
|
|
return TEMPLATE.render(
|
|
frame=await cont.read(), mob_id=mob_id, stance=stance
|
|
)
|
|
|
|
possibly_exists = AVAILABLE.get(mob_id, {}).get(stance, None)
|
|
if possibly_exists and possibly_exists.exists():
|
|
return TEMPLATE.render(
|
|
frame=await render_page(possibly_exists, mob_id=mob_id, stance=stance),
|
|
mob_id=mob_id,
|
|
stance=stance,
|
|
)
|
|
|
|
return "Does not Exist"
|
|
|
|
|
|
@wsgi_app.get("/", status_code=200)
|
|
async def rand_page(page: str = Depends(get_rand_page)):
|
|
return HTMLResponse(page)
|
|
|
|
|
|
@wsgi_app.get("/mob/", status_code=200)
|
|
async def view_mobs():
|
|
mob_page = env.get_template("pages/browse.j2")
|
|
next_ = 6 if (len(MOB_DATA) // 500) - 6 > 0 else 6 - abs((len(MOB_DATA) // 500) - 6)
|
|
|
|
return HTMLResponse(
|
|
mob_page.render(
|
|
mobs=dict(tuple(MOB_DATA.items())[0:500]),
|
|
total=len(MOB_DATA.values()) // 500,
|
|
current=1,
|
|
prev=[" ", " ", " ", " ", " "],
|
|
next=[_ for _ in range(1, next_)],
|
|
)
|
|
)
|
|
|
|
|
|
@wsgi_app.get("/mob/page/{page:int}/", status_code=200)
|
|
async def view_mob_page(page: int):
|
|
if page < 1 or page * 500 > len(MOB_DATA):
|
|
return HTMLResponse("Invalid Page Number", status_code=404)
|
|
|
|
if abs(page - 5) + (page - 5) == 0:
|
|
prev = 1
|
|
else:
|
|
prev = page - 5
|
|
|
|
next_ = (
|
|
page + 5
|
|
if (len(MOB_DATA) // 500) - (page + 5) > 0
|
|
else page + (5 - abs((len(MOB_DATA) // 500) - (page + 5)))
|
|
)
|
|
|
|
mob_page = env.get_template("pages/browse.j2")
|
|
prev = [" " for i in range(5 - (page - (prev)))] + [
|
|
f"{i}" for i in range(prev, page)
|
|
]
|
|
next_ = [f"{i}" for i in range(page + 1, next_)] + [
|
|
" " for i in range(5 - (next_ - (page + 1)))
|
|
]
|
|
|
|
return HTMLResponse(
|
|
mob_page.render(
|
|
mobs=dict(tuple(MOB_DATA.items())[slice((page - 1) * 500, page * 500)]),
|
|
total=len(MOB_DATA.values()) // 500,
|
|
prev=prev,
|
|
current=page,
|
|
next=next_,
|
|
)
|
|
)
|
|
|
|
|
|
@wsgi_app.get("/mob/{mob_id:int}/", status_code=200)
|
|
async def view_mob(mob_id: int) -> HTMLResponse:
|
|
data = await get_specific_mob(mob_id)
|
|
return HTMLResponse(data)
|
|
|
|
|
|
@wsgi_app.get("/assets/css/combo.css", status_code=200)
|
|
async def get_combo_css() -> HTMLResponse:
|
|
async with aiofiles.open("static/css/combo.css", "r") as f:
|
|
return HTMLResponse(await f.read(), media_type="text/css")
|
|
|
|
|
|
@wsgi_app.get("/assets/fonts/Symbols-2048-em Nerd Font Complete.woff2", status_code=200)
|
|
async def get_nf_font() -> Response:
|
|
async with aiofiles.open("static/fonts/nf-complete.woff2", "rb") as f:
|
|
return Response(await f.read(), media_type="font/woff2")
|
|
|
|
|
|
@wsgi_app.get("/assets/js/site.js", status_code=200)
|
|
async def get_site_js() -> Response:
|
|
async with aiofiles.open("static/js/site.js", "rb") as f:
|
|
return Response(await f.read(), media_type="")
|
|
|
|
|
|
@wsgi_app.get("/mob/{mob_id:int}/{stance}", status_code=200)
|
|
async def view_mob_stance(mob_id: int, stance: str) -> HTMLResponse:
|
|
data = await get_specific_mob(mob_id, stance)
|
|
return HTMLResponse(data)
|
|
|
|
|
|
async def startup():
|
|
get_running_loop().create_task(fill_cache())
|
|
|
|
|
|
wsgi_app.add_event_handler("startup", startup)
|
|
|
|
|
|
@wsgi_app.get("/export/mob/{mob_id:int}_{stance:path}.svg", status_code=200)
|
|
async def export_mob_svg(mob_id: int, stance: str):
|
|
mob_id = int(mob_id)
|
|
possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
|
|
if possibly_page and possibly_page.exists():
|
|
async with aiofiles.open(possibly_page, "r") as page:
|
|
return Response(
|
|
await page.read(),
|
|
media_type="image/svg+xml",
|
|
headers={"Content-Type": "image/svg+xml"},
|
|
)
|
|
|
|
possibly_exists = AVAILABLE.get(mob_id, {}).get(stance, None)
|
|
if possibly_exists and possibly_exists.exists():
|
|
return Response(
|
|
await render_page(possibly_exists, mob_id=mob_id, stance=stance),
|
|
headers={"Content-Type": "image/svg+xml", "Vary": "Accept-Encoding"},
|
|
)
|
|
|
|
return HTMLResponse("No data")
|
|
|
|
|
|
async def sumshit(arg1: Request, arg2) -> Response:
|
|
return Response(
|
|
content=(
|
|
"<body style='display: grid;background: #151515; margin: 0px;"
|
|
"color: #C4C4C4;height: 100%;align-items: center;'>"
|
|
"<pre style='text-align: center; display:inline;'>ᓚᘏᗢ</pre></body>"
|
|
),
|
|
status_code=451,
|
|
headers={"encoding": "utf-8", "content-type": "text/html; charset=utf-8"},
|
|
)
|
|
|
|
|
|
@wsgi_app.get("/hover", status_code=200)
|
|
async def hover():
|
|
mobs = mob_mapping["stand"]
|
|
return HTMLResponse(HOVER.render(mobs=mobs, stance="stand", stances=stance_mapping))
|
|
|
|
|
|
@wsgi_app.get("/hover/{stance:str}", status_code=200)
|
|
async def hover_stance(stance: str):
|
|
mobs = mob_mapping.get(stance, mob_mapping["stand"])
|
|
return HTMLResponse(HOVER.render(mobs=mobs, stance=stance, stances=stance_mapping))
|
|
|
|
|
|
async def update_lib(upd_ids) -> None:
|
|
i = 0
|
|
async with ClientSession() as session:
|
|
for mob_id in upd_ids:
|
|
if AVAILABLE.get(int(mob_id), {}):
|
|
continue
|
|
|
|
AVAILABLE.setdefault(int(mob_id), {})
|
|
jsn = {}
|
|
|
|
async with session.request(
|
|
"GET", f"https://maplestory.io/api/GMS/233/mob/{mob_id}/"
|
|
) as resp2:
|
|
try:
|
|
jsn: dict[str, Any] = await resp2.json()
|
|
except ContentTypeError:
|
|
continue
|
|
|
|
for frm_nm, frm_ct in jsn.get("framebooks", {}).items():
|
|
if i > 20:
|
|
await sleep(10)
|
|
i = 0
|
|
|
|
if frm_nm in ERROR_PATHS.get(mob_id, set()):
|
|
continue
|
|
|
|
img_fld: Path = IMAGES / f"{mob_id}" / ""
|
|
if not img_fld.exists():
|
|
await mkdir(img_fld)
|
|
|
|
stnc_fld: Path = img_fld / f"{frm_nm}" / ""
|
|
if not stnc_fld.exists():
|
|
await mkdir(stnc_fld)
|
|
|
|
for i in range(frm_ct):
|
|
img_path: Path = stnc_fld / f"{frm_nm}.{i+1}.png"
|
|
if not img_path.exists():
|
|
async with session.request(
|
|
"GET",
|
|
f"https://maplestory.io/api/GMS/233/mob/{mob_id}/render/{frm_nm}/{i+1}/",
|
|
) as resp:
|
|
async with aiofiles.open(img_path, "wb") as img:
|
|
await img.write(await resp.read())
|
|
|
|
AVAILABLE.setdefault(int(mob_id), {})
|
|
AVAILABLE[mob_id].setdefault(frm_nm, {})
|
|
AVAILABLE[mob_id][frm_nm] = stnc_fld
|
|
|
|
|
|
async def fill_cache():
|
|
data = {}
|
|
async with ClientSession() as session:
|
|
async with session.request(
|
|
"GET", "https://maplestory.io/api/GMS/233/mob/"
|
|
) as resp:
|
|
data = await resp.json()
|
|
|
|
mob_ids = {int(mob["id"]) for mob in data} | set(available_mob_ids)
|
|
|
|
for mob in data:
|
|
if mob["id"] in mob_ids:
|
|
MOB_DATA[mob["id"]] = Mob(
|
|
_id=mob["id"],
|
|
name=mob["name"],
|
|
stance=AVAILABLE.get(int(mob["id"]), {}),
|
|
)
|
|
|
|
if not CACHING.is_set():
|
|
get_running_loop().create_task(
|
|
update_lib({int(m["id"]) for m in data if int(m["id"]) not in mob_ids})
|
|
)
|
|
CACHING.set()
|
|
|
|
|
|
# async def app():
|
|
# await run_in_threadpool(lambda: run("server:wsgi_app"))
|
|
|
|
|
|
wsgi_app.add_exception_handler(404, handler=sumshit)
|