"""FastAPI service that queries Pixiv's illustration "pages" endpoint.

Endpoints:

* ``GET /allimages`` -- original image URLs for several posts.
* ``GET /allimage``  -- original image URLs for a single post.
* ``POST /pixif``    -- for each post, the first PNG whose leading text chunk
  is classified as anything other than "photoshop"/"celsys"/missing.
"""
import asyncio
import itertools
import logging
import time
from typing import Dict, Iterator, List, Optional, Tuple

import aiohttp
from fastapi import FastAPI, Query
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# Prefix stripped from matching image URLs before /pixif returns them.
img_base = 'https://i.pximg.net/img-original/img/'

# Every PNG stream starts with this fixed 8-byte signature.
PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'

# Inclusive byte range requested when probing an image for metadata.
EXIF_RANGE_START = 0
EXIF_RANGE_END = 512

# Classifier results that do not make an image a match in process_post.
IGNORED_EXIF_TYPES = ('photoshop', 'celsys', None)


class pixifModel(BaseModel):
    """Request body for ``POST /pixif``."""

    post_ids: List[int]
    phpsessid: str


class PixifDownloadModel(BaseModel):
    """Model holding a ``posts`` string-to-string mapping (unused in this file)."""

    posts: Dict[str, str]


# Default headers attached to every aiohttp session created by the endpoints.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    'referer': 'https://www.pixiv.net/',
}

app = FastAPI()


def _pages_url(post_id) -> str:
    """Return the Pixiv AJAX URL listing the pages of *post_id*."""
    return f"https://www.pixiv.net/ajax/illust/{post_id}/pages"


async def fetch_page(session, url):
    """GET *url* with *session* and return the decoded JSON body."""
    async with session.get(url) as response:
        return await response.json()


def build_cookies(phpsessid: str) -> Dict[str, str]:
    """Wrap a PHPSESSID value in the cookie mapping passed to the session."""
    return {"PHPSESSID": phpsessid}


def base26(n):
    """Encode a non-negative integer in base 26 using the digits ``a``-``z``.

    Zero encodes as ``"a"`` (the zero digit), consistent with every other
    value. Negative input yields an empty string.
    """
    if n == 0:
        return "a"
    digits = []
    while n > 0:
        n, remainder = divmod(n, 26)
        digits.append(chr(97 + remainder))
    # Digits were produced least-significant first.
    return "".join(reversed(digits))


def base26_time():
    """Return the current Unix time (whole seconds) encoded with base26."""
    return base26(int(time.time()))


def determine_exif_type(metadata):
    """Classify the text-chunk payload returned by parse_png_metadata.

    Returns ``None`` when *metadata* is ``None``; otherwise one of
    ``"novelai"``, ``"sd"``, ``"comfy"``, ``"mj"``, ``"celsys"`` or
    ``"photoshop"`` (the fallback). The checks run in this order and the
    first match wins.
    NOTE(review): the labels presumably stand for the tool that wrote the
    image -- confirm against real samples.
    """
    if metadata is None:
        return None
    if metadata == b'TitleAI generated image':
        return "novelai"
    if metadata.startswith(b"parameter"):
        return "sd"
    if b'{"' in metadata:
        return "comfy"
    if b"Dig" in metadata:
        return "mj"
    if metadata.startswith(b"SoftwareCelsys"):
        return "celsys"
    return "photoshop"


async def get_exif(url, session):
    """Fetch the leading bytes of the image at *url* and parse its metadata.

    Sends a ``Range`` request for the first bytes of the file and passes the
    body to parse_png_metadata. Returns ``None`` when the server does not
    answer 200/206, so an error page is never parsed as image data.
    """
    request_headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": f"bytes={EXIF_RANGE_START}-{EXIF_RANGE_END}",
    }
    async with session.get(url, headers=request_headers) as response:
        if response.status not in (200, 206):
            return None
        data = await response.read()
    return parse_png_metadata(data)


def parse_png_metadata(data) -> Optional[bytes]:
    """Return the payload of the first ``tEXt``/``iTXt`` chunk in *data*.

    *data* is the (possibly truncated) start of a PNG file. For ``tEXt`` the
    NUL bytes are removed; for ``iTXt`` surrounding whitespace is stripped.
    If the chunk extends past the end of *data*, the available prefix is
    returned. Returns ``None`` when *data* lacks the PNG signature or no
    text chunk starts within it.
    """
    if not data or not data.startswith(PNG_SIGNATURE):
        return None

    index = len(PNG_SIGNATURE)
    # Each chunk is: 4-byte big-endian length, 4-byte type, payload, 4-byte CRC.
    while index + 8 <= len(data):
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        # Compare raw bytes: decoding could raise on malformed input.
        chunk_type = data[index + 4:index + 8]
        index += 8
        if chunk_type == b'tEXt':
            return data[index:index + chunk_len].replace(b'\0', b'')
        if chunk_type == b'iTXt':
            return data[index:index + chunk_len].strip()
        index += chunk_len + 4  # skip payload and CRC
    return None


