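"""FastAPI service for scraping Pixiv: search posts by tag, list a user's
illustrations, flag AI-generated PNGs via their embedded PNG text metadata,
and bundle originals into a zstd-compressed tar archive."""
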
import asyncio
import io
import os
import tarfile
import tempfile
import time
from typing import List, Dict
from urllib.parse import unquote  # replaces requests.utils.unquote (same function, no extra dependency)

import aiohttp
import zstandard as zstd
from dotenv import load_dotenv  # assumes python-dotenv; used to load the .env path below
from fastapi import FastAPI, Query
from fastapi.responses import FileResponse
from pydantic import BaseModel
from starlette.background import BackgroundTask


img_base = 'https://i.pximg.net/img-original/img/'


class PixifModel(BaseModel):
    post_ids: List[int]


class PixifDownloadModel(BaseModel):
    posts: Dict[str, str]


# Load the Pixiv session cookie from the project's .env file. The original
# computed env_path but never loaded it, so PHPSESSID was only picked up
# from the process environment.
env_path = os.path.dirname(os.path.realpath(__file__)) + "/../.env"
load_dotenv(env_path)

PHPSESSID = os.getenv("PHPSESSID")

# Skip the cookie jar entirely if no session ID is configured.
cookies = {"PHPSESSID": PHPSESSID} if PHPSESSID else None

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}

app = FastAPI()


async def fetch_page(session, url):
    async with session.get(url) as response:
        response.raise_for_status()  # surface HTTP errors instead of failing inside .json()
        return await response.json()


async def search(raw, pages, ai_only=True, cookies=None, headers=None):
    # Pull the tag out of a pixiv.net/tags/<tag>/... URL, dropping any
    # trailing path segment or query string.
    keywords = raw.split('tags/')[-1].split('/')[0].split('?')[0]
    url = f"https://www.pixiv.net/ajax/search/artworks/{keywords}?word={keywords}"
    if "?" in raw:
        params = raw.split('?')[1]
        url += f"&{params}"
    if "s_mode" not in url:
        url += "&s_mode=s_tag_full"

    post_ids = []
    tasks = []
    prev_first_id = None

    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        for page in range(1, pages + 1):
            page_url = f"{url}&p={page}"
            tasks.append(fetch_page(session, page_url))

        responses = await asyncio.gather(*tasks)
        for data in responses:
            if ai_only:
                # aiType == 2 marks AI-generated works in the search payload.
                posts = [post for post in data['body']['illustManga']['data'] if post['aiType'] == 2]
            else:
                posts = data['body']['illustManga']['data']
            if not posts:
                break
            # Guard against duplicate pages: stop when a page starts with the
            # same first ID as the previous one.
            current_first_id = posts[0]['id']
            if prev_first_id and current_first_id == prev_first_id:
                break
            prev_first_id = current_first_id
            post_ids.extend([post['id'] for post in posts])

    return post_ids, unquote(keywords, encoding='utf-8')


def base26(n):
    """Encode a non-negative integer as lowercase base-26 ('a'..'z')."""
    if n == 0:
        return "a"  # was "A"; lowercase to match the digits produced below

    b26 = ""
    while n > 0:
        n, remainder = divmod(n, 26)
        b26 = chr(97 + remainder) + b26

    return b26


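# base26_time() stamps filenames with base26(unix seconds). Worked example:
# base26(0) -> "a", base26(25) -> "z", base26(26) -> "ba" (divmod(26, 26) = (1, 0)).
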
def base26_time():
    return base26(int(time.time()))


@app.get("/search") |
|
|
async def search_endpoint( |
|
|
raw: str = Query(..., description="The raw URL to search."), |
|
|
pages: int = Query(1, description="Number of pages to fetch."), |
|
|
ai_only: bool = Query(True, description="Filter for AI-generated content.") |
|
|
): |
|
|
try: |
|
|
post_ids, keywords = await search(raw, pages, ai_only, cookies=cookies, headers=headers) |
|
|
return {"post_ids": post_ids, "filename": base26_time() + "_" + keywords} |
|
|
except Exception as e: |
|
|
return {"error": str(e)} |
|
|
|
|
|
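# Example /search request (tag is illustrative):
#   curl "http://127.0.0.1:7860/search?raw=https://www.pixiv.net/tags/<tag>/artworks&pages=2"
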
@app.get("/user") |
|
|
async def user( |
|
|
user_id: int = Query(..., description="The user ID to fetch.") |
|
|
): |
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{user_id}/profile/all') |
|
|
posts = data["body"]["illusts"].keys() |
|
|
try: |
|
|
username = data['body']['pickup'][0]['userName'] |
|
|
except (KeyError, IndexError): |
|
|
user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}") |
|
|
username = user_data['body']['name'] |
|
|
|
|
|
return {"post_ids": list(posts), "filename": base26_time() + "_" + username.replace("|", "")} |
|
|
|
|
|
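# Example /user request (ID is illustrative):
#   curl "http://127.0.0.1:7860/user?user_id=12345"
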
@app.get("/users") |
|
|
async def users( |
|
|
user_ids: List[int] = Query(..., description="List of user IDs to fetch.", alias="user_ids") |
|
|
): |
|
|
async def fetch_user_data(session, uid): |
|
|
try: |
|
|
data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{uid}/profile/all') |
|
|
posts = list(data["body"]["illusts"].keys()) |
|
|
try: |
|
|
username = data['body']['pickup'][0]['userName'] |
|
|
except (KeyError, IndexError): |
|
|
user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{uid}") |
|
|
username = user_data['body']['name'] |
|
|
filename = base26_time() + "_" + username.replace("|", "") |
|
|
return {"post_ids": posts, "filename": filename} |
|
|
except Exception as e: |
|
|
return {"user_id": uid, "error": str(e)} |
|
|
|
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
tasks = [fetch_user_data(session, uid) for uid in user_ids] |
|
|
results = await asyncio.gather(*tasks) |
|
|
|
|
|
return results |
|
|
|
|
|
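# Example /users request (FastAPI reads a List query param from repeated keys):
#   curl "http://127.0.0.1:7860/users?user_ids=12345&user_ids=67890"
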
def determine_exif_type(metadata):
    """Guess which tool produced an image from its PNG text metadata."""
    if metadata is None:
        return None
    elif metadata == b'TitleAI generated image':
        return "novelai"  # NovelAI sets tEXt keyword "Title" to "AI generated image"
    elif metadata.startswith(b"parameter"):
        return "sd"  # Stable Diffusion WebUI writes a "parameters" chunk
    elif b'{"' in metadata:
        return "comfy"  # ComfyUI embeds its workflow as JSON
    elif b"Dig" in metadata:
        return "mj"
    elif metadata.startswith(b"SoftwareCelsys"):
        return "celsys"  # Clip Studio Paint ("Software" = "Celsys ...")
    else:
        return "photoshop"  # fallback: treat unrecognized metadata as hand-made


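# Note: parse_png_metadata (below) strips the NUL separating a tEXt chunk's
# keyword from its value, which is why determine_exif_type compares against
# concatenations such as b'TitleAI generated image' (keyword "Title" + value
# "AI generated image").
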
async def get_exif(url, session):
    # Only the first ~512 bytes are fetched; enough to reach the leading text
    # chunk, though a long metadata block may come back truncated.
    start_range = 0
    end_range = 512

    headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": f"bytes={start_range}-{end_range}"
    }

    async with session.get(url, headers=headers) as response:
        data = await response.read()
        return parse_png_metadata(data)


def parse_png_metadata(data):
    """Return the first tEXt/iTXt chunk payload from a (possibly partial) PNG."""
    index = 8  # skip the 8-byte PNG signature

    while index < len(data):
        if index + 8 > len(data):
            break
        # Each chunk: 4-byte big-endian length, 4-byte type, data, 4-byte CRC.
        chunk_len = int.from_bytes(data[index:index+4], 'big')
        chunk_type = data[index+4:index+8].decode('ascii')
        index += 8

        if chunk_type in ['tEXt', 'iTXt']:
            content = data[index:index+chunk_len]
            if chunk_type == 'tEXt':
                return content.replace(b'\0', b'')  # drop the keyword/value NUL separator
            elif chunk_type == 'iTXt':
                return content.strip()

        index += chunk_len + 4  # skip chunk data plus CRC
    return None


async def process_post(post_id, session, semaphore):
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            # Only PNGs carry the text chunks we inspect.
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]

            # Probe images in progressively larger batches so a hit on the
            # first few pages avoids fetching the rest.
            initial_chunks = [
                (0, 1),
                (1, 6),
                (6, 10),
                (10, 21),
                (21, 31),
                (31, 41),
            ]

            chunks = initial_chunks[:]
            start = 41
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end

            for s, e in chunks:
                # Clamp to the actual page count; the original indexed past
                # the end of image_urls for posts with fewer than 41 PNGs.
                if s >= len(image_urls):
                    break
                e = min(e, len(image_urls))
                chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
                chunk_results = await asyncio.gather(*chunk_tasks)
                for image_url, metadata in zip(image_urls[s:e], chunk_results):
                    exif_type = determine_exif_type(metadata)
                    if exif_type not in ['photoshop', 'celsys', None]:
                        return post_id, image_url

            return post_id, None
        except Exception:
            return post_id, None


@app.post("/pixif") |
|
|
async def pixif( |
|
|
items: pixifModel |
|
|
): |
|
|
post_ids = items.post_ids |
|
|
semaphore = asyncio.Semaphore(1000) |
|
|
|
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
tasks = [process_post(post_id, session, semaphore) for post_id in post_ids] |
|
|
results = await asyncio.gather(*tasks) |
|
|
|
|
|
image_exifs = {post_id: image_url.replace(img_base, '', 1) for post_id, image_url in results if image_url} |
|
|
return image_exifs |
|
|
|
|
|
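# Example /pixif request (IDs are illustrative):
#   curl -X POST "http://127.0.0.1:7860/pixif" \
#        -H "Content-Type: application/json" -d '{"post_ids": [123456, 789012]}'
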
async def generate_zstd_archive(posts, session):
    semaphore = asyncio.Semaphore(1000)

    async def fetch_image(post_id, image_url):
        async with semaphore:
            url = f"{img_base}{image_url}"
            async with session.get(url) as response:
                image_data = await response.read()
                return post_id, image_data

    tasks = [fetch_image(post_id, image_url) for post_id, image_url in posts.items()]
    results = await asyncio.gather(*tasks)
    images = {post_id: image_data for post_id, image_data in results}

    # Pack everything into an in-memory tar, then compress it.
    tar_buffer = io.BytesIO()
    with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
        for post_id, image_data in images.items():
            image_name = f"{post_id}.png"
            file_info = tarfile.TarInfo(name=image_name)
            file_info.size = len(image_data)
            tar.addfile(tarinfo=file_info, fileobj=io.BytesIO(image_data))
    tar_buffer.seek(0)

    # Negative zstd compression levels trade ratio for speed.
    cctx = zstd.ZstdCompressor(level=-1)
    compressed = cctx.compress(tar_buffer.read())

    # Write to a named temp file and hand back its path; the original
    # returned the open file object without ever closing it.
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".tar.zstd")
    temp_file.write(compressed)
    temp_file.close()
    return temp_file.name


@app.post("/download") |
|
|
async def download( |
|
|
items: PixifDownloadModel, |
|
|
): |
|
|
posts = items.posts |
|
|
async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
|
|
temp_file = await generate_zstd_archive(posts, session) |
|
|
|
|
|
filename = f"{base26_time()}.zstd" |
|
|
|
|
|
return FileResponse( |
|
|
path=temp_file.name, |
|
|
media_type="application/zstd", |
|
|
filename=filename |
|
|
) |
|
|
|
|
|
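# Example /download request ("posts" maps post ID -> path suffix under
# img_base, as returned by /pixif; the path shown is illustrative):
#   curl -X POST "http://127.0.0.1:7860/download" -H "Content-Type: application/json" \
#        -d '{"posts": {"123456": "2024/01/01/00/00/00/123456_p0.png"}}' -o out.tar.zstd
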
@app.get("/") |
|
|
async def read_root(): |
|
|
return {"message": "Hello, World!"} |
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=7860)