from fastapi import FastAPI, Query, BackgroundTasks
from fastapi.responses import FileResponse
import aiohttp
import asyncio
import time
import tempfile
import zipfile
import os
from pydantic import BaseModel
from typing import List, Dict, Optional

# Base URL prefix for original-resolution Pixiv images. /pixif strips it from
# results and /pixif_zip re-adds it, so clients can exchange compact keys.
img_base = 'https://i.pximg.net/img-original/img/'
COMMENTS_ROOTS_URL = "https://www.pixiv.net/ajax/illusts/comments/roots"
COMMENTS_REPLIES_URL = "https://www.pixiv.net/ajax/illusts/comments/replies"
COMMENTS_LIMIT = 30


class pixifModel(BaseModel):
    """Request body for /pixif: posts to scan plus the Pixiv session cookie."""
    post_ids: List[int]
    phpsessid: str


class PixifDownloadModel(BaseModel):
    # NOTE(review): not referenced by any endpoint in this file — presumably
    # kept for external callers; confirm before removing.
    posts: Dict[str, str]


class PixifZipModel(BaseModel):
    # Maps post id -> image URL (absolute, or a path relative to img_base).
    d: Dict[str, str]


class PixifCommentsModel(BaseModel):
    """Request body for /comments: posts, session cookie, page size."""
    post_ids: List[int]
    phpsessid: str
    limit: int = COMMENTS_LIMIT


# Default headers for every Pixiv request. The referer is required or
# Pixiv's CDN rejects image fetches with 403.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    'referer': 'https://www.pixiv.net/',
}

app = FastAPI()


async def fetch_page(session, url, params=None, extra_headers=None,
                     semaphore: Optional[asyncio.Semaphore] = None):
    """GET *url* with *session* and decode the JSON body.

    When *semaphore* is given the whole request (including body read) is
    performed while holding it, bounding concurrency.
    """
    async def _request():
        async with session.get(url, params=params, headers=extra_headers) as response:
            return await response.json()

    if semaphore:
        async with semaphore:
            return await _request()
    return await _request()


def build_cookies(phpsessid: str) -> Dict[str, str]:
    """Build the cookie dict Pixiv needs for an authenticated session."""
    return {"PHPSESSID": phpsessid}


def base26_time() -> str:
    """Encode the current time (centisecond resolution) in lowercase letters.

    Used to generate short, roughly unique zip download filenames.
    """
    encoded = ''
    n = int(time.time() * 100)
    while n:
        encoded = chr(97 + n % 26) + encoded
        n //= 26
    return encoded


def determine_exif_type(metadata):
    """Classify PNG text-chunk metadata by the tool that produced it.

    Returns one of "novelai", "sd", "comfy", "mj", "celsys", "photoshop",
    or None when no metadata was found. Matching relies on the combined
    keyword+value form produced by parse_png_metadata (NULs stripped).
    """
    if metadata is None:
        return None
    elif metadata == b'TitleAI generated image':
        return "novelai"
    elif metadata.startswith(b"parameter"):
        return "sd"
    elif b'{"' in metadata:
        return "comfy"
    elif b"Dig" in metadata:
        return "mj"
    elif metadata.startswith(b"SoftwareCelsys"):
        return "celsys"
    else:
        return "photoshop"


async def get_exif(url, session):
    """Fetch the first 512 bytes of a PNG and return its first text chunk.

    Uses an HTTP Range request so only the header region is downloaded;
    metadata chunks located past byte 512 are missed (accepted trade-off).
    """
    # Renamed from `headers` to avoid shadowing the module-level dict.
    range_headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": "bytes=0-512",
    }
    async with session.get(url, headers=range_headers) as response:
        data = await response.read()
        return parse_png_metadata(data)


