from fastapi import FastAPI, Query, BackgroundTasks
from fastapi.responses import FileResponse
import aiohttp
import asyncio
import time
import tempfile
import zipfile
import os
from pydantic import BaseModel
from typing import List, Dict, Optional

# Base URL for Pixiv original-resolution images. Endpoints exchange image
# paths relative to this prefix to keep payloads small.
img_base = 'https://i.pximg.net/img-original/img/'


class pixifModel(BaseModel):
    """Request body for /pixif: posts to scan plus the caller's session cookie."""
    post_ids: List[int]
    phpsessid: str


class PixifDownloadModel(BaseModel):
    """Mapping of post id -> image URL/path selected for download."""
    posts: Dict[str, str]


class PixifZipModel(BaseModel):
    """Request body for /pixif_zip: `d` maps post id -> full URL or path relative to img_base."""
    d: Dict[str, str]


# Default headers for every Pixiv request; the referer is required or
# i.pximg.net returns 403 for image fetches.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    'referer': 'https://www.pixiv.net/',
}

app = FastAPI()


async def fetch_page(session, url):
    """GET *url* with *session* and return the parsed JSON body."""
    async with session.get(url) as response:
        data = await response.json()
        return data


def build_cookies(phpsessid: str) -> Dict[str, str]:
    """Build the cookie jar Pixiv's ajax API needs for an authenticated call."""
    return {"PHPSESSID": phpsessid}


def base26_time() -> str:
    """Encode the current time (centisecond resolution) as a lowercase
    base-26 string; used to generate unique-ish zip filenames."""
    x = ''
    n = int(time.time() * 100)
    while n:
        x = chr(97 + n % 26) + x
        n //= 26
    return x


def determine_exif_type(metadata: Optional[bytes]) -> Optional[str]:
    """Classify the generator of an image from its embedded PNG text metadata.

    Returns a short tag ("novelai", "sd", "comfy", "mj", "celsys",
    "photoshop") or None when no metadata was found. The byte-pattern
    heuristics below match the markers each tool writes into its tEXt/iTXt
    chunk.
    """
    if metadata is None:
        return None
    elif metadata == b'TitleAI generated image':
        return "novelai"
    elif metadata.startswith(b"parameter"):
        return "sd"
    elif b'{"' in metadata:
        return "comfy"
    elif b"Dig" in metadata:
        return "mj"
    elif metadata.startswith(b"SoftwareCelsys"):
        return "celsys"
    else:
        return "photoshop"


async def get_exif(url, session) -> Optional[bytes]:
    """Fetch only the first 513 bytes of a PNG (via a Range request) and
    extract its first text chunk.

    Metadata chunks sit right after the PNG header, so a small prefix is
    enough; this avoids downloading full multi-MB originals just to sniff
    the generator.
    """
    start_range = 0
    end_range = 512
    # Local headers deliberately shadow the module-level ones: we need the
    # Range header in addition to the referer.
    headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": f"bytes={start_range}-{end_range}"
    }
    async with session.get(url, headers=headers) as response:
        data = await response.read()
        return parse_png_metadata(data)


def parse_png_metadata(data: bytes) -> Optional[bytes]:
    """Walk PNG chunks in *data* and return the content of the first
    tEXt/iTXt chunk, or None if none is present.

    *data* may be a truncated prefix of the file; the bounds check below
    stops cleanly when a chunk header would run past the buffer.
    """
    index = 8  # skip the 8-byte PNG signature
    while index < len(data):
        if index + 8 > len(data):
            break
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        chunk_type = data[index + 4:index + 8].decode('ascii')
        index += 8
        if chunk_type in ['tEXt', 'iTXt']:
            content = data[index:index + chunk_len]
            if chunk_type == 'tEXt':
                # tEXt separates keyword and value with a NUL byte; drop it
                # so the classifier sees a single contiguous byte string.
                return content.replace(b'\0', b'')
            elif chunk_type == 'iTXt':
                return content.strip()
        index += chunk_len + 4  # +4 skips the CRC trailing each chunk
    return None


