# p / API / app.py
# q6's picture
# AI
# 3b91e61
from fastapi import FastAPI, Query, BackgroundTasks
from fastapi.responses import FileResponse
import aiohttp
import asyncio
import time
import tempfile
import zipfile
import os
from pydantic import BaseModel
from typing import List, Dict
# URL prefix of Pixiv original-resolution images. The /pixif endpoint strips
# this prefix from the URLs it returns, and /pixif_zip prepends it to any
# requested entry that does not start with "http".
img_base = 'https://i.pximg.net/img-original/img/'
class pixifModel(BaseModel):
    """Request body accepted by the POST /pixif endpoint."""
    # IDs of the posts whose PNG pages should be scanned for text metadata.
    post_ids: List[int]
    # Value sent upstream as the PHPSESSID cookie.
    phpsessid: str
class PixifDownloadModel(BaseModel):
    """Body model holding a string-to-string ``posts`` mapping.

    NOTE(review): no endpoint in the visible part of this file uses this
    model; confirm whether it is still needed.
    """
    # String-keyed mapping of string values; meaning not shown in this file.
    posts: Dict[str, str]
class PixifZipModel(BaseModel):
    """Request body accepted by the POST /pixif_zip endpoint."""
    # Maps an archive entry name (used as "<key>.png") to either a full URL
    # (anything starting with "http") or a path that is appended to img_base.
    d: Dict[str, str]
# Default headers given to every aiohttp.ClientSession created in this file:
# a desktop Firefox User-Agent string and a pixiv.net referer.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    'referer': 'https://www.pixiv.net/',
}
# FastAPI application object on which the route decorators below register.
app = FastAPI()
async def fetch_page(session, url):
    """GET ``url`` through ``session`` and return the decoded JSON body.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()`` method.
        url: Address to request.

    Returns:
        Whatever ``response.json()`` produces for the response.
    """
    async with session.get(url) as reply:
        # Decode while the response is still open.
        return await reply.json()
def build_cookies(phpsessid: str) -> Dict[str, str]:
    """Wrap a session id in the cookie mapping passed to aiohttp sessions.

    Args:
        phpsessid: Value to send as the ``PHPSESSID`` cookie.

    Returns:
        A one-entry dict ``{"PHPSESSID": phpsessid}``.
    """
    cookie_jar = {}
    cookie_jar["PHPSESSID"] = phpsessid
    return cookie_jar
def base26_time():
    """Encode the current time as a lowercase base-26 string.

    The value encoded is ``int(time.time() * 100)`` (hundredths of a second
    since the epoch), written with ``a`` = 0 ... ``z`` = 25, most significant
    digit first. A value of zero yields the empty string.
    """
    remaining = int(time.time() * 100)
    letters = []
    while remaining:
        remaining, digit = divmod(remaining, 26)
        letters.append(chr(97 + digit))
    # Digits were produced least-significant first.
    return "".join(reversed(letters))
def determine_exif_type(metadata):
    """Classify the text payload extracted from a PNG by its producer.

    Args:
        metadata: Bytes returned by ``parse_png_metadata`` or ``None``.

    Returns:
        ``None`` when ``metadata`` is ``None``; otherwise the label of the
        first matching rule (``"novelai"``, ``"sd"``, ``"comfy"``, ``"mj"``,
        ``"celsys"``), falling back to ``"photoshop"`` when nothing matches.
    """
    if metadata is None:
        return None

    # Rules are tried in order; the first hit wins, so order is significant.
    rules = (
        (lambda blob: blob == b'TitleAI generated image', "novelai"),
        (lambda blob: blob.startswith(b"parameter"), "sd"),
        (lambda blob: b'{"' in blob, "comfy"),
        (lambda blob: b"Dig" in blob, "mj"),
        (lambda blob: blob.startswith(b"SoftwareCelsys"), "celsys"),
    )
    for matches, label in rules:
        if matches(metadata):
            return label
    return "photoshop"
async def get_exif(url, session):
    """Fetch the leading bytes of an image and extract its PNG text metadata.

    Only the byte range 0-512 is requested via a ``Range`` header; the bytes
    received are handed to ``parse_png_metadata``.

    Args:
        url: Image address to request.
        session: Client session used for the GET request.

    Returns:
        The value returned by ``parse_png_metadata`` for the received bytes.
    """
    first_byte, last_byte = 0, 512
    request_headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": f"bytes={first_byte}-{last_byte}",
    }
    async with session.get(url, headers=request_headers) as reply:
        payload = await reply.read()
    return parse_png_metadata(payload)
