"""FastAPI app that converts mob sprite PNG frames into SVG files and serves them.

PNG frames are read from ``IMAGES/<mob_id>/<stance>/``, rendered SVGs are
cached under ``SVGS/<stance>/<mob_id>.svg``, and mob metadata / missing
frames are fetched from the maplestory.io HTTP API.
"""
from asyncio import Event, get_running_loop, sleep
from io import BytesIO
from numbers import Number
from pathlib import Path
from random import choice
from re import compile
from typing import Any

import aiofiles
from aiofiles.os import mkdir
from aiohttp import ClientSession, ContentTypeError
from fastapi import Depends, FastAPI, Response
from fastapi.responses import HTMLResponse
from jinja2 import Environment, FileSystemLoader, select_autoescape
from PIL import UnidentifiedImageError
from PIL.Image import Image
from PIL.Image import open as open_image, new
from PIL.PyAccess import _PyAccess32_4
from starlette.requests import Request

# from starlette.responses import FileResponse

try:
    from uvloop import install

    install()
except ImportError:
    ...

env = Environment(
    loader=FileSystemLoader("./static/"),
    autoescape=select_autoescape(),
    trim_blocks=True,
)

DIGITS = compile(r"^[0-9]+$")
VERSION = compile(r"(?=\w\d?\.(\d+)\.png)")
IMAGES = Path("./images/sorted/Mob/")
PAGES = Path("./pages/")
SVGS = Path("./svgs/")

# stance name -> list of SVG base names found in SVGS/<stance>/
mob_mapping = {}
# sorted names of every stance directory under SVGS
stance_mapping = []
for p in SVGS.iterdir():
    if not p.is_dir():
        continue
    for m in p.glob("./*.svg"):
        mob_mapping.setdefault(p.name, list())
        # removesuffix, not rstrip: rstrip(".svg") strips any trailing run of
        # the characters ".", "s", "v", "g", which can eat part of the name.
        mob_mapping[p.name].append(m.name.removesuffix(".svg"))
    stance_mapping.append(p.name)
stance_mapping = sorted(stance_mapping)

PAGE_PATHS: list[Path] = []
TEMPLATE = env.get_template("./pages/page.j2")
HUGE_FILE = env.get_template("./pages/huge_file.j2")
HOVER = env.get_template("./pages/hover.j2")
wsgi_app = FastAPI(title="ra.tcp.direct", description="ra.tcp.direct")

# mob id -> stance name -> folder holding that stance's PNG frames
AVAILABLE: dict[int, dict[str, Path]] = {}
# mob id -> stance name -> already-rendered SVG file
CURRENT: dict[int, dict[str, Path]] = {}
# mob id (as delivered by the API) -> Mob
MOB_DATA = {}
# mob id -> stances whose frames could not be rendered
ERROR_PATHS: dict[int, set[str]] = {}
CACHING = Event()
# Strong references to fire-and-forget tasks; the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected.
_BACKGROUND_TASKS: set = set()

for mob_folder in IMAGES.glob("./*/"):
    mob_id = int(mob_folder.name)
    for stance_folder in mob_folder.glob("./*/"):
        stance = stance_folder.name
        AVAILABLE.setdefault(mob_id, {})
        AVAILABLE[mob_id][stance] = stance_folder

for mob_stance in SVGS.glob("./*/*.svg"):
    svg_stem = mob_stance.name.removesuffix(".svg")
    # Every reader of CURRENT looks it up with an int mob id, so the key must
    # be an int as well; files whose name is not a plain number are skipped.
    if not DIGITS.match(svg_stem):
        continue
    mob_id = int(svg_stem)
    stance = mob_stance.parent.name
    CURRENT.setdefault(mob_id, {})
    CURRENT[mob_id][stance] = mob_stance

available_mob_ids: list[int] = [int(k) for k in AVAILABLE.keys()]


class SVGPath:
    """Value holder for one horizontal run of equally coloured pixels."""

    def __init__(
        self, color: list[Number], x: Number, y: Number, scale: Number, len_x: Number
    ):
        self.color = color
        self.x = x
        self.y = y
        self.scale = scale
        self.len_x = len_x

    def __eq__(self, obj):
        """Compare against a dict carrying a 4-item ``"color"`` or against a scalar.

        A dict matches when its ``"color"`` entry equals this path's colour;
        anything else is compared with the first colour channel.
        """
        if isinstance(obj, dict):
            c1, c2, c3, c4 = obj.get("color", tuple(bytearray(4)))
            # tuple(): a list never compares equal to a tuple, so comparing
            # self.color directly could not succeed for a list colour.
            if tuple(self.color) == (c1, c2, c3, c4):
                return True
        return self.color[0] == obj