def _batch_bounds(total: int,
                  initial_sizes: Tuple[int, ...] = (1, 5, 5, 10, 10, 10),
                  steady_size: int = 10) -> Iterator[Tuple[int, int]]:
    """Yield ``(start, end)`` slices covering ``range(total)`` in batches.

    Batch sizes follow *initial_sizes*, then *steady_size* for the rest.
    Nothing is yielded once *total* items are covered.
    """
    start = 0
    for size in itertools.chain(initial_sizes, itertools.repeat(steady_size)):
        if start >= total:
            return
        end = min(start + size, total)
        yield start, end
        start = end


async def process_post(post_id, session, semaphore):
    """Find the first PNG page of *post_id* with non-ignored metadata.

    Holds *semaphore* for the whole lookup. Pages whose original URL contains
    ``png`` are probed batch by batch (see _batch_bounds) so that a match on
    an early page avoids requesting the later ones.

    Returns ``(post_id, image_url)`` for the first match, or
    ``(post_id, None)`` when nothing matches or any error occurs
    (best effort: errors are logged, never raised).
    """
    async with semaphore:
        try:
            data = await fetch_page(session, _pages_url(post_id))
            image_urls = [
                page['urls']['original']
                for page in data['body']
                if 'png' in page['urls']['original']
            ]
            logger.debug("post %s png urls: %s", post_id, image_urls)

            for start, end in _batch_bounds(len(image_urls)):
                batch = image_urls[start:end]
                results = await asyncio.gather(
                    *(get_exif(url, session) for url in batch)
                )
                for image_url, metadata in zip(batch, results):
                    if determine_exif_type(metadata) not in IGNORED_EXIF_TYPES:
                        return post_id, image_url
            return post_id, None
        except Exception:
            # Deliberate best effort: one bad post must not fail the request.
            logger.warning("Failed to process post %s", post_id, exc_info=True)
            return post_id, None


@app.get("/allimages")
async def all_images(
    only_first: str = Query("0", description="Only fetch the first image of each post."),
    post_ids: List[int] = Query(..., alias='post_ids'),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return the original image URLs of every requested post.

    When *only_first* is anything other than ``"0"``, only the first image
    URL of each post is included.
    """
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        results = await asyncio.gather(
            *(fetch_page(session, _pages_url(pid)) for pid in post_ids)
        )

    first_only = only_first != "0"
    all_image_urls = []
    for data in results:
        if "body" not in data:
            continue
        for page in data["body"]:
            if "urls" in page and "original" in page["urls"]:
                all_image_urls.append(page["urls"]["original"])
                if first_only:
                    break
    return {"image_urls": all_image_urls}


@app.get("/allimage")
async def all_image(
    post_id: int = Query(..., description="The post ID to fetch all images from."),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return the original image URLs of a single post.

    On any failure the response is ``{"error": <message>}`` instead of the
    URL list.
    """
    cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        try:
            data = await fetch_page(session, _pages_url(post_id))
            image_urls = [page['urls']['original'] for page in data['body']]
            return {"post_id": post_id, "image_urls": image_urls}
        except Exception as e:
            return {"error": str(e)}


@app.post("/pixif")
async def pixif(
    items: pixifModel
):
    """Map each post ID to its first matching image path.

    Runs process_post for every ID with at most 8 posts in flight. Posts
    without a match are omitted; the ``img_base`` prefix is removed from the
    returned URLs.
    """
    semaphore = asyncio.Semaphore(8)
    cookies = build_cookies(items.phpsessid)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        results = await asyncio.gather(
            *(process_post(post_id, session, semaphore) for post_id in items.post_ids)
        )

    image_exifs = {
        post_id: image_url.replace(img_base, '', 1)
        for post_id, image_url in results
        if image_url
    }
    logger.debug("pixif result: %s", image_exifs)
    return image_exifs


@app.get("/")
async def read_root():
    """Simple liveness endpoint."""
    return {"message": "Hello, World!"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=7860)