"""Pixiv scraping helpers: search artworks and flag AI-generated images
by reading the text chunks that generators embed in PNG files."""

import asyncio
import os
import re
import time
from urllib.parse import parse_qs, parse_qsl, quote, unquote, urlencode, urlsplit

import aiohttp
from dotenv import load_dotenv

img_base = 'https://i.pximg.net/img-original/img/'

load_dotenv()
PHPSESSID = os.getenv("PHPSESSID")
cookies = {"PHPSESSID": PHPSESSID}
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
    "Referer": "https://www.pixiv.net/",
}

# Tags (lowercased) that mark a post as AI-generated when pixiv's own
# aiType flag is missing.
AI_TAGS = {
    "aiイラスト", "ai生成", "stablediffusion", "ai-generated", "novelai",
    "novelaidiffusionai", "aiart", "ai", "comfyui",
}

BAD_FILENAME_CHARS = r'[<>:"/\\|?*\x00-\x1f]+'


def sanitize_filename(name):
    return re.sub(BAD_FILENAME_CHARS, "_", name).strip(" ._") or "pixiv"


def get_search_keywords(raw):
    """Extract the search term from a pixiv URL, or fall back to the raw string."""
    parts = urlsplit(raw)
    path_parts = [unquote(part, encoding='utf-8') for part in parts.path.split('/') if part]
    # /tags/<keyword>/artworks style URLs
    if 'tags' in path_parts:
        index = path_parts.index('tags') + 1
        if index < len(path_parts):
            return path_parts[index]
    # ?word=... / ?q=... style URLs
    query = parse_qs(parts.query)
    words = query.get('word') or query.get('q')
    if words:
        return words[0]
    return raw.strip()


def get_search_params(raw, keywords):
    """Rebuild the query string, normalizing s_mode values to the ajax API's names."""
    params = []
    for key, value in parse_qsl(urlsplit(raw).query, keep_blank_values=True):
        if key in ("q", "word", "type"):
            continue
        if key == "s_mode":
            if value == "tag":
                value = "s_tag"
            elif value == "tag_full":
                value = "s_tag_full"
        params.append((key, value))
    params.append(("word", keywords))
    if not any(key == "s_mode" for key, _ in params):
        params.append(("s_mode", "s_tag"))
    return urlencode(params)


def is_ai_post(post):
    """aiType == 2 is pixiv's own AI-generated flag; AI_TAGS is a fallback signal."""
    if post.get("aiType") == 2:
        return True
    for tag in post.get("tags") or []:
        if isinstance(tag, str) and tag.casefold() in AI_TAGS:
            return True
    return False


async def fetch_page(session, url):
    async with session.get(url) as response:
        return await response.json()


async def search(raw, pages, ai_only=True, real_only=True, cookies=None, headers=None):
    """Search pixiv artworks and return (post_ids, keywords).

    ai_only takes precedence over real_only when both are set. Pass the
    module-level cookies/headers for authenticated requests; the defaults
    here are None, so the session is otherwise unauthenticated.
    """
    keywords = get_search_keywords(raw)
    encoded_keywords = quote(keywords, safe='')
    params = get_search_params(raw, keywords)
    url = f"https://www.pixiv.net/ajax/search/artworks/{encoded_keywords}?{params}"
    post_ids = []
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [fetch_page(session, f"{url}&p={page}") for page in range(1, pages + 1)]
        responses = await asyncio.gather(*tasks)
    for data in responses:
        posts = data['body']['illustManga']['data']
        if not posts:
            # An empty page means every later page is empty too.
            break
        if ai_only:
            posts = [post for post in posts if is_ai_post(post)]
        elif real_only:
            posts = [post for post in posts if not is_ai_post(post)]
        post_ids.extend(post['id'] for post in posts)
    return post_ids, keywords


def base26_time():
    """Encode the current time in centiseconds as a lowercase base-26 string."""
    encoded = ''
    n = int(time.time() * 100)
    while n:
        encoded = chr(97 + n % 26) + encoded
        n //= 26
    return encoded


async def get_user(user_id, session):
    data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{user_id}/profile/all')
    post_ids = list(data["body"]["illusts"].keys())
    try:
        username = data['body']['pickup'][0]['userName']
    except (KeyError, IndexError):
        # No pickup entry; fall back to the user endpoint for the name.
        user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}")
        username = user_data['body']['name']
    return {
        "post_ids": post_ids,
        "filename": base26_time() + "_" + sanitize_filename(username),
    }


async def get_users(user_ids):
    async def fetch_user_data(session, uid):
        try:
            return await get_user(uid, session)
        except Exception as e:
            return {"user_id": uid, "error": str(e)}

    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [fetch_user_data(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)


def determine_exif_type(metadata):
    """Guess the generator from the first PNG text chunk."""
    if metadata is None:
        return None
    if metadata == b'TitleAI generated image':
        return "novelai"
    if metadata.startswith(b"parameter"):
        return "sd"  # Stable Diffusion WebUI writes a "parameters" chunk
    if b'{"' in metadata:
        return "comfy"  # ComfyUI embeds its workflow as JSON
    if b"Dig" in metadata:
        return "mj"
    if metadata.startswith(b"SoftwareCelsys"):
        return "celsys"  # Clip Studio Paint
    return "photoshop"


async def get_exif(url, session):
    """Fetch only the start of the image and parse its PNG text chunk."""
    range_headers = {
        "Referer": "https://www.pixiv.net/",
        "Range": "bytes=0-512",  # metadata chunks sit right after the header
    }
    async with session.get(url, headers=range_headers) as response:
        data = await response.read()
    return parse_png_metadata(data)


def parse_png_metadata(data):
    """Return the payload of the first tEXt/iTXt chunk, or None.

    PNG layout: 8-byte signature, then chunks of
    [length (4) | type (4) | payload | CRC (4)].
    """
    index = 8  # skip the signature
    while index + 8 <= len(data):
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        chunk_type = data[index + 4:index + 8].decode('ascii')
        index += 8
        if chunk_type == 'tEXt':
            return data[index:index + chunk_len].replace(b'\0', b'')
        if chunk_type == 'iTXt':
            return data[index:index + chunk_len].strip()
        index += chunk_len + 4  # skip payload + CRC
    return None


async def process_post(post_id, session, semaphore):
    """Return (post_id, image_url) for the first image whose metadata looks AI-made."""
    async with semaphore:
        try:
            data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
            # Only PNGs carry the text chunks we can parse.
            image_urls = [
                page['urls']['original']
                for page in data['body']
                if 'png' in page['urls']['original']
            ]
            # Probe images in growing batches (1, 5, 5, 10, ...) so that
            # single-image posts resolve with a single request.
            chunks = []
            start = 0
            for offset in [1, 5, 5, 10, 10, 10]:
                end = min(start + offset, len(image_urls))
                if end > start:
                    chunks.append((start, end))
                start = end
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end
            for s, e in chunks:
                chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)]
                results = await asyncio.gather(*chunk_tasks)
                for image_url, metadata in zip(image_urls[s:e], results):
                    exif_type = determine_exif_type(metadata)
                    if exif_type not in ('photoshop', 'celsys', None):
                        return post_id, image_url
            return post_id, None
        except Exception:
            return post_id, None


async def get_pixif_data(post_ids):
    """Map post id -> image path (relative to img_base) for AI-flagged images."""
    semaphore = asyncio.Semaphore(5)  # cap concurrently probed posts
    async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session:
        tasks = [process_post(post_id, session, semaphore) for post_id in post_ids]
        results = await asyncio.gather(*tasks)
    return {
        post_id: image_url.replace(img_base, '', 1)
        for post_id, image_url in results
        if image_url
    }
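

# Example usage: a minimal sketch, not part of the original module. The search
# URL, tag, and page count below are placeholders, and a valid PHPSESSID in
# .env is assumed. cookies/headers are passed explicitly because search() does
# not default to the module-level ones.
async def main():
    post_ids, keywords = await search(
        "https://www.pixiv.net/tags/landscape/artworks?s_mode=s_tag",  # placeholder tag
        pages=2,
        ai_only=True,
        cookies=cookies,
        headers=headers,
    )
    image_exifs = await get_pixif_data(post_ids)
    print(f"{keywords}: {len(image_exifs)} of {len(post_ids)} posts look AI-generated")


if __name__ == "__main__":
    asyncio.run(main())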