| from fastapi import FastAPI, Query |
| from fastapi.responses import FileResponse |
| import aiohttp |
| import requests |
| import asyncio |
| import time |
| import uvicorn |
| from dotenv import load_dotenv |
| import os |
| from aiofiles import open as aio_open |
| from pydantic import BaseModel |
| from typing import List, Dict |
| from zipfile import ZipFile |
|
|
| img_base = 'https://i.pximg.net/img-original/' |
| class pixifModel(BaseModel): |
| post_ids: List[int] |
|
|
| class PixifDownloadModel(BaseModel): |
| posts: Dict[str, str] |
|
|
| os.makedirs('Stash', exist_ok=True) |
|
|
| env_path = os.path.dirname(os.path.realpath(__file__)) + "/../.env" |
| |
| |
| if os.path.exists(env_path): |
| load_dotenv(env_path) |
| PHPSESSID = os.getenv("PHPSESSID") |
|
|
| print(PHPSESSID) |
| cookies = {"PHPSESSID": PHPSESSID} |
|
|
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0", |
| 'referer': 'https://www.pixiv.net/', |
| } |
|
|
| app = FastAPI() |
|
|
|
|
| async def fetch_page(session, url): |
| async with session.get(url) as response: |
| data = await response.json() |
| return data |
|
|
| async def search(raw, pages, ai_only=True, cookies=None, headers=None): |
| keywords = raw.split('tags/')[-1].split('/')[0] |
| url = f"https://www.pixiv.net/ajax/search/artworks/{keywords}?word={keywords}" |
| if "?" in raw: |
| params = raw.split('?')[1] |
| url += f"&{params}" |
| if "s_mode" not in url: |
| url += "&s_mode=s_tag_full" |
|
|
| post_ids = [] |
| tasks = [] |
| prev_first_id = None |
|
|
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| for page in range(1, pages + 1): |
| page_url = f"{url}&p={page}" |
| task = fetch_page(session, page_url) |
| tasks.append(task) |
|
|
| responses = await asyncio.gather(*tasks) |
| for data in responses: |
| if ai_only: |
| print(data['body']['illustManga']['data']) |
| posts = [post for post in data['body']['illustManga']['data'] if post['aiType'] == 2] |
| else: |
| posts = data['body']['illustManga']['data'] |
| if not posts: |
| break |
| current_first_id = posts[0]['id'] |
| if prev_first_id and current_first_id == prev_first_id: |
| break |
| prev_first_id = current_first_id |
| post_ids.extend([post['id'] for post in posts]) |
|
|
| return post_ids, requests.utils.unquote(keywords, encoding='utf-8') |
|
|
| def base26(n): |
| if n == 0: |
| return "A" |
|
|
| b26 = "" |
| while n > 0: |
| n, remainder = divmod(n, 26) |
| b26 = chr(97 + remainder) + b26 |
|
|
| return b26 |
|
|
| def base26_time(): |
| return base26(int(time.time())) |
|
|
| @app.get("/search") |
| async def search_endpoint( |
| raw: str = Query(..., description="The raw URL to search."), |
| pages: int = Query(1, description="Number of pages to fetch."), |
| ai_only: bool = Query(True, description="Filter for AI-generated content.") |
| ): |
| try: |
| post_ids, keywords = await search(raw, pages, ai_only, cookies=cookies, headers=headers) |
| return {"post_ids": post_ids, "filename": base26_time() + "_" + keywords} |
| except Exception as e: |
| return {"error": str(e)} |
|
|
| @app.get("/user") |
| async def user( |
| user_id: int = Query(..., description="The user ID to fetch.") |
| ): |
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{user_id}/profile/all') |
| posts = data["body"]["illusts"].keys() |
| try: |
| username = data['body']['pickup'][0]['userName'] |
| except (KeyError, IndexError): |
| username = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}")['body']['name'] |
| |
| return {"post_ids": list(posts), "filename": base26_time() + "_" + username.replace("|", "")} |
|
|
| @app.get("/users") |
| async def users( |
| user_ids: List[int] = Query(..., description="List of user IDs to fetch.", alias="user_ids") |
| ): |
| async def fetch_user_data(session, uid): |
| try: |
| data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{uid}/profile/all') |
| posts = list(data["body"]["illusts"].keys()) |
| try: |
| username = data['body']['pickup'][0]['userName'] |
| except (KeyError, IndexError): |
| user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{uid}") |
| username = user_data['body']['name'] |
| filename = base26_time() + "_" + username.replace("|", "") |
| return {"post_ids": posts, "filename": filename} |
| except Exception as e: |
| return {"user_id": uid, "error": str(e)} |
|
|
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| tasks = [fetch_user_data(session, uid) for uid in user_ids] |
| results = await asyncio.gather(*tasks) |
|
|
| return results |
|
|
| def determine_exif_type(metadata): |
| if metadata is None: |
| return None |
| elif metadata == b'TitleAI generated image': |
| return "novelai" |
| elif metadata.startswith(b"parameter"): |
| return "sd" |
| elif b'{"' in metadata: |
| return "comfy" |
| elif b"Dig" in metadata: |
| return "mj" |
| elif metadata.startswith(b"SoftwareCelsys"): |
| return "celsys" |
| else: |
| return "photoshop" |
|
|
| async def get_exif(url, session): |
| start_range = 0 |
| end_range = 1024 |
|
|
| headers = { |
| "Referer": "https://www.pixiv.net/", |
| "Range": f"bytes={start_range}-{end_range}" |
| } |
|
|
| async with session.get(url, headers=headers) as response: |
| data = await response.read() |
| return parse_png_metadata(data) |
|
|
| def parse_png_metadata(data): |
| index = 8 |
|
|
| while index < len(data): |
| if index + 8 > len(data): |
| break |
| chunk_len = int.from_bytes(data[index:index+4], 'big') |
| chunk_type = data[index+4:index+8].decode('ascii') |
| index += 8 |
|
|
| if chunk_type in ['tEXt', 'iTXt']: |
| content = data[index:index+chunk_len] |
| if chunk_type == 'tEXt': |
| return content.replace(b'\0', b'') |
| elif chunk_type == 'iTXt': |
| return content.strip() |
|
|
| index += chunk_len + 4 |
| return None |
|
|
| async def process_post(post_id, session, semaphore): |
| async with semaphore: |
| try: |
| data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages") |
| image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']][:12] |
| for image_url in image_urls: |
| metadata = await get_exif(image_url, session) |
| exif_type = determine_exif_type(metadata) |
| if exif_type not in ['photoshop', 'celsys', None]: |
| return post_id, image_url |
| return post_id, None |
| except Exception as e: |
| return post_id, None |
|
|
| @app.post("/pixif") |
| async def pixif( |
| items: pixifModel |
| ): |
| post_ids = items.post_ids |
| semaphore = asyncio.Semaphore(100) |
|
|
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| tasks = [process_post(post_id, session, semaphore) for post_id in post_ids] |
| results = await asyncio.gather(*tasks) |
|
|
| image_exifs = {post_id: image_url.replace('https://i.pximg.net/img-original/', '', 1) for post_id, image_url in results if image_url} |
| return image_exifs |
|
|
| async def download_image(session, post_id, post_url): |
| url = f"{img_base}{post_url}" |
| if os.path.exists(f"Stash/{post_id}.png"): |
| return |
| async with session.get(url) as response: |
| content = await response.read() |
| async with aio_open(f"Stash/{post_id}.png", "wb") as f: |
| await f.write(content) |
|
|
|
|
| @app.post("/download") |
| async def download( |
| items: PixifDownloadModel |
| ): |
| posts = items.posts |
| |
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| tasks = [download_image(session, post_id, image_url) for post_id, image_url in posts.items()] |
| await asyncio.gather(*tasks) |
|
|
| timezip = base26_time() |
| with ZipFile(f"Stash/{timezip}.zip", "w") as zipf: |
| for post_id in posts: |
| zipf.write(f"Stash/{post_id}.png", post_id + ".png") |
| |
| return FileResponse(path=f"Stash/{timezip}.zip", media_type="application/zip", filename=f"{timezip}.zip") |
|
|
| @app.get("/") |
| async def read_root(): |
| return {"message": "Hello, World!"} |
|
|
| if __name__ == "__main__": |
| uvicorn.run(app, host="127.0.0.1", port=7860) |
|
|