import asyncio
import os
import re
import time
from urllib.parse import parse_qs, parse_qsl, quote, unquote, urlencode, urlsplit

import aiohttp
from dotenv import load_dotenv
|
|
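# Base URL of pixiv's original-resolution image CDN; full download URLs are
# formed by appending the per-image path returned by the ajax API.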
| img_base = 'https://i.pximg.net/img-original/img/' |
|
|
| load_dotenv() |
|
|
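# PHPSESSID comes from a logged-in pixiv browser session; most ajax endpoints
# used below require it.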
| PHPSESSID = os.getenv("PHPSESSID") |
|
|
| cookies = {"PHPSESSID": PHPSESSID} |
|
|
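# The image CDN (i.pximg.net) rejects requests without a pixiv Referer, and the
# ajax API expects browser-like headers, so these are sent on every request.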
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0", |
| 'referer': 'https://www.pixiv.net/', |
| } |
|
|
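# Tags that commonly mark AI-generated posts; stored lowercase and matched
# against tag.casefold() in is_ai_post().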
| AI_TAGS = { |
| "aiイラスト", |
| "ai生成", |
| "stablediffusion", |
| "ai-generated", |
| "novelai", |
| "novelaidiffusionai", |
| "aiart", |
| "ai", |
| "comfyui" |
| } |
| BAD_FILENAME_CHARS = r'[<>:"/\\|?*\x00-\x1f]+' |
|
|
| def sanitize_filename(name): |
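    """Replace unsafe filename characters with '_' and trim stray dots/spaces,
    falling back to "pixiv" if nothing survives."""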
| return re.sub(BAD_FILENAME_CHARS, "_", name).strip(" ._") or "pixiv" |
|
|
| def get_search_keywords(raw): |
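    """Pull the search term out of a pixiv search URL: prefer the /tags/<term>/
    path segment, then the word/q query parameter, else treat the raw input as
    the term itself."""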
| parts = urlsplit(raw) |
    path_parts = [unquote(part) for part in parts.path.split('/') if part]
| if 'tags' in path_parts: |
| index = path_parts.index('tags') + 1 |
| if index < len(path_parts): |
| return path_parts[index] |
| query = parse_qs(parts.query) |
| words = query.get('word') or query.get('q') |
| if words: |
| return words[0] |
| return raw.strip() |
|
|
| def get_search_params(raw, keywords): |
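    """Rebuild the query string for the ajax search endpoint: keep pass-through
    parameters, map the public s_mode values to their ajax spellings, and
    re-attach the search term as word (defaulting s_mode to s_tag)."""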
| params = [] |
| for key, value in parse_qsl(urlsplit(raw).query, keep_blank_values=True): |
| if key in ("q", "word", "type"): |
| continue |
| if key == "s_mode": |
| if value == "tag": |
| value = "s_tag" |
| elif value == "tag_full": |
| value = "s_tag_full" |
| params.append((key, value)) |
| params.append(("word", keywords)) |
| if not any(key == "s_mode" for key, _ in params): |
| params.append(("s_mode", "s_tag")) |
| return urlencode(params) |
|
|
| def is_ai_post(post): |
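    """True when pixiv itself flags the post as AI-generated (aiType == 2) or
    any of its tags matches a known AI tag."""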
| if post.get("aiType") == 2: |
| return True |
| tags = post.get("tags") or [] |
| for tag in tags: |
| if isinstance(tag, str) and tag.casefold() in AI_TAGS: |
| return True |
| return False |
|
|
async def fetch_page(session, url):
    """GET `url` and decode the JSON body, raising on HTTP error statuses."""
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()
|
|
| async def search(raw, pages, ai_only=True, real_only=True, cookies=None, headers=None): |
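    """Gather post ids from up to `pages` result pages of a pixiv tag search.

    ai_only keeps only AI posts; real_only (when ai_only is False) drops them
    instead. Returns (post_ids, keywords).
    """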
| keywords = get_search_keywords(raw) |
    encoded_keywords = quote(keywords, safe='')
| params = get_search_params(raw, keywords) |
| url = f"https://www.pixiv.net/ajax/search/artworks/{encoded_keywords}?{params}" |
|
|
| post_ids = [] |
| tasks = [] |
|
|
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| for page in range(1, pages + 1): |
| page_url = f"{url.strip()}&p={page}" |
| task = fetch_page(session, page_url) |
| tasks.append(task) |
|
|
| responses = await asyncio.gather(*tasks) |
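        # Pages come back in request order, so the first empty page means every
        # later page is empty too and we can stop early.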
| for data in responses: |
| posts = data['body']['illustManga']['data'] |
| if not posts: |
| break |
| if ai_only: |
| posts = [post for post in posts if is_ai_post(post)] |
| elif real_only: |
| posts = [post for post in posts if not is_ai_post(post)] |
| post_ids.extend([post['id'] for post in posts]) |
|
|
| return post_ids, keywords |
|
|
| def base26_time(): |
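    # Current time in centiseconds, re-encoded with digits a-z: a short,
    # roughly time-sortable prefix for output filenames.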
    n = int(time.time() * 100)
    digits = ''
    while n:
        digits = chr(97 + n % 26) + digits
        n //= 26
    return digits
|
|
async def get_user(user_id, session):
    data = await fetch_page(session, f'https://www.pixiv.net/ajax/user/{user_id}/profile/all')
    # pixiv serializes an empty illust map as a JSON array, so guard before
    # treating it as a dict of post ids.
    illusts = data["body"]["illusts"] or {}
    try:
        username = data['body']['pickup'][0]['userName']
    except (KeyError, IndexError, TypeError):
        # No usable pickup entry; fall back to the user endpoint for the name.
        user_data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}")
        username = user_data['body']['name']

    return {"post_ids": list(illusts), "filename": base26_time() + "_" + sanitize_filename(username)}
|
|
| async def get_users(user_ids): |
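    """Fetch profile data for many users concurrently; per-user failures come
    back as {"user_id": ..., "error": ...} instead of raising."""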
| async def fetch_user_data(session, uid): |
| try: |
| return await get_user(uid, session) |
| except Exception as e: |
| return {"user_id": uid, "error": str(e)} |
|
|
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| tasks = [fetch_user_data(session, uid) for uid in user_ids] |
| results = await asyncio.gather(*tasks) |
|
|
| return results |
|
|
def determine_exif_type(metadata):
    """Classify PNG text metadata by its generator, using known signatures."""
    if metadata is None:
        return None
    if metadata == b'TitleAI generated image':
        # NovelAI sets Title to "AI generated image"; keyword and value are
        # fused because parse_png_metadata strips the NUL separator.
        return "novelai"
    if metadata.startswith(b"parameter"):
        # Stable Diffusion web UI stores generation settings under "parameters".
        return "sd"
    if b'{"' in metadata:
        # ComfyUI embeds its workflow graph as JSON.
        return "comfy"
    if b"Dig" in metadata:
        # Heuristic marker for Midjourney exports.
        return "mj"
    if metadata.startswith(b"SoftwareCelsys"):
        # Clip Studio Paint (Celsys) tags hand-drawn work.
        return "celsys"
    # Anything else carrying text metadata is treated as a conventional editor.
    return "photoshop"
|
|
async def get_exif(url, session):
    """Fetch the leading bytes of a PNG and return its first text-chunk payload."""
    headers = {
        "Referer": "https://www.pixiv.net/",
        # The text chunks of interest sit right after the PNG header, so a
        # small byte range is enough.
        "Range": "bytes=0-512",
    }
|
|
| async with session.get(url, headers=headers) as response: |
| data = await response.read() |
| return parse_png_metadata(data) |
|
|
def parse_png_metadata(data):
    """Walk PNG chunks and return the payload of the first tEXt/iTXt chunk,
    or None when no text chunk appears in the fetched bytes."""
    index = 8  # skip the 8-byte PNG signature

    while index < len(data):
        if index + 8 > len(data):
            break
        # Each chunk: 4-byte big-endian length, 4-byte type, payload, 4-byte CRC.
        chunk_len = int.from_bytes(data[index:index + 4], 'big')
        chunk_type = data[index + 4:index + 8].decode('ascii', errors='replace')
        index += 8

        if chunk_type in ('tEXt', 'iTXt'):
            content = data[index:index + chunk_len]
            if chunk_type == 'tEXt':
                # Strip the NUL separator, fusing keyword and text together.
                return content.replace(b'\0', b'')
            return content.strip()

        index += chunk_len + 4  # skip payload and CRC
    return None
|
|
| async def process_post(post_id, session, semaphore): |
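    """Return (post_id, image_url) for the first PNG in the post whose text
    metadata looks AI-generated, else (post_id, None)."""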
| async with semaphore: |
| try: |
| data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages") |
            # Only PNGs are checked; parse_png_metadata understands nothing else.
            image_urls = [page['urls']['original'] for page in data['body'] if 'png' in page['urls']['original']]

            # Probe one image first, then widen the batches, so single-image
            # posts resolve after a single range request.
            initial_offsets = [1, 5, 5, 10, 10, 10]
| |
            chunks = []
            start = 0
            for offset in initial_offsets:
                end = min(start + offset, len(image_urls))
                chunks.append((start, end))
                start = end

            # Whatever remains is checked in batches of 10.
            while start < len(image_urls):
                end = min(start + 10, len(image_urls))
                chunks.append((start, end))
                start = end
|
|
| for s, e in chunks: |
| chunk_tasks = [get_exif(image_urls[i], session) for i in range(s, e)] |
| results = await asyncio.gather(*chunk_tasks) |
|
|
                for image_url, metadata in zip(image_urls[s:e], results):
                    exif_type = determine_exif_type(metadata)
                    # Editor metadata and missing metadata don't count as AI.
                    if exif_type not in ('photoshop', 'celsys', None):
                        return post_id, image_url

            return post_id, None
        except Exception:
            # Any network or parse failure is treated as "no AI image found".
            return post_id, None
|
|
| async def get_pixif_data(post_ids): |
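    """Probe every post concurrently (at most 5 in flight) and map post id to
    the CDN-relative path of the first image carrying AI-generator metadata."""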
| semaphore = asyncio.Semaphore(5) |
| async with aiohttp.ClientSession(cookies=cookies, headers=headers) as session: |
| tasks = [process_post(post_id, session, semaphore) for post_id in post_ids] |
| results = await asyncio.gather(*tasks) |
|
|
| image_exifs = {post_id: image_url.replace(img_base, '', 1) for post_id, image_url in results if image_url} |
| return image_exifs |
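

# Example usage sketch: wire search() and get_pixif_data() together. The tag
# URL and page count below are placeholder values, not part of the module's API.
if __name__ == "__main__":
    async def demo():
        post_ids, keywords = await search(
            "https://www.pixiv.net/tags/初音ミク/artworks",
            pages=2,
            cookies=cookies,
            headers=headers,
        )
        print(f"{len(post_ids)} AI-flagged posts for {keywords!r}")
        for post_id, path in (await get_pixif_data(post_ids)).items():
            print(post_id, img_base + path)

    asyncio.run(demo())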
|
|