# Client/Scripts/Extract Pixiv/pixiv_api.py
import aiohttp
import asyncio
import time
import os
import re
from urllib.parse import parse_qsl, parse_qs, quote, unquote, urlencode, urlsplit
from dotenv import load_dotenv
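
# Config: original-image URL prefix, the PHPSESSID session cookie loaded from
# .env, and browser-like headers (pixiv requires a Referer to serve images).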
img_base = 'https://i.pximg.net/img-original/img/'
load_dotenv()
PHPSESSID = os.getenv("PHPSESSID")
cookies = {"PHPSESSID": PHPSESSID}
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}
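
# Tag names (compared casefolded) that mark a post as AI-generated.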
AI_TAGS = {
    "aiイラスト",
    "ai生成",
    "stablediffusion",
    "ai-generated",
    "novelai",
    "novelaidiffusionai",
    "aiart",
    "ai",
    "comfyui",
}
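
# Characters that are not allowed in filenames; runs of them collapse to "_".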
BAD_FILENAME_CHARS = r'[<>:"/\\|?*\x00-\x1f]+'

def sanitize_filename(name):
    return re.sub(BAD_FILENAME_CHARS, "_", name).strip(" ._") or "pixiv"
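
# Extract the search term from a pixiv URL: the /tags/<term>/ path segment or a
# ?word= / ?q= query parameter, falling back to the raw input string.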
def get_search_keywords(raw):
    parts = urlsplit(raw)
    path_parts = [unquote(part, encoding='utf-8') for part in parts.path.split('/') if part]
    if 'tags' in path_parts:
        index = path_parts.index('tags') + 1
        if index < len(path_parts):
            return path_parts[index]
    query = parse_qs(parts.query)
    words = query.get('word') or query.get('q')
    if words:
        return words[0]
    return raw.strip()
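
# Rebuild the query string for the ajax search endpoint, mapping the public
# s_mode values (tag, tag_full) to their ajax equivalents and re-adding `word`.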
def get_search_params(raw, keywords):
    params = []
    for key, value in parse_qsl(urlsplit(raw).query, keep_blank_values=True):
        if key in ("q", "word", "type"):
            continue
        if key == "s_mode":
            if value == "tag":
                value = "s_tag"
            elif value == "tag_full":
                value = "s_tag_full"
        params.append((key, value))
    params.append(("word", keywords))
    if not any(key == "s_mode" for key, _ in params):
        params.append(("s_mode", "s_tag"))
    return urlencode(params)
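
# A post counts as AI if pixiv itself flags it (aiType == 2) or any tag matches AI_TAGS.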
def is_ai_post(post):
    if post.get("aiType") == 2:
        return True
    tags = post.get("tags") or []
    for tag in tags:
        if isinstance(tag, str) and tag.casefold() in AI_TAGS:
            return True
    return False
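
# GET a URL with the shared session and return the decoded JSON body.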
async def fetch_page(session, url):
    async with session.get(url) as response:
        data = await response.json()
        return data
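
# Query the artwork search ajax endpoint across `pages` result pages and return
# matching post ids (AI-only by default, or human-only with real_only) plus the
# keyword that was searched.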
async def search(raw, pages, ai_only=True, real_only=True, cookies=None, headers=None):
    keywords = get_search_keywords(raw)
    encoded_keywords = quote(keywords, safe='')
    params = get_search_params(raw, keywords)
    url = f"https://www.pixiv.net/ajax/search/artworks/{encoded_keywords}?{params}"
    post_ids = []
    tasks = []
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        for page in range(1, pages + 1):
            page_url = f"{url.strip()}&p={page}"
            task = fetch_page(session, page_url)
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        for data in responses:
            posts = data['body']['illustManga']['data']
            if not posts:
                break
            if ai_only:
                posts = [post for post in posts if is_ai_post(post)]
            elif real_only:
                posts = [post for post in posts if not is_ai_post(post)]
            post_ids.extend([post['id'] for post in posts])
    return post_ids, keywords
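
# Encode the current time (in centiseconds) as lowercase base-26 letters, used
# as a sortable filename prefix.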
def base26_time():
    x = ''
    n = int(time.time() * 100)
    while n:
        x = chr(97 + n % 26) + x
        n //= 26
    return x
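
# Fetch all of a user's illustration ids and build a filename from a timestamp
# prefix and the user's name (taken from the pickup data, or from the user
# endpoint if that is missing).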
async def get_user(user_id, session):
    data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{user_id}/profile/all')
    posts = data["body"]["illusts"].keys()
    try:
        username = data['body']['pickup'][0]['userName']
    except (KeyError, IndexError):
        user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}")
        username = user_data['body']['name']
    return {"post_ids": list(posts), "filename": base26_time() + "_" + username.replace("|", "")}
async def get_users(user_ids):
    async def fetch_user_data(session, uid):
        try:
            return await get_user(uid, session)
        except Exception as e:
            return {"user_id": uid, "error": str(e)}

    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [fetch_user_data(session, uid) for uid in user_ids]
        results = await asyncio.gather(*tasks)
        return results
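
# Heuristically classify embedded PNG text metadata by generator: NovelAI,
# Stable Diffusion ("sd"), ComfyUI, "mj" (likely Midjourney), Celsys/Clip Studio,
# or "photoshop" for anything else.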
def determine_exif_type(metadata):
    if metadata is None:
        return None
    elif metadata == b'TitleAI generated image':
        return "novelai"
    elif metadata.startswith(b"parameter"):
        return "sd"
    elif b'{"' in metadata:
        return "comfy"
    elif b"Dig" in metadata:
        return "mj"
    elif metadata.startswith(b"SoftwareCelsys"):
        return "celsys"
    else:
        return "photoshop"
async def get_exif(url, session):
    start_range = 0
    end_range = 512
    headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": f"bytes={start_range}-{end_range}"
    }
    async with session.get(url, headers=headers) as response:
        data = await response.read()
        return parse_png_metadata(data)
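
# Walk PNG chunks after the 8-byte signature and return the first tEXt/iTXt
# payload, or None if the downloaded slice contains none.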
def parse_png_metadata(data):
    index = 8
    while index < len(data):
        if index + 8 > len(data):
            break
        chunk_len = int.from_bytes(data[index:index+4], 'big')
        chunk_type = data[index+4:index+8].decode('ascii')
        index += 8
        if chunk_type in ['tEXt', 'iTXt']:
            content = data[index:index+chunk_len]
            if chunk_type == 'tEXt':
                return content.replace(b'\0', b'')
            elif chunk_type == 'iTXt':
                return content.strip()
        index += chunk_len + 4
    return None
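
# Probe a post's PNG pages in growing batches (1, 5, 5, 10, ...) and return the
# first image URL whose metadata looks machine-generated (anything other than
# photoshop/celsys/None); otherwise, or on any error, return (post_id, None).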
async def process_post(post_id, session, semaphore):
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]
            initial_offsets = [1, 5, 5, 10, 10, 10]
            chunks = []
            start = 0
            for offset in initial_offsets:
                end = start + offset
                if end > len(image_urls):
                    end = len(image_urls)
                chunks.append((start, end))
                start = end
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end
            for s, e in chunks:
                chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
                results = await asyncio.gather(*chunk_tasks)
                for image_url, metadata in zip(image_urls[s:e], results):
                    exif_type = determine_exif_type(metadata)
                    if exif_type not in ['photoshop', 'celsys', None]:
                        return post_id, image_url
            return post_id, None
        except Exception:
            return post_id, None
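
# Scan posts concurrently (at most 5 at a time) and map post id -> image path
# relative to img_base for every post with AI-looking metadata.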
async def get_pixif_data(post_ids):
    semaphore = asyncio.Semaphore(5)
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [process_post(post_id, session, semaphore) for post_id in post_ids]
        results = await asyncio.gather(*tasks)
        image_exifs = {post_id: image_url.replace(img_base, '', 1) for post_id, image_url in results if image_url}
        return image_exifs
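
# Minimal usage sketch: search a tag URL for AI posts, then keep only those whose
# PNG metadata confirms a generator. Assumes a valid PHPSESSID in .env and network
# access; the tag URL below is just an example.
if __name__ == "__main__":
    async def _demo():
        ids, keywords = await search(
            "https://www.pixiv.net/tags/オリジナル/artworks?s_mode=s_tag",
            pages=2,
            cookies=cookies,
            headers=headers,
        )
        confirmed = await get_pixif_data(ids)
        print(f"{keywords}: {len(ids)} candidates, {len(confirmed)} with AI metadata")

    asyncio.run(_demo())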