from asyncio import Event, get_running_loop, sleep
from io import BytesIO
from numbers import Number
from pathlib import Path
from random import choice
from re import compile
from typing import Any
import aiofiles
from aiofiles.os import mkdir
from aiohttp import ClientSession, ContentTypeError
from fastapi import Depends, FastAPI, Response
from fastapi.responses import HTMLResponse
from jinja2 import Environment, FileSystemLoader, select_autoescape
from PIL import UnidentifiedImageError
from PIL.Image import Image
from PIL.Image import open as open_image, new
from PIL.PyAccess import _PyAccess32_4
from starlette.requests import Request
# from starlette.responses import FileResponse
try:
from uvloop import install
install()
except ImportError:
...
env = Environment(
loader=FileSystemLoader("./static/"),
autoescape=select_autoescape(),
trim_blocks=True,
)
DIGITS = compile(r"^[0-9]+$")
VERSION = compile(r"(?=\w\d?\.(\d+)\.png)")
IMAGES = Path("./images/sorted/Mob/")
PAGES = Path("./pages/")
SVGS = Path("./svgs/")
mob_mapping = {}
stance_mapping = []
for p in SVGS.iterdir():
if not p.is_dir():
continue
for m in p.glob("./*.svg"):
mob_mapping.setdefault(p.name, list())
mob_mapping[p.name].append(m.name.rstrip(".svg"))
stance_mapping.append(p.name)
stance_mapping = sorted(stance_mapping)
PAGE_PATHS: list[Path] = []
TEMPLATE = env.get_template("./pages/page.j2")
HUGE_FILE = env.get_template("./pages/huge_file.j2")
HOVER = env.get_template("./pages/hover.j2")
wsgi_app = FastAPI(title="ra.tcp.direct", description="ra.tcp.direct")
AVAILABLE = {}
CURRENT = {}
MOB_DATA = {}
ERROR_PATHS: dict[int, set[str]] = {}
CACHING = Event()
for mob_folder in IMAGES.glob("./*/"):
mob_id = int(mob_folder.name)
for stance_folder in mob_folder.glob("./*/"):
stance = stance_folder.name
AVAILABLE.setdefault(mob_id, {})
AVAILABLE[mob_id][stance] = stance_folder
for mob_stance in SVGS.glob("./*/*.svg"):
mob_id = mob_stance.name.removesuffix(".svg")
stance = mob_stance.parent.name
CURRENT.setdefault(mob_id, {})
CURRENT[mob_id][stance] = mob_stance
available_mob_ids: list[int] = [int(k) for k in AVAILABLE.keys()]
class SVGPath:
def __init__(
self, color: list[Number], x: Number, y: Number, scale: Number, len_x: Number
):
self.color = color
self.x = x
self.y = y
self.scale = scale
self.len_x = len_x
def __eq__(self, obj):
if isinstance(obj, dict):
c1, c2, c3, c4 = obj.get("color", tuple(bytearray(4)))
if self.color == (c1, c2, c3, c4):
return True
return self.color[0] == obj
class Mob:
__slots__ = "id", "name", "stance"
def __init__(self, _id=0, name="", stance=None):
self.id = _id
self.name = name
if stance:
if len(stance) > 0:
self.stance = stance
def gen_path(path_vars: tuple[list[Number], Number, Number, Number, Number]):
color, x, y, scale, len_x = path_vars
def render_html(img: Image, frame_id=None, link_id=None, scale=1):
pixels: _PyAccess32_4 = img.load() # type: ignore
width, height = img.size
nodes = []
x, y = 0, 0
def count_x(x_start, y):
i = x_start
start_val = pixels[x_start, y]
for xx in range(x_start + 1, width):
if (pixels[xx, y] == start_val) and (
pixels[xx, y][3] / 255 == start_val[3] / 255
):
i += 1
else:
break
return i - x_start
while y < height:
if y >= height - 1:
break
len_x = count_x(x, y)
color = pixels[x, y]
if (color[0], color[1], color[2], color[3]) != (0, 0, 0, 0.0):
nodes.append(
'
ᓚᘏᗢ" ), status_code=451, headers={"encoding": "utf-8", "content-type": "text/html; charset=utf-8"}, ) @wsgi_app.get("/hover", status_code=200) async def hover(): mobs = mob_mapping["stand"] return HTMLResponse(HOVER.render(mobs=mobs, stance="stand", stances=stance_mapping)) @wsgi_app.get("/hover/{stance:str}", status_code=200) async def hover_stance(stance: str): mobs = mob_mapping.get(stance, mob_mapping["stand"]) return HTMLResponse(HOVER.render(mobs=mobs, stance=stance, stances=stance_mapping)) async def update_lib(upd_ids) -> None: i = 0 async with ClientSession() as session: for mob_id in upd_ids: if AVAILABLE.get(int(mob_id), {}): continue AVAILABLE.setdefault(int(mob_id), {}) jsn = {} async with session.request( "GET", f"https://maplestory.io/api/GMS/233/mob/{mob_id}/" ) as resp2: try: jsn: dict[str, Any] = await resp2.json() except ContentTypeError: continue for frm_nm, frm_ct in jsn.get("framebooks", {}).items(): if i > 20: await sleep(10) i = 0 if frm_nm in ERROR_PATHS.get(mob_id, set()): continue img_fld: Path = IMAGES / f"{mob_id}" / "" if not img_fld.exists(): await mkdir(img_fld) stnc_fld: Path = img_fld / f"{frm_nm}" / "" if not stnc_fld.exists(): await mkdir(stnc_fld) for i in range(frm_ct): img_path: Path = stnc_fld / f"{frm_nm}.{i+1}.png" if not img_path.exists(): async with session.request( "GET", f"https://maplestory.io/api/GMS/233/mob/{mob_id}/render/{frm_nm}/{i+1}/", ) as resp: async with aiofiles.open(img_path, "wb") as img: await img.write(await resp.read()) AVAILABLE.setdefault(int(mob_id), {}) AVAILABLE[mob_id].setdefault(frm_nm, {}) AVAILABLE[mob_id][frm_nm] = stnc_fld async def fill_cache(): data = {} async with ClientSession() as session: async with session.request( "GET", "https://maplestory.io/api/GMS/233/mob/" ) as resp: data = await resp.json() mob_ids = {int(mob["id"]) for mob in data} | set(available_mob_ids) for mob in data: if mob["id"] in mob_ids: MOB_DATA[mob["id"]] = Mob( _id=mob["id"], name=mob["name"], stance=AVAILABLE.get(int(mob["id"]), {}), ) if not CACHING.is_set(): get_running_loop().create_task( update_lib({int(m["id"]) for m in data if int(m["id"]) not in mob_ids}) ) CACHING.set() # async def app(): # await run_in_threadpool(lambda: run("server:wsgi_app")) wsgi_app.add_exception_handler(404, handler=sumshit)