def parse_png_metadata(data):
    """Return the payload of the first tEXt/iTXt chunk in PNG bytes.

    ``data`` may be only the beginning of a file; a text chunk that extends
    past the end of ``data`` is returned truncated to the bytes available.

    Args:
        data: Bytes starting at offset 0 of the (possibly partial) file.

    Returns:
        For a ``tEXt`` chunk, its content with every NUL byte removed; for an
        ``iTXt`` chunk, its content with surrounding whitespace stripped;
        ``None`` when ``data`` is empty, does not start with the PNG
        signature, or contains no complete text-chunk header.
    """
    signature = b"\x89PNG\r\n\x1a\n"
    # Anything that is not a PNG (error page, other image format) has no
    # chunks to walk; parsing it would only yield garbage lengths and types.
    if not data or data[:len(signature)] != signature:
        return None

    index = len(signature)
    total = len(data)
    # Each chunk is: 4-byte big-endian length, 4-byte type, payload, 4-byte CRC.
    while total - index >= 8:
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        # Compare the type as raw bytes so a corrupt, non-ASCII type code is
        # skipped like any other unknown chunk instead of raising.
        chunk_type = data[index + 4:index + 8]
        index += 8
        if chunk_type == b'tEXt':
            return data[index:index + chunk_len].replace(b'\0', b'')
        if chunk_type == b'iTXt':
            return data[index:index + chunk_len].strip()
        # Skip the payload and the trailing CRC.
        index += chunk_len + 4
    return None
def _batch_ranges(total, initial_sizes=(1, 5, 5, 10, 10, 10), steady_size=10):
    """Return non-empty ``(start, end)`` ranges that cover ``range(total)``.

    Batches grow according to ``initial_sizes`` and then continue at
    ``steady_size`` until ``total`` items are covered.
    """
    ranges = []
    start = 0
    for size in initial_sizes:
        if start >= total:
            break
        end = min(start + size, total)
        ranges.append((start, end))
        start = end
    while start < total:
        end = min(start + steady_size, total)
        ranges.append((start, end))
        start = end
    return ranges


async def process_post(post_id, session, semaphore):
    """Find the first PNG page of a post whose text metadata looks generated.

    The post's page list is fetched, pages whose original URL contains
    ``png`` are kept, and their metadata is probed in progressively larger
    batches (1, 5, 5, 10, 10, 10, then 10 at a time) so that a match on an
    early page avoids requesting the rest.

    Args:
        post_id: Post whose pages are scanned.
        session: Client session used for all requests.
        semaphore: Held for the whole scan of this post.

    Returns:
        ``(post_id, image_url)`` for the first page whose metadata type is
        neither ``"photoshop"``, ``"celsys"`` nor ``None``; otherwise
        ``(post_id, None)``. Any failure while fetching the page list also
        yields ``(post_id, None)``.
    """
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]
            for start, end in _batch_ranges(len(image_urls)):
                batch = image_urls[start:end]
                # return_exceptions=True: one image that fails to download
                # must not abort the scan of the remaining images.
                results = await asyncio.gather(
                    *(get_exif(image_url, session) for image_url in batch),
                    return_exceptions=True,
                )
                for image_url, metadata in zip(batch, results):
                    if isinstance(metadata, BaseException):
                        # This image could not be probed; try the others.
                        continue
                    exif_type = determine_exif_type(metadata)
                    if exif_type not in ('photoshop', 'celsys', None):
                        return post_id, image_url
            return post_id, None
        except Exception:
            # Deliberate best effort: a post that cannot be scanned is
            # reported as "no match" rather than failing the whole request.
            return post_id, None
async def fetch_image_bytes(session, url, post_id, semaphore):
    """Download ``url`` and return ``(post_id, body_bytes)``.

    Args:
        session: Client session used for the GET request.
        url: Address of the image to download.
        post_id: Identifier passed through unchanged so the caller can match
            the bytes to the request that produced them.
        semaphore: Held for the duration of the request.

    Returns:
        A ``(post_id, bytes)`` tuple with the full response body.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an HTTP error
        status, in addition to any error raised by the request itself.
    """
    async with semaphore:
        async with session.get(url) as response:
            # Reject 4xx/5xx replies so an error page is never handed back
            # as if it were image data.
            response.raise_for_status()
            return post_id, await response.read()