class Mob:
    """Mob record: id, display name and (when non-empty) its stances."""

    __slots__ = "id", "name", "stance"

    def __init__(self, _id=0, name="", stance=None):
        self.id = _id
        self.name = name
        # The slot is deliberately left unset when no stances are known.
        if stance:
            self.stance = stance


def gen_path(path_vars: tuple[list[Number], Number, Number, Number, Number]):
    """Unpack ``(color, x, y, scale, len_x)``; nothing is returned.

    NOTE(review): the body only unpacks its argument - presumably unfinished
    or truncated; confirm before relying on it.
    """
    color, x, y, scale, len_x = path_vars


def render_html(img: Image, frame_id=None, link_id=None, scale=1):
    """Walk *img* in horizontal runs of identical pixels and build markup.

    One node is appended for every run whose first pixel is not fully
    transparent black. ``frame_id`` / ``link_id`` select whether animation
    and visibility fragments are prepared.

    NOTE(review): the node, animate and return string literals are empty in
    this copy of the file while ``begin``, ``visible`` and ``scale`` are
    otherwise unused - presumably the markup inside them was lost; confirm
    against the original source.
    """
    pixels: _PyAccess32_4 = img.load()  # type: ignore
    width, height = img.size
    nodes = []
    x, y = 0, 0

    def count_x(x_start, y):
        # Number of pixels to the right of x_start equal to the one at x_start.
        i = x_start
        start_val = pixels[x_start, y]
        for xx in range(x_start + 1, width):
            if (pixels[xx, y] == start_val) and (
                pixels[xx, y][3] / 255 == start_val[3] / 255
            ):
                i += 1
            else:
                break
        return i - x_start

    while y < height:
        # NOTE(review): this guard stops before the last row is processed.
        if y >= height - 1:
            break
        len_x = count_x(x, y)
        color = pixels[x, y]
        if (color[0], color[1], color[2], color[3]) != (0, 0, 0, 0.0):
            nodes.append(
                ''
            )
        # Jump past the run that was just handled.
        x += len_x + 1
        if x >= width:
            x = 0
            y += 1
    if frame_id is not None:
        begin = f"l{link_id}l.end" if frame_id != 0 else f"0s;l{link_id}l.end"
        animate = f""""""
    else:
        animate = ""
    visible = 'visibility="hidden"' if link_id is not None else ""
    return f'{animate}{"".join(nodes)}'


async def render_page(
    folder_path: Path, mob_id: None | int = None, stance: None | str = None
):
    """Return the SVG text for the PNG frames in *folder_path*, rendering if needed.

    ``mob_id`` and ``stance`` default to the names of the parent folder and of
    the folder itself. If ``SVGS/<stance>/<mob_id>.svg`` exists its content is
    returned unchanged. Otherwise every ``*.png`` is loaded, ordered by the
    number matched by ``VERSION`` (falling back to enumeration order), pasted
    bottom-right onto a canvas of the largest frame size and rendered with
    ``render_html``; the result is written to the SVG file and recorded in
    ``CURRENT``.

    Returns ``""`` when a PNG cannot be identified or no frame is found.
    """
    if not mob_id:
        mob_id = int(folder_path.parent.name)
    if not stance:
        stance = folder_path.name
    if (svg_file := (SVGS / f"{stance}" / f"{mob_id}.svg")).exists():
        async with aiofiles.open(svg_file, "r") as svg:
            return await svg.read()

    frames = []
    imgs = {}
    widths = []
    heights = []
    scale = 1
    for i, img_file in enumerate(folder_path.glob("./*.png")):
        version = VERSION.search(img_file.name)
        v = int(version.group(1)) if version else i
        async with aiofiles.open(img_file, "rb") as img_:
            try:
                img = open_image(BytesIO(await img_.read()))
            except UnidentifiedImageError:
                return ""
        widths.append(img.size[0])
        heights.append(img.size[1])
        imgs[v] = img
    imgs = [imgs[k] for k in sorted(imgs)]
    if not widths or not heights:
        return ""

    maxes = (max(widths), max(heights))
    animated = len(imgs) > 1
    for a, image in enumerate(imgs):
        box = new("RGBA", maxes)
        box.paste(image, (maxes[0] - image.size[0], maxes[1] - image.size[1]))
        # Each frame links to its predecessor; the first wraps to the last.
        lid = (a - 1) % len(imgs)
        frames.append(
            render_html(
                box,
                frame_id=a if animated else None,
                link_id=lid if animated else None,
                scale=scale,
            )
        )
    if len(frames) < 1:
        return ""

    svg_folder = SVGS / f"{stance}"
    if not svg_folder.exists():
        await mkdir(svg_folder)
    svg_file = svg_folder / f"{mob_id}.svg"
    # NOTE(review): the leading literal is empty in this copy - presumably the
    # opening markup was lost; confirm against the original source.
    svg_cont = (
        ''
        f'{"".join(frames)}'
    )
    async with aiofiles.open(svg_file, "w") as svg:
        await svg.write(svg_cont)
    CURRENT.setdefault(mob_id, {})
    CURRENT[mob_id][stance] = svg_file
    return svg_cont