def parse_png_metadata(data):
    """Scan PNG chunks and return the first tEXt/iTXt payload, or None.

    For tEXt the NUL keyword separator is stripped so keyword and value run
    together — determine_exif_type matches against that combined form.
    Truncated data (we only fetch 512 bytes) terminates the scan cleanly.
    """
    index = 8  # skip the 8-byte PNG signature
    while index < len(data):
        if index + 8 > len(data):
            break  # truncated chunk header
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        chunk_type = data[index + 4:index + 8].decode('ascii')
        index += 8
        if chunk_type in ['tEXt', 'iTXt']:
            content = data[index:index + chunk_len]
            if chunk_type == 'tEXt':
                return content.replace(b'\0', b'')
            elif chunk_type == 'iTXt':
                return content.strip()
        index += chunk_len + 4  # skip payload + 4-byte CRC
    return None


def format_comment_text(comment):
    """Return a comment's text, substituting a stamp placeholder when empty."""
    text = comment.get("comment") or ""
    if not text and comment.get("stampId"):
        text = f"[stamp:{comment['stampId']}]"
    return text


async def fetch_comment_replies(comment_id, post_id, session, semaphore):
    """Page through every reply to *comment_id* and return them as dicts.

    Raises RuntimeError when Pixiv reports an error payload.
    """
    replies = []
    page = 1
    referer = {"Referer": f"https://www.pixiv.net/artworks/{post_id}"}
    while True:
        params = {"comment_id": comment_id, "page": page, "lang": "en"}
        payload = await fetch_page(
            session,
            COMMENTS_REPLIES_URL,
            params=params,
            extra_headers=referer,
            semaphore=semaphore,
        )
        if payload.get("error"):
            message = payload.get("message") or "Unknown error"
            raise RuntimeError(f"Pixiv error for reply {comment_id}: {message}")
        body = payload.get("body") or {}
        comments = body.get("comments") or []
        for reply in comments:
            replies.append(
                {
                    "name": reply.get("userName") or "",
                    "Comment": format_comment_text(reply),
                }
            )
        if not body.get("hasNext"):
            break
        page += 1
    return replies


async def fetch_post_comments(post_id, session, limit, semaphore):
    """Collect all root comments (with replies) for *post_id*.

    Returns (str(post_id), results). Reply fetching is best-effort: a
    failure for one comment degrades to an empty reply list.
    """
    offset = 0
    results = []
    referer = {"Referer": f"https://www.pixiv.net/artworks/{post_id}"}
    while True:
        params = {"illust_id": post_id, "offset": offset, "limit": limit}
        payload = await fetch_page(
            session,
            COMMENTS_ROOTS_URL,
            params=params,
            extra_headers=referer,
            semaphore=semaphore,
        )
        if payload.get("error"):
            message = payload.get("message") or "Unknown error"
            raise RuntimeError(f"Pixiv error for {post_id}: {message}")
        body = payload.get("body") or {}
        comments = body.get("comments") or []
        for comment in comments:
            replies = []
            if comment.get("hasReplies"):
                try:
                    replies = await fetch_comment_replies(comment.get("id"), post_id, session, semaphore)
                except Exception:
                    replies = []  # best-effort: keep the root comment anyway
            results.append(
                {
                    "name": comment.get("userName") or "",
                    "Comment": format_comment_text(comment),
                    "Replies": replies,
                }
            )
        if not body.get("hasNext"):
            break
        offset += limit
    return str(post_id), results


async def process_post(post_id, session, semaphore):
    """Find the first PNG of *post_id* carrying AI-generator metadata.

    Scans the post's PNG pages in escalating batches (1, 5, 5, 10, ...) so
    single-image posts cost one request while large posts still parallelize.
    Returns (post_id, image_url) on a hit, (post_id, None) otherwise;
    any failure is swallowed and reported as a miss.
    """
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]
            # Batch boundaries: small first chunk for the common 1-image case.
            initial_offsets = [1, 5, 5, 10, 10, 10]
            chunks = []
            start = 0
            for offset in initial_offsets:
                end = min(start + offset, len(image_urls))
                chunks.append((start, end))
                start = end
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end
            for s, e in chunks:
                chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
                results = await asyncio.gather(*chunk_tasks)
                for image_url, metadata in zip(image_urls[s:e], results):
                    exif_type = determine_exif_type(metadata)
                    # Only AI-generator metadata counts; editor signatures don't.
                    if exif_type not in ['photoshop', 'celsys', None]:
                        return post_id, image_url
            return post_id, None
        except Exception:
            return post_id, None