@app.get("/allimages")
async def all_images(
    only_first = Query("0", description="Only fetch the first image of each post."),
    post_ids: List[int] = Query(..., alias='post_ids'),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Collect original-image URLs for several posts.

    The page list of every requested post is fetched concurrently. For each
    response that has a ``body``, the ``urls.original`` value of its pages is
    collected; when ``only_first`` is anything other than ``"0"``, collection
    for a post stops after its first URL.

    Returns:
        ``{"image_urls": [...]}`` with the URLs in post order.
    """
    stop_after_first = only_first != "0"
    session_cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=session_cookies, headers=headers) as session:
        page_requests = [
            fetch_page(session, f"https://www.pixiv.net/ajax/illust/{pid}/pages")
            for pid in post_ids
        ]
        payloads = await asyncio.gather(*page_requests)

    collected = []
    for payload in payloads:
        if "body" not in payload:
            continue
        for page in payload["body"]:
            if "urls" not in page or "original" not in page["urls"]:
                continue
            collected.append(page["urls"]["original"])
            if stop_after_first:
                break
    return {"image_urls": collected}
@app.get("/allimage")
async def all_image(
    post_id: int = Query(..., description="The post ID to fetch all images from."),
    phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
    """Return the original-image URLs of every page of one post.

    Returns:
        ``{"post_id": ..., "image_urls": [...]}`` on success, or
        ``{"error": "<message>"}`` when fetching or reading the page list
        raises.
    """
    pages_url = f"https://www.pixiv.net/ajax/illust/{post_id}/pages"
    session_cookies = build_cookies(phpsessid)
    async with aiohttp.ClientSession(cookies=session_cookies, headers=headers) as session:
        try:
            payload = await fetch_page(session, pages_url)
            originals = []
            for page in payload['body']:
                originals.append(page['urls']['original'])
        except Exception as e:
            return {"error": str(e)}
        return {"post_id": post_id, "image_urls": originals}
@app.post("/pixif")
async def pixif(
    items: pixifModel
):
    """Scan the given posts and report the ones with a matching image.

    Every post is handed to ``process_post`` concurrently, with at most 100
    posts being processed at once.

    Returns:
        A dict mapping each post id that produced an image URL to that URL
        with the leading ``img_base`` prefix removed.
    """
    limiter = asyncio.Semaphore(100)
    session_cookies = build_cookies(items.phpsessid)
    async with aiohttp.ClientSession(cookies=session_cookies, headers=headers) as session:
        outcomes = await asyncio.gather(
            *(process_post(pid, session, limiter) for pid in items.post_ids)
        )

    matches = {}
    for pid, found_url in outcomes:
        if not found_url:
            continue
        # Only the first occurrence of the prefix is removed.
        matches[pid] = found_url.replace(img_base, '', 1)
    return matches
@app.post("/pixif_zip")
async def pixif_zip(items: PixifZipModel, background_tasks: BackgroundTasks):
    """Download the requested images and return them as one ZIP archive.

    Each entry of ``items.d`` maps an archive name to a URL; values that do
    not start with ``http`` are treated as paths relative to ``img_base``.
    At most 20 downloads run at once. Downloads that fail or return no bytes
    are left out of the archive. The archive is written to a temporary file
    that is deleted by a background task after the response has been sent.

    Returns:
        ``{"detail": "No downloads requested."}`` when ``items.d`` is empty;
        otherwise a ``FileResponse`` serving the archive.
    """
    downloads = items.d
    if not downloads:
        return {"detail": "No downloads requested."}

    # delete=False: the file must outlive this handle so FileResponse can
    # stream it; it is removed explicitly below.
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
    tmp_path = tmp_file.name
    tmp_file.close()

    try:
        connector = aiohttp.TCPConnector(limit=20)
        semaphore = asyncio.Semaphore(20)
        async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
            tasks = []
            for post_id, url in downloads.items():
                # NOTE(security): absolute URLs come straight from the
                # request body and are fetched as-is, so a caller can make
                # this server request arbitrary hosts. Consider restricting
                # them to the expected image host.
                full_url = url if url.startswith("http") else img_base + url
                tasks.append(asyncio.create_task(fetch_image_bytes(session, full_url, post_id, semaphore)))
            try:
                with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                    for task in asyncio.as_completed(tasks):
                        try:
                            post_id, data = await task
                        except Exception:
                            # Best effort: a failed download is skipped so
                            # the remaining images are still delivered.
                            continue
                        if data:
                            zf.writestr(f"{post_id}.png", data)
            finally:
                # No-op for finished tasks; stops outstanding downloads when
                # archive writing fails or the request is cancelled.
                for pending in tasks:
                    pending.cancel()
    except BaseException:
        # The cleanup task has not been registered yet, so remove the
        # temporary file here to avoid leaking it.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    background_tasks.add_task(os.remove, tmp_path)
    filename = f"pixif_{base26_time()}.zip"
    return FileResponse(tmp_path, media_type="application/zip", filename=filename)
@app.get("/")
async def read_root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"message": "Hello, World!"}
    return greeting
# Start a uvicorn server when this file is executed directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script.
    import uvicorn
    # Listen on the loopback interface, port 7860.
    uvicorn.run(app, host="127.0.0.1", port=7860)