async def get_rand_page():
    """Return the HTML page for a randomly chosen mob and stance.

    Stances whose PNG frames total more than 500000 bytes get the
    ``HUGE_FILE`` page. Otherwise the SVG recorded in ``CURRENT`` (or a fresh
    ``render_page`` result) is wrapped in ``TEMPLATE``. A stance that cannot
    be rendered is recorded in ``ERROR_PATHS`` and another one is drawn, so
    the function never returns ``None``.
    """
    while True:
        mob_id = int(choice(available_mob_ids))
        stance = choice(list(AVAILABLE[mob_id]))
        frames_folder = AVAILABLE[mob_id][stance]

        # The size check comes first so an oversized stance is never inlined,
        # whether or not an SVG for it is already cached.
        total_bytes = sum(f.stat().st_size for f in frames_folder.glob("*.png"))
        if total_bytes > 500000:
            return HUGE_FILE.render(mob_id=mob_id, stance=stance)

        page_file = CURRENT.get(mob_id, {}).get(stance, None)
        if page_file and page_file.exists():
            async with aiofiles.open(page_file, "r") as ret_page:
                # Wrapped in TEMPLATE like freshly rendered pages are.
                return TEMPLATE.render(
                    frame=await ret_page.read(), mob_id=mob_id, stance=stance
                )

        page = await render_page(frames_folder, mob_id=mob_id, stance=stance)
        if page:
            return TEMPLATE.render(frame=page, mob_id=mob_id, stance=stance)
        ERROR_PATHS.setdefault(mob_id, set())
        ERROR_PATHS[mob_id].add(stance)


async def get_specific_mob(mob_id: int = 0, stance: str | None = None):
    """Return the HTML page for *mob_id* in *stance* (random stance if omitted).

    Returns the string ``"Does not Exist"`` when the mob or stance is unknown.
    """
    if not stance:
        known_stances = list(AVAILABLE.get(mob_id, {})) or list(
            CURRENT.get(mob_id, {})
        )
        if not known_stances:
            # choice() on an empty list would raise IndexError.
            return "Does not Exist"
        stance = choice(known_stances)
    possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
    if possibly_page and possibly_page.exists():
        async with aiofiles.open(possibly_page, "r") as cont:
            return TEMPLATE.render(
                frame=await cont.read(), mob_id=mob_id, stance=stance
            )
    possibly_exists = AVAILABLE.get(mob_id, {}).get(stance, None)
    if possibly_exists and possibly_exists.exists():
        return TEMPLATE.render(
            frame=await render_page(possibly_exists, mob_id=mob_id, stance=stance),
            mob_id=mob_id,
            stance=stance,
        )
    return "Does not Exist"


@wsgi_app.get("/", status_code=200)
async def rand_page(page: str = Depends(get_rand_page)):
    """Serve a random mob page."""
    return HTMLResponse(page)