async def fetch_image_bytes(session, url, post_id, semaphore):
    """Download *url* fully (under *semaphore*) and return (post_id, bytes)."""
    async with semaphore:
        async with session.get(url) as response:
            return post_id, await response.read()


@app.get("/allimages")
async def all_images(
    only_first: str = Query("0", description="Only fetch the first image of each post."),
    post_ids: List[int] = Query(..., alias='post_ids'),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return original-image URLs for every requested post."""
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = []
        for pid in post_ids:
            tasks.append(fetch_page(session, f"https://www.pixiv.net/ajax/illust/{pid}/pages"))
        results = await asyncio.gather(*tasks)
    all_image_urls = []
    for data in results:
        if "body" in data:
            for page in data["body"]:
                if "urls" in page and "original" in page["urls"]:
                    all_image_urls.append(page["urls"]["original"])
                    if only_first != "0":
                        break  # first image of this post only
    return {"image_urls": all_image_urls}


@app.get("/allimage")
async def all_image(
    post_id: int = Query(..., description="The post ID to fetch all images from."),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return every original-image URL of a single post."""
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            image_urls = [page['urls']['original'] for page in data['body']]
            return {"post_id": post_id, "image_urls": image_urls}
        except Exception as e:
            return {"error": str(e)}


@app.post("/pixif")
async def pixif(
    items: pixifModel
):
    """Scan posts for AI-generated PNGs; map post id -> img_base-relative URL."""
    post_ids = items.post_ids
    semaphore = asyncio.Semaphore(100)
    cookies = build_cookies(items.phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [process_post(post_id, session, semaphore) for post_id in post_ids]
        results = await asyncio.gather(*tasks)
        # Strip the common prefix so the response stays compact.
        image_exifs = {post_id: image_url.replace(img_base, '', 1)
                       for post_id, image_url in results if image_url}
        return image_exifs


@app.post("/pixif_zip")
async def pixif_zip(items: PixifZipModel, background_tasks: BackgroundTasks):
    """Download the requested images and stream them back as one zip.

    The temp zip is removed by a background task after the response is sent.
    """
    downloads = items.d
    if not downloads:
        return {"detail": "No downloads requested."}
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    tmp_path = tmp_file.name
    tmp_file.close()
    connector = aiohttp.TCPConnector(limit=20)
    semaphore = asyncio.Semaphore(20)
    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        tasks = []
        for post_id, url in downloads.items():
            # Accept either absolute URLs or img_base-relative keys from /pixif.
            full_url = url if url.startswith("http") else img_base + url
            tasks.append(asyncio.create_task(fetch_image_bytes(session, full_url, post_id, semaphore)))
        # ZIP_STORED: PNGs are already compressed, deflating again wastes CPU.
        with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_STORED) as zf:
            for task in asyncio.as_completed(tasks):
                try:
                    post_id, data = await task
                except Exception:
                    continue  # best-effort: skip failed downloads
                if data:
                    zf.writestr(f"{post_id}.png", data)
    background_tasks.add_task(os.remove, tmp_path)
    filename = f"pixif_{base26_time()}.zip"
    return FileResponse(tmp_path, media_type="application/zip", filename=filename)


@app.post("/comments")
async def comments(items: PixifCommentsModel):
    """Fetch comment threads for every post; failures yield an empty list."""
    if not items.post_ids:
        return {"comments": {}}
    cookies = build_cookies(items.phpsessid)
    semaphore = asyncio.Semaphore(10)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [
            fetch_post_comments(post_id, session, items.limit, semaphore)
            for post_id in items.post_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    comments_by_post = {}
    for post_id, result in zip(items.post_ids, results):
        if isinstance(result, Exception):
            comments_by_post[str(post_id)] = []
        else:
            # Renamed local (was `comments`) to avoid shadowing this function.
            key, post_comments = result
            comments_by_post[str(key)] = post_comments
    return {"comments": comments_by_post}


@app.get("/")
async def read_root():
    """Liveness check."""
    return {"message": "Hello, World!"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=7860)