async def process_post(post_id, session, semaphore):
    """Scan one post's PNG pages for AI-generator metadata.

    Pages are probed in growing batches (1, then 5s, then 10s) so that
    posts whose first image already carries metadata cost a single probe.
    Returns (post_id, first matching image URL) or (post_id, None) when no
    page matches or any request fails (best effort by design).
    """
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            # Only PNG originals can carry the text chunks we look for.
            image_urls = [page['urls']['original'] for page in data['body']
                          if 'png' in page['urls']['original']]
            # Escalating batch sizes: 1, 5, 5, 10, 10, 10, then 10s forever.
            initial_offsets = [1, 5, 5, 10, 10, 10]
            chunks = []
            start = 0
            for offset in initial_offsets:
                end = start + offset
                if end > len(image_urls):
                    end = len(image_urls)
                chunks.append((start, end))
                start = end
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end
            for s, e in chunks:
                chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
                results = await asyncio.gather(*chunk_tasks)
                for image_url, metadata in zip(image_urls[s:e], results):
                    exif_type = determine_exif_type(metadata)
                    # photoshop/celsys/None are "not AI-generated" — keep scanning.
                    if exif_type not in ['photoshop', 'celsys', None]:
                        return post_id, image_url
            return post_id, None
        except Exception:
            # Best-effort: a failed post simply reports no match rather than
            # failing the whole batch.
            return post_id, None


async def fetch_image_bytes(session, url, post_id, semaphore):
    """Download one image, returning (post_id, raw bytes). The semaphore
    bounds concurrent downloads."""
    async with semaphore:
        async with session.get(url) as response:
            return post_id, await response.read()


@app.get("/allimages")
async def all_images(
    only_first: str = Query("0", description="Only fetch the first image of each post."),
    post_ids: List[int] = Query(..., alias='post_ids'),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return the original-image URLs for every given post.

    When ``only_first`` is anything other than "0", only the first page of
    each post is included.
    """
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = []
        for pid in post_ids:
            tasks.append(fetch_page(session, f"https://www.pixiv.net/ajax/illust/{pid}/pages"))
        results = await asyncio.gather(*tasks)
        all_image_urls = []
        for data in results:
            if "body" in data:
                for page in data["body"]:
                    if "urls" in page and "original" in page["urls"]:
                        all_image_urls.append(page["urls"]["original"])
                        if only_first != "0":
                            break
        return {"image_urls": all_image_urls}


@app.get("/allimage")
async def all_image(
    post_id: int = Query(..., description="The post ID to fetch all images from."),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return all original-image URLs for a single post, or an error dict."""
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            image_urls = [page['urls']['original'] for page in data['body']]
            return {"post_id": post_id, "image_urls": image_urls}
        except Exception as e:
            return {"error": str(e)}


@app.post("/pixif")
async def pixif(
    items: pixifModel
):
    """Scan many posts concurrently for AI-generator metadata.

    Returns {post_id: image path relative to img_base} for each post where
    a generated image was detected. Concurrency is capped at 100 requests.
    """
    post_ids = items.post_ids
    semaphore = asyncio.Semaphore(100)
    cookies = build_cookies(items.phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [process_post(post_id, session, semaphore) for post_id in post_ids]
        results = await asyncio.gather(*tasks)
        image_exifs = {post_id: image_url.replace(img_base, '', 1)
                       for post_id, image_url in results if image_url}
        return image_exifs


@app.post("/pixif_zip")
async def pixif_zip(items: PixifZipModel, background_tasks: BackgroundTasks):
    """Download the requested images and stream them back as a single zip.

    The zip is written to a named temp file so FileResponse can serve it;
    a background task removes the file after the response is sent.
    """
    downloads = items.d
    if not downloads:
        return {"detail": "No downloads requested."}
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    tmp_path = tmp_file.name
    tmp_file.close()
    connector = aiohttp.TCPConnector(limit=20)
    semaphore = asyncio.Semaphore(20)
    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        tasks = []
        for post_id, url in downloads.items():
            # Accept either a full URL or a path relative to img_base.
            full_url = url if url.startswith("http") else img_base + url
            tasks.append(asyncio.create_task(fetch_image_bytes(session, full_url, post_id, semaphore)))
        with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            # as_completed lets us append each image as soon as it arrives.
            for task in asyncio.as_completed(tasks):
                try:
                    post_id, data = await task
                except Exception:
                    continue  # best-effort: skip failed downloads
                if data:
                    zf.writestr(f"{post_id}.png", data)
    # Cleanup runs after the response body has been sent.
    background_tasks.add_task(os.remove, tmp_path)
    filename = f"pixif_{base26_time()}.zip"
    return FileResponse(tmp_path, media_type="application/zip", filename=filename)


@app.get("/")
async def read_root():
    """Health-check / hello endpoint."""
    return {"message": "Hello, World!"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=7860)