@wsgi_app.get("/mob/", status_code=200)
async def view_mobs():
    """Serve the first browse page (the first 500 entries of ``MOB_DATA``)."""
    mob_page = env.get_template("pages/browse.j2")
    # Evaluates to min(6, number of full pages).
    next_ = 6 if (len(MOB_DATA) // 500) - 6 > 0 else 6 - abs((len(MOB_DATA) // 500) - 6)
    return HTMLResponse(
        mob_page.render(
            mobs=dict(tuple(MOB_DATA.items())[0:500]),
            total=len(MOB_DATA.values()) // 500,
            current=1,
            prev=[" ", " ", " ", " ", " "],
            next=[_ for _ in range(1, next_)],
        )
    )


@wsgi_app.get("/mob/page/{page:int}/", status_code=200)
async def view_mob_page(page: int):
    """Serve browse page *page* (500 mobs per page) with prev/next link lists.

    Responds 404 with "Invalid Page Number" when *page* is out of range.

    NOTE(review): ``page * 500 > len(MOB_DATA)`` together with the floor
    division used for ``total`` makes a trailing partial page unreachable -
    confirm whether that is intended.
    """
    if page < 1 or page * 500 > len(MOB_DATA):
        return HTMLResponse("Invalid Page Number", status_code=404)
    # abs(n) + n == 0 exactly when n <= 0, i.e. when page <= 5.
    if abs(page - 5) + (page - 5) == 0:
        prev = 1
    else:
        prev = page - 5
    # Evaluates to page + 5, capped at the number of full pages.
    next_ = (
        page + 5
        if (len(MOB_DATA) // 500) - (page + 5) > 0
        else page + (5 - abs((len(MOB_DATA) // 500) - (page + 5)))
    )
    mob_page = env.get_template("pages/browse.j2")
    # Both lists are padded with " " entries up to five slots.
    prev = [" " for i in range(5 - (page - (prev)))] + [
        f"{i}" for i in range(prev, page)
    ]
    next_ = [f"{i}" for i in range(page + 1, next_)] + [
        " " for i in range(5 - (next_ - (page + 1)))
    ]
    return HTMLResponse(
        mob_page.render(
            mobs=dict(tuple(MOB_DATA.items())[slice((page - 1) * 500, page * 500)]),
            total=len(MOB_DATA.values()) // 500,
            prev=prev,
            current=page,
            next=next_,
        )
    )


@wsgi_app.get("/mob/{mob_id:int}/", status_code=200)
async def view_mob(mob_id: int) -> HTMLResponse:
    """Serve the page for *mob_id* in a randomly chosen stance."""
    data = await get_specific_mob(mob_id)
    return HTMLResponse(data)


@wsgi_app.get("/assets/css/combo.css", status_code=200)
async def get_combo_css() -> HTMLResponse:
    """Serve the combined stylesheet as ``text/css``."""
    async with aiofiles.open("static/css/combo.css", "r") as f:
        return HTMLResponse(await f.read(), media_type="text/css")


@wsgi_app.get("/assets/fonts/Symbols-2048-em Nerd Font Complete.woff2", status_code=200)
async def get_nf_font() -> Response:
    """Serve the bundled WOFF2 font."""
    async with aiofiles.open("static/fonts/nf-complete.woff2", "rb") as f:
        return Response(await f.read(), media_type="font/woff2")


@wsgi_app.get("/assets/js/site.js", status_code=200)
async def get_site_js() -> Response:
    """Serve the site script with a JavaScript content type."""
    async with aiofiles.open("static/js/site.js", "rb") as f:
        # An empty media type produced an empty Content-Type header.
        return Response(await f.read(), media_type="text/javascript")


@wsgi_app.get("/mob/{mob_id:int}/{stance}", status_code=200)
async def view_mob_stance(mob_id: int, stance: str) -> HTMLResponse:
    """Serve the page for *mob_id* in the requested *stance*."""
    data = await get_specific_mob(mob_id, stance)
    return HTMLResponse(data)


def _spawn(coro):
    """Schedule *coro* on the running loop and hold a reference until it ends."""
    task = get_running_loop().create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


async def startup():
    """Start ``fill_cache`` in the background when the application starts."""
    _spawn(fill_cache())


wsgi_app.add_event_handler("startup", startup)


@wsgi_app.get("/export/mob/{mob_id:int}_{stance:path}.svg", status_code=200)
async def export_mob_svg(mob_id: int, stance: str):
    """Return the SVG for *mob_id* / *stance*, rendering it when not yet cached.

    Responds with the HTML text "No data" when the mob or stance is unknown.
    """
    mob_id = int(mob_id)
    possibly_page = CURRENT.get(mob_id, {}).get(stance, None)
    if possibly_page and possibly_page.exists():
        async with aiofiles.open(possibly_page, "r") as page:
            return Response(
                await page.read(),
                media_type="image/svg+xml",
                headers={"Content-Type": "image/svg+xml"},
            )
    possibly_exists = AVAILABLE.get(mob_id, {}).get(stance, None)
    if possibly_exists and possibly_exists.exists():
        return Response(
            await render_page(possibly_exists, mob_id=mob_id, stance=stance),
            headers={"Content-Type": "image/svg+xml", "Vary": "Accept-Encoding"},
        )
    return HTMLResponse("No data")


async def sumshit(arg1: Request, arg2) -> Response:
    """Exception handler registered for 404: answers with status 451.

    NOTE(review): the first content literal is empty in this copy -
    presumably surrounding markup was lost; confirm against the original.
    """
    return Response(
        content=(
            ""
            "\nᓚᘏᗢ\n"
        ),
        status_code=451,
        headers={"encoding": "utf-8", "content-type": "text/html; charset=utf-8"},
    )


@wsgi_app.get("/hover", status_code=200)
async def hover():
    """Serve the hover gallery for the "stand" stance."""
    # .get(): a missing "stand" directory yields an empty gallery, not a 500.
    mobs = mob_mapping.get("stand", [])
    return HTMLResponse(HOVER.render(mobs=mobs, stance="stand", stances=stance_mapping))


@wsgi_app.get("/hover/{stance:str}", status_code=200)
async def hover_stance(stance: str):
    """Serve the hover gallery for *stance*, falling back to "stand"."""
    if stance not in mob_mapping:
        # Fall back for the label as well as the mob list, so an arbitrary
        # URL segment is never passed on to the template.
        stance = "stand"
    mobs = mob_mapping.get(stance, [])
    return HTMLResponse(HOVER.render(mobs=mobs, stance=stance, stances=stance_mapping))


async def update_lib(upd_ids) -> None:
    """Download the frame PNGs of every mob in *upd_ids* that has none locally.

    For each framebook reported by the API the frames are stored as
    ``IMAGES/<mob_id>/<framebook>/<framebook>.<n>.png`` and the folder is
    registered in ``AVAILABLE``. After more than 20 image requests the
    coroutine sleeps for 10 seconds before continuing.
    """
    # Kept separate from the frame index: sharing one variable for both meant
    # the counter was overwritten by every frame loop.
    requests_made = 0
    async with ClientSession() as session:
        for raw_id in upd_ids:
            mob_id = int(raw_id)
            if AVAILABLE.get(mob_id, {}):
                continue
            AVAILABLE.setdefault(mob_id, {})
            async with session.request(
                "GET", f"https://maplestory.io/api/GMS/233/mob/{mob_id}/"
            ) as resp2:
                try:
                    jsn: dict[str, Any] = await resp2.json()
                except ContentTypeError:
                    continue
            for frm_nm, frm_ct in jsn.get("framebooks", {}).items():
                if frm_nm in ERROR_PATHS.get(mob_id, set()):
                    continue
                img_fld: Path = IMAGES / f"{mob_id}"
                if not img_fld.exists():
                    await mkdir(img_fld)
                stnc_fld: Path = img_fld / f"{frm_nm}"
                if not stnc_fld.exists():
                    await mkdir(stnc_fld)
                for frame_no in range(1, frm_ct + 1):
                    img_path: Path = stnc_fld / f"{frm_nm}.{frame_no}.png"
                    if img_path.exists():
                        continue
                    if requests_made > 20:
                        await sleep(10)
                        requests_made = 0
                    async with session.request(
                        "GET",
                        f"https://maplestory.io/api/GMS/233/mob/{mob_id}/render/{frm_nm}/{frame_no}/",
                    ) as resp:
                        requests_made += 1
                        if resp.status != 200:
                            # Do not store an error body under a .png name.
                            continue
                        payload = await resp.read()
                    async with aiofiles.open(img_path, "wb") as img:
                        await img.write(payload)
                AVAILABLE[mob_id][frm_nm] = stnc_fld


async def fill_cache():
    """Fetch the mob list from the API, fill ``MOB_DATA`` and start ``update_lib`` once."""
    data = {}
    async with ClientSession() as session:
        async with session.request(
            "GET", "https://maplestory.io/api/GMS/233/mob/"
        ) as resp:
            data = await resp.json()
    mob_ids = {int(mob["id"]) for mob in data} | set(available_mob_ids)
    for mob in data:
        if mob["id"] in mob_ids:
            MOB_DATA[mob["id"]] = Mob(
                _id=mob["id"],
                name=mob["name"],
                stance=AVAILABLE.get(int(mob["id"]), {}),
            )
    if not CACHING.is_set():
        # NOTE(review): mob_ids already contains every id in data, so this set
        # is always empty and update_lib downloads nothing - presumably the
        # filter was meant to test against available_mob_ids; confirm before
        # changing, since that would start bulk downloads.
        _spawn(
            update_lib({int(m["id"]) for m in data if int(m["id"]) not in mob_ids})
        )
        CACHING.set()


# async def app():
#     await run_in_threadpool(lambda: run("server:wsgi_app"))

wsgi_app.add_exception_handler(404, handler=sumshit)