|
|
from fastapi import FastAPI, Query, BackgroundTasks |
|
|
from fastapi.responses import FileResponse |
|
|
import aiohttp |
|
|
import asyncio |
|
|
import time |
|
|
import tempfile |
|
|
import zipfile |
|
|
import os |
|
|
from pydantic import BaseModel |
|
|
from typing import List, Dict |
|
|
|
|
|
|
|
|
# Base URL of Pixiv's original-resolution image host. /pixif strips this
# prefix from returned paths and /pixif_zip re-attaches it for relative ones.
img_base = 'https://i.pximg.net/img-original/img/'
|
|
|
|
|
class pixifModel(BaseModel):
    """Request body for POST /pixif: posts to scan plus the Pixiv session cookie."""
    # Pixiv illustration IDs to inspect for AI-generation metadata.
    post_ids: List[int]
    # PHPSESSID cookie value used to authenticate against pixiv.net.
    phpsessid: str
|
|
|
|
|
class PixifDownloadModel(BaseModel):
    """Mapping of post id -> image URL/path for a download request.

    NOTE(review): not referenced anywhere in this file — presumably kept for
    an external client or an older endpoint; confirm before removing.
    """
    # post id (as string) -> image URL or img_base-relative path.
    posts: Dict[str, str]
|
|
|
|
|
class PixifZipModel(BaseModel):
    """Request body for POST /pixif_zip."""
    # post id (as string) -> image URL (absolute "http..." URL, or a path
    # relative to img_base — see the /pixif_zip handler).
    d: Dict[str, str]
|
|
|
|
|
# Default HTTP headers for every outgoing request. The pixiv.net referer is
# required by i.pximg.net — image requests without it are rejected.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    'referer': 'https://www.pixiv.net/',
}
|
|
|
|
|
# FastAPI application instance; all routes below register on it.
app = FastAPI()
|
|
|
|
|
async def fetch_page(session, url):
    """GET *url* with the given aiohttp session and return the decoded JSON body."""
    async with session.get(url) as resp:
        return await resp.json()
|
|
|
|
|
def build_cookies(phpsessid: str) -> Dict[str, str]:
    """Wrap a raw PHPSESSID value in the cookie dict aiohttp expects."""
    cookies: Dict[str, str] = {}
    cookies["PHPSESSID"] = phpsessid
    return cookies
|
|
|
|
|
def base26_time():
    """Encode the current time (centisecond resolution) as a lowercase base-26 string.

    Used to generate a unique, roughly time-sortable zip filename.
    """
    remaining = int(time.time() * 100)
    digits = []
    while remaining:
        remaining, digit = divmod(remaining, 26)
        digits.append(chr(97 + digit))
    # Digits were collected least-significant first.
    return ''.join(reversed(digits))
|
|
|
|
|
def determine_exif_type(metadata):
    """Classify a PNG text-chunk payload by the tool that likely produced it.

    Returns one of "novelai", "sd", "comfy", "mj", "celsys", "photoshop", or
    None when no metadata was found. Check order matters: exact/prefix
    signatures are tested before the broader substring matches.
    """
    if metadata is None:
        return None
    if metadata == b'TitleAI generated image':
        return "novelai"
    if metadata.startswith(b"parameter"):
        return "sd"
    if b'{"' in metadata:
        return "comfy"
    if b"Dig" in metadata:
        return "mj"
    if metadata.startswith(b"SoftwareCelsys"):
        return "celsys"
    # Anything else with text metadata is assumed to be a human-made image.
    return "photoshop"
|
|
|
|
|
async def get_exif(url, session):
    """Fetch the first 513 bytes of a PNG at *url* and return its first text-chunk payload.

    A byte-range request keeps the transfer tiny; the text chunks written by
    AI tools sit right after the PNG header, inside the first 512 bytes.
    """
    range_headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": "bytes=0-512",
    }
    async with session.get(url, headers=range_headers) as resp:
        head = await resp.read()
        return parse_png_metadata(head)
|
|
|
|
|
def parse_png_metadata(data):
    """Walk PNG chunks in *data* and return the payload of the first tEXt/iTXt chunk.

    tEXt payloads have their NUL key/value separators stripped out; iTXt
    payloads are whitespace-stripped. Returns None when no text chunk occurs
    in the (possibly truncated) byte range.
    """
    pos = 8  # skip the 8-byte PNG signature
    total = len(data)

    while pos < total:
        # A full 8-byte chunk header (4-byte length + 4-byte type) is needed.
        if pos + 8 > total:
            return None
        length = int.from_bytes(data[pos:pos + 4], 'big')
        ctype = data[pos + 4:pos + 8].decode('ascii')
        pos += 8

        if ctype == 'tEXt':
            return data[pos:pos + length].replace(b'\0', b'')
        if ctype == 'iTXt':
            return data[pos:pos + length].strip()

        pos += length + 4  # skip payload plus 4-byte CRC
    return None
|
|
|
|
|
async def process_post(post_id, session, semaphore):
    """Scan one Pixiv post for an AI-generated PNG.

    Fetches the post's page list, then probes the PNG originals in growing
    batches (1, 5, 5, 10, 10, 10, then 10s) so that posts whose first image
    already carries AI metadata cost only a single byte-range request.

    Returns (post_id, image_url) for the first image whose metadata
    classifies as AI-generated ("novelai"/"sd"/"comfy"/"mj"), or
    (post_id, None) when no such image exists or any request fails
    (best-effort by design).
    """
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            # Only PNGs can carry the text-chunk metadata we look for.
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]

            for start, end in _batch_ranges(len(image_urls)):
                batch = [get_exif(image_urls[i], session) for i in range(start, end)]
                results = await asyncio.gather(*batch)

                for image_url, metadata in zip(image_urls[start:end], results):
                    exif_type = determine_exif_type(metadata)
                    # "photoshop"/"celsys"/None mean human-made or no metadata.
                    if exif_type not in ['photoshop', 'celsys', None]:
                        return post_id, image_url

            return post_id, None
        except Exception:
            # Best-effort: a failed post is reported as "no AI image found"
            # instead of failing the whole batch request.
            return post_id, None


def _batch_ranges(total):
    """Yield (start, end) index ranges sized 1, 5, 5, 10, 10, 10, then 10s up to *total*.

    Empty ranges are skipped, so a short URL list no longer produces no-op
    batches (the previous inline version appended empty (total, total)
    ranges and awaited empty gathers for them).
    """
    start = 0
    for size in [1, 5, 5, 10, 10, 10]:
        if start >= total:
            return
        end = min(start + size, total)
        yield start, end
        start = end
    while start < total:
        end = min(start + 10, total)
        yield start, end
        start = end
|
|
|
|
|
async def fetch_image_bytes(session, url, post_id, semaphore):
    """Download *url* fully and return (post_id, raw bytes); concurrency is capped by *semaphore*."""
    async with semaphore:
        async with session.get(url) as resp:
            payload = await resp.read()
            return post_id, payload
|
|
|
|
|
@app.get("/allimages") |
|
|
async def all_images( |
|
|
only_first = Query("0", description="Only fetch the first image of each post."), |
|
|
post_ids: List[int] = Query(..., alias='post_ids'), |
|
|
phpsessid: str = Query(..., description="Pixiv PHPSESSID value.") |
|
|
): |
|
|
cookies = build_cookies(phpsessid) |
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
tasks = [] |
|
|
for pid in post_ids: |
|
|
tasks.append(fetch_page(session, f"https://www.pixiv.net/ajax/illust/{pid}/pages")) |
|
|
|
|
|
results = await asyncio.gather(*tasks) |
|
|
|
|
|
all_image_urls = [] |
|
|
for data in results: |
|
|
if "body" in data: |
|
|
for page in data["body"]: |
|
|
if "urls" in page and "original" in page["urls"]: |
|
|
all_image_urls.append(page["urls"]["original"]) |
|
|
|
|
|
if only_first != "0": |
|
|
break |
|
|
|
|
|
return {"image_urls": all_image_urls} |
|
|
|
|
|
@app.get("/allimage") |
|
|
async def all_image( |
|
|
post_id: int = Query(..., description="The post ID to fetch all images from."), |
|
|
phpsessid: str = Query(..., description="Pixiv PHPSESSID value.") |
|
|
): |
|
|
cookies = build_cookies(phpsessid) |
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
try: |
|
|
data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages") |
|
|
image_urls = [page['urls']['original'] for page in data['body']] |
|
|
return {"post_id": post_id, "image_urls": image_urls} |
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
@app.post("/pixif") |
|
|
async def pixif( |
|
|
items: pixifModel |
|
|
): |
|
|
post_ids = items.post_ids |
|
|
semaphore = asyncio.Semaphore(100) |
|
|
cookies = build_cookies(items.phpsessid) |
|
|
|
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
tasks = [process_post(post_id, session, semaphore) for post_id in post_ids] |
|
|
results = await asyncio.gather(*tasks) |
|
|
|
|
|
image_exifs = {post_id: image_url.replace(img_base, '', 1) for post_id, image_url in results if image_url} |
|
|
|
|
|
return image_exifs |
|
|
|
|
|
@app.post("/pixif_zip") |
|
|
async def pixif_zip(items: PixifZipModel, background_tasks: BackgroundTasks): |
|
|
downloads = items.d |
|
|
if not downloads: |
|
|
return {"detail": "No downloads requested."} |
|
|
|
|
|
tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".zip") |
|
|
tmp_path = tmp_file.name |
|
|
tmp_file.close() |
|
|
|
|
|
connector = aiohttp.TCPConnector(limit=20) |
|
|
semaphore = asyncio.Semaphore(20) |
|
|
async with aiohttp.ClientSession(headers=headers, connector=connector) as session: |
|
|
tasks = [] |
|
|
for post_id, url in downloads.items(): |
|
|
full_url = url if url.startswith("http") else img_base + url |
|
|
tasks.append(asyncio.create_task(fetch_image_bytes(session, full_url, post_id, semaphore))) |
|
|
|
|
|
with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: |
|
|
for task in asyncio.as_completed(tasks): |
|
|
try: |
|
|
post_id, data = await task |
|
|
except Exception: |
|
|
continue |
|
|
if data: |
|
|
zf.writestr(f"{post_id}.png", data) |
|
|
|
|
|
background_tasks.add_task(os.remove, tmp_path) |
|
|
filename = f"pixif_{base26_time()}.zip" |
|
|
return FileResponse(tmp_path, media_type="application/zip", filename=filename) |
|
|
|
|
|
@app.get("/") |
|
|
async def read_root(): |
|
|
return {"message": "Hello, World!"} |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run(app, host="127.0.0.1", port=7860) |
|
|
|