p / API /app.py
q6's picture
server client
bd75875
raw
history blame
6.04 kB
from fastapi import FastAPI, Query
import aiohttp
import asyncio
import time
from pydantic import BaseModel
from typing import List, Dict
img_base = 'https://i.pximg.net/img-original/img/'
class pixifModel(BaseModel):
post_ids: List[int]
phpsessid: str
class PixifDownloadModel(BaseModel):
posts: Dict[str, str]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
'referer': 'https://www.pixiv.net/',
}
app = FastAPI()
async def fetch_page(session, url):
async with session.get(url) as response:
data = await response.json()
return data
def build_cookies(phpsessid: str) -> Dict[str, str]:
return {"PHPSESSID": phpsessid}
def base26(n):
if n == 0:
return "A"
b26 = ""
while n > 0:
n, remainder = divmod(n, 26)
b26 = chr(97 + remainder) + b26
return b26
def base26_time():
return base26(int(time.time()))
def determine_exif_type(metadata):
if metadata is None:
return None
elif metadata == b'TitleAI generated image':
return "novelai"
elif metadata.startswith(b"parameter"):
return "sd"
elif b'{"' in metadata:
return "comfy"
elif b"Dig" in metadata:
return "mj"
elif metadata.startswith(b"SoftwareCelsys"):
return "celsys"
else:
return "photoshop"
async def get_exif(url, session):
start_range = 0
end_range = 512
headers = {
"Referer": "https://www.pixiv.net/",
"Range": f"bytes={start_range}-{end_range}"
}
async with session.get(url, headers=headers) as response:
data = await response.read()
return parse_png_metadata(data)
def parse_png_metadata(data):
index = 8
while index < len(data):
if index + 8 > len(data):
break
chunk_len = int.from_bytes(data[index:index+4], 'big')
chunk_type = data[index+4:index+8].decode('ascii')
index += 8
if chunk_type in ['tEXt', 'iTXt']:
content = data[index:index+chunk_len]
if chunk_type == 'tEXt':
return content.replace(b'\0', b'')
elif chunk_type == 'iTXt':
return content.strip()
index += chunk_len + 4
return None
async def process_post(post_id, session, semaphore):
async with semaphore:
try:
data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]
print(image_urls)
initial_offsets = [1, 5, 5, 10, 10, 10]
chunks = []
start = 0
for offset in initial_offsets:
end = start + offset
if end > len(image_urls):
end = len(image_urls)
chunks.append((start, end))
start = end
while start < len(image_urls):
end = min(start + 10, len(image_urls))
chunks.append((start, end))
start = end
for s, e in chunks:
chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
results = await asyncio.gather(*chunk_tasks)
for image_url, metadata in zip(image_urls[s:e], results):
exif_type = determine_exif_type(metadata)
if exif_type not in ['photoshop', 'celsys', None]:
return post_id, image_url
return post_id, None
except Exception:
return post_id, None
@app.get("/allimages")
async def all_images(
only_first = Query("0", description="Only fetch the first image of each post."),
post_ids: List[int] = Query(..., alias='post_ids'),
phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
cookies = build_cookies(phpsessid)
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
tasks = []
for pid in post_ids:
tasks.append(fetch_page(session, f"https://www.pixiv.net/ajax/illust/{pid}/pages"))
results = await asyncio.gather(*tasks)
all_image_urls = []
for data in results:
if "body" in data:
for page in data["body"]:
if "urls" in page and "original" in page["urls"]:
all_image_urls.append(page["urls"]["original"])
if only_first != "0":
break
return {"image_urls": all_image_urls}
@app.get("/allimage")
async def all_image(
post_id: int = Query(..., description="The post ID to fetch all images from."),
phpsessid: str = Query(..., description="Pixiv PHPSESSID value.")
):
cookies = build_cookies(phpsessid)
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
try:
data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
image_urls = [page['urls']['original'] for page in data['body']]
return {"post_id": post_id, "image_urls": image_urls}
except Exception as e:
return {"error": str(e)}
@app.post("/pixif")
async def pixif(
items: pixifModel
):
post_ids = items.post_ids
semaphore = asyncio.Semaphore(8)
cookies = build_cookies(items.phpsessid)
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
tasks = [process_post(post_id, session, semaphore) for post_id in post_ids]
results = await asyncio.gather(*tasks)
image_exifs = {post_id: image_url.replace(img_base, '', 1) for post_id, image_url in results if image_url}
print(image_exifs)
return image_exifs
@app.get("/")
async def read_root():
return {"message": "Hello, World!"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=7860)