import asyncio import io import json import os import tempfile import time from pathlib import Path from urllib.parse import parse_qs, parse_qsl, quote, unquote, urlencode, urlsplit try: from dotenv import load_dotenv load_dotenv(Path(__file__).resolve().parent.parent / ".env") except ImportError: pass import aiohttp import httpx import numpy as np from fastapi import BackgroundTasks, FastAPI, HTTPException, Request from fastapi.responses import FileResponse, Response, StreamingResponse from fastapi.staticfiles import StaticFiles from PIL import Image from pydantic import BaseModel TURSO_DB_URL = os.getenv("TURSO_DB_URL", "").strip() TURSO_AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN_WRITE", "").strip() DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "").strip() PHPSESSID = os.getenv("PHPSESSID", "") IMG_BASE = "https://i.pximg.net/img-original/img/" PIXIV_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0", "referer": "https://www.pixiv.net/", } AI_TAGS = { "stablediffusion", "ai-generated", "novelai", "novelaidiffusionai", "aiart", "ai", "comfyui", } EXIF_TYPE_ORDER = ("novelai", "sd", "comfy", "mj", "celsys", "photoshop", "stealth") EXIF_TYPE_TO_CODE = {name: idx + 1 for idx, name in enumerate(EXIF_TYPE_ORDER)} POST_SCAN_LIMIT = 64 EXIF_RANGE_LIMIT = 96 FULL_IMAGE_LIMIT = 32 PAGE_SIZE = 60 SEARCH_PAGE_SIZE = 10 THUMB_MAX_AGE = 1800 THUMB_DIR = Path(tempfile.gettempdir()) / "pixif2-thumbs" PAGE_URL_CACHE_MAX_AGE = 1800 WEBP_SCALE = 0.4 WEBP_QUALITY = 82 app = FastAPI() ACTIVE_TASKS = {} TASK_EVENT_QUEUES = set() PAGE_URL_CACHE = {} FRONTEND_DIR = os.path.join(os.getcwd(), "frontend") def base26_time(): x = "" n = int(time.time() * 100) while n: x = chr(97 + n % 26) + x n //= 26 return x def base26_to_time(value): n = 0 for c in value: if c < "a" or c > "z": return 0 n = n * 26 + ord(c) - 97 return n // 100 def turso_url(): base = TURSO_DB_URL.rstrip("/") if not base.startswith("http"): base = "https://" + base return base async def turso_execute(stmts): url = turso_url() + "/v2/pipeline" headers = { "Authorization": f"Bearer {TURSO_AUTH_TOKEN}", "Content-Type": "application/json", } body = { "requests": [{"type": "execute", "stmt": s} for s in stmts] + [{"type": "close"}] } async with httpx.AsyncClient(timeout=60) as client: r = await client.post(url, json=body, headers=headers) r.raise_for_status() return r.json() async def turso_batch(stmts): url = turso_url() + "/v2/pipeline" headers = { "Authorization": f"Bearer {TURSO_AUTH_TOKEN}", "Content-Type": "application/json", } body = { "requests": [{"type": "execute", "stmt": s} for s in stmts] + [{"type": "close"}] } async with httpx.AsyncClient(timeout=120) as client: r = await client.post(url, json=body, headers=headers) r.raise_for_status() return r.json() async def init_db(): await turso_execute( [ { "sql": "CREATE TABLE IF NOT EXISTS pi_searches (id TEXT PRIMARY KEY, post_ids TEXT)" }, { "sql": "CREATE TABLE IF NOT EXISTS pi_scans (post_id TEXT PRIMARY KEY, url TEXT, exif_type INTEGER)" }, ] ) for sql in ( "DROP INDEX IF EXISTS pi_searches_created_at_idx", "DROP INDEX IF EXISTS pi_search_posts_search_pos_idx", "DROP INDEX IF EXISTS pi_search_posts_post_idx", "DROP TABLE IF EXISTS pi_search_posts", "ALTER TABLE pi_searches DROP COLUMN api_url", "ALTER TABLE pi_searches DROP COLUMN created_at", ): try: await turso_execute([{"sql": sql}]) except httpx.HTTPStatusError as e: text = e.response.text.casefold() if "no such column" not in text and "no such index" not in text: raise async def discord_notify(msg): if not DISCORD_WEBHOOK_URL: print("WARN: DISCORD_WEBHOOK_URL not set, skipping notify") return try: async with aiohttp.ClientSession() as session: async with session.post( DISCORD_WEBHOOK_URL.rstrip("/") + "/webhook-forward", json={"content": msg}, timeout=aiohttp.ClientTimeout(total=15), ) as r: if r.status >= 400: body = await r.text() print(f"Discord webhook failed ({r.status}): {body}") except Exception as e: print(f"Discord webhook error: {repr(e)}") async def publish_task_event(search_id): data = {"id": search_id, "at": int(time.time())} for queue in list(TASK_EVENT_QUEUES): if queue.full(): try: queue.get_nowait() except asyncio.QueueEmpty: pass queue.put_nowait(data) async def finish_task(search_id): ACTIVE_TASKS.pop(search_id, None) await publish_task_event(search_id) def is_ai_post(post): if post.get("aiType") == 2: return True tags = post.get("tags") or [] for tag in tags: name = ( tag if isinstance(tag, str) else (tag.get("tag") or tag.get("name") or "") if isinstance(tag, dict) else "" ) if name and name.casefold() in AI_TAGS: return True return False def get_search_keywords(raw): parts = urlsplit(raw) path_parts = [unquote(p) for p in parts.path.split("/") if p] if "tags" in path_parts: idx = path_parts.index("tags") + 1 if idx < len(path_parts): return path_parts[idx] query = parse_qs(parts.query) words = query.get("word") or query.get("q") if words: return words[0] return raw.strip() def get_search_params(raw, keywords): params = [] for key, value in parse_qsl(urlsplit(raw).query, keep_blank_values=True): if key in ("p", "q", "word", "type"): continue if key == "s_mode": if value == "tag": value = "s_tag" elif value == "tag_full": value = "s_tag_full" params.append((key, value)) params.append(("word", keywords)) if not any(k == "s_mode" for k, _ in params): params.append(("s_mode", "s_tag")) return urlencode(params) def get_search_api_url(raw, keywords): encoded = quote(keywords, safe="") params = get_search_params(raw, keywords) return f"https://www.pixiv.net/ajax/search/artworks/{encoded}?{params}" async def save_search(search_id, post_ids): if not post_ids: return stmt = { "sql": "INSERT OR REPLACE INTO pi_searches (id, post_ids) VALUES (?, ?)", "args": [ {"type": "text", "value": search_id}, {"type": "text", "value": json.dumps(post_ids)}, ], } await turso_execute([stmt]) def user_search_id(user_id, username): label = str(username or user_id).strip() or str(user_id) return f"{base26_time()}_{label}" async def pixiv_user_name(user_id, session): data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}") body = data.get("body") or {} return (body.get("name") or body.get("account") or "").strip() async def pixiv_user_names(user_ids, phpsessid): cookies = {"PHPSESSID": phpsessid} async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: async def load_name(uid): try: return uid, await pixiv_user_name(uid, session) except Exception: return uid, "" return dict(await asyncio.gather(*(load_name(uid) for uid in user_ids))) async def pixiv_search_live(url, pages, mode, phpsessid, search_id): keywords = get_search_keywords(url) api_url = get_search_api_url(url, keywords) first_url = f"{api_url}&p=1" cookies = {"PHPSESSID": phpsessid} post_ids = [] seen = set() done = 0 async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: tasks = [fetch_page(session, f"{api_url}&p={p}") for p in range(1, pages + 1)] for coro in asyncio.as_completed(tasks): data = await coro done += 1 if search_id in ACTIVE_TASKS: ACTIVE_TASKS[search_id].update({"total": pages, "done": done}) if data.get("error"): continue body = data.get("body") or {} posts = (body.get("illustManga") or {}).get("data") or [] if mode == "ai": posts = [p for p in posts if is_ai_post(p)] elif mode == "real": posts = [p for p in posts if not is_ai_post(p)] for post in posts: pid = str(post.get("id") or "") if pid and pid not in seen: seen.add(pid) post_ids.append(pid) if done % 25 == 0: await save_search(search_id, post_ids) await save_search(search_id, post_ids) return post_ids, keywords, first_url async def pixiv_user_posts(user_id, phpsessid): cookies = {"PHPSESSID": phpsessid} async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: data = await fetch_page( session, f"https://www.pixiv.net/ajax/user/{user_id}/profile/all" ) body = data.get("body") or {} posts = list((body.get("illusts") or {}).keys()) username = "" pickup = body.get("pickup") or [] if pickup: username = (pickup[0] or {}).get("userName") or "" if not username: username = await pixiv_user_name(user_id, session) return {"user_id": user_id, "post_ids": posts, "username": username} async def fetch_page(session, url): async with session.get(url) as r: return await r.json() async def get_post_pages(post_id, session): key = str(post_id) cached = PAGE_URL_CACHE.get(key) now = time.time() if cached and now - cached["time"] < PAGE_URL_CACHE_MAX_AGE: return cached["pages"] data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages") pages = [] for page in data.get("body") or []: urls = page.get("urls") or {} pages.append( { "original": urls.get("original") or "", "regular": urls.get("regular") or "", "small": urls.get("small") or "", "thumb_mini": urls.get("thumb_mini") or "", } ) PAGE_URL_CACHE[key] = {"time": now, "pages": pages} return pages def parse_png_metadata(data): index = 8 while index < len(data): if index + 8 > len(data): break chunk_len = int.from_bytes(data[index : index + 4], "big") chunk_type = data[index + 4 : index + 8].decode("ascii", errors="ignore") index += 8 if chunk_type in ("tEXt", "iTXt"): content = data[index : index + chunk_len] return ( content.replace(b"\0", b"") if chunk_type == "tEXt" else content.strip() ) index += chunk_len + 4 return None def determine_exif_type(metadata): if metadata is None: return None if metadata == b"TitleAI generated image": return "novelai" if metadata.startswith(b"parameter"): return "sd" if b'{"' in metadata: return "comfy" if metadata.startswith(b"SoftwareCelsys"): return "celsys" return "photoshop" def byteize(alpha): alpha = alpha.T.reshape((-1,)) alpha = alpha[: (alpha.shape[0] // 8) * 8] alpha = np.bitwise_and(alpha, 1) alpha = alpha.reshape((-1, 8)) return np.packbits(alpha, axis=1) def has_stealth_png_bytes(data): try: image = Image.open(io.BytesIO(data)) if "A" not in image.getbands(): return False alpha = np.array(image.getchannel("A")) arr = byteize(alpha).flatten() magic = b"stealth_pngcomp" return bytes(arr[: len(magic)]) == magic except Exception: return False async def scan_post(post_id, session, post_sem, exif_sem, img_sem): async with post_sem: try: pages = await get_post_pages(post_id, session) image_urls = [p["original"] for p in pages if "png" in p["original"]] for url in image_urls: metadata = await get_exif_range(url, session, exif_sem) exif_type = determine_exif_type(metadata) if exif_type not in ("photoshop", "celsys", None): code = EXIF_TYPE_TO_CODE.get(exif_type) return post_id, url, code for url in image_urls: img_data = await fetch_image(session, url, img_sem) if img_data and has_stealth_png_bytes(img_data): return post_id, url, EXIF_TYPE_TO_CODE.get("stealth") return post_id, None, None except Exception: return post_id, None, None async def get_exif_range(url, session, sem): hdrs = {"Referer": "https://www.pixiv.net/", "Range": "bytes=0-512"} if sem: async with sem: async with session.get(url, headers=hdrs) as r: data = await r.read() else: async with session.get(url, headers=hdrs) as r: data = await r.read() return parse_png_metadata(data) async def fetch_image(session, url, sem): if sem: async with sem: async with session.get(url) as r: return await r.read() async with session.get(url) as r: return await r.read() async def run_scan(post_ids, phpsessid, task_id=None, save_live=False): post_sem = asyncio.Semaphore(POST_SCAN_LIMIT) exif_sem = asyncio.Semaphore(EXIF_RANGE_LIMIT) img_sem = asyncio.Semaphore(FULL_IMAGE_LIMIT) cookies = {"PHPSESSID": phpsessid} results = [] async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: tasks = [ scan_post(pid, session, post_sem, exif_sem, img_sem) for pid in post_ids ] pending = [] for coro in asyncio.as_completed(tasks): result = await coro results.append(result) pending.append(result) if task_id and task_id in ACTIVE_TASKS: ACTIVE_TASKS[task_id]["done"] = len(results) if save_live and len(pending) >= 20: await save_scan_results(pending) pending = [] if save_live and pending: await save_scan_results(pending) return results async def save_scan_results(results): stmts = [] for post_id, url, exif_type in results: short_url = url.replace(IMG_BASE, "", 1) if url else "" stmts.append( { "sql": "INSERT OR REPLACE INTO pi_scans (post_id, url, exif_type) VALUES (?, ?, ?)", "args": [ {"type": "text", "value": str(post_id)}, {"type": "text", "value": short_url}, {"type": "integer", "value": str(exif_type)} if exif_type else {"type": "null"}, ], } ) if stmts: for i in range(0, len(stmts), 200): await turso_batch(stmts[i : i + 200]) async def get_scanned_post_ids(post_ids): if not post_ids: return {} chunks = [post_ids[i : i + 500] for i in range(0, len(post_ids), 500)] scanned = {} stmts = [] for chunk in chunks: placeholders = ",".join("?" for _ in chunk) stmts.append( { "sql": f"SELECT post_id, url, exif_type FROM pi_scans WHERE post_id IN ({placeholders})", "args": [{"type": "text", "value": str(pid)} for pid in chunk], } ) resp = await turso_execute(stmts) for result in resp.get("results") or []: if "response" not in result: continue rows = result["response"].get("result", {}).get("rows", []) for row in rows: pid = row[0].get("value") url_val = row[1].get("value") if row[1].get("type") != "null" else "" et = row[2].get("value") if row[2].get("type") != "null" else None scanned[pid] = {"url": url_val, "exif_type": int(et) if et else None} return scanned def exif_items(post_ids, scanned, exif_types=None): allowed = set(exif_types) if exif_types is not None else None items = [] for pid in post_ids: s = scanned.get(pid) if not s: continue exif_type = s.get("exif_type") if exif_type is None and (allowed is None or 0 not in allowed): continue if exif_type is not None and allowed is not None and exif_type not in allowed: continue items.append( { "post_id": pid, "url": s.get("url"), "exif_type": exif_type, "scanned": True, **image_links(pid, s.get("url")), } ) return items def cleanup_thumbs(): THUMB_DIR.mkdir(exist_ok=True) now = time.time() for path in THUMB_DIR.glob("*.webp"): try: if now - path.stat().st_atime > THUMB_MAX_AGE: path.unlink() except OSError: pass def page_num_from_url(url): if not url: return 0 name = url.rsplit("/", 1)[-1] if "_p" not in name: return 0 try: return int(name.rsplit("_p", 1)[1].split(".", 1)[0]) except ValueError: return 0 def media_type_from_url(url): ext = urlsplit(url).path.rsplit(".", 1)[-1].casefold() if ext in ("jpg", "jpeg"): return "image/jpeg" if ext == "png": return "image/png" if ext == "gif": return "image/gif" if ext == "webp": return "image/webp" return "application/octet-stream" async def get_pixiv_image_url(post_id, page, size, phpsessid): cookies = {"PHPSESSID": phpsessid} async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: pages = await get_post_pages(post_id, session) if not pages: raise HTTPException(status_code=404, detail="image not found") page = min(max(page, 0), len(pages) - 1) urls = pages[page] if size in ("full", "orig"): url = urls.get("original") or urls.get("regular") or urls.get("small") else: url = urls.get("regular") or urls.get("small") or urls.get("original") if not url: raise HTTPException(status_code=404, detail="image not found") return url async def fetch_pixiv_bytes(url, phpsessid): cookies = {"PHPSESSID": phpsessid} async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session: async with session.get(url) as r: if r.status >= 400: raise HTTPException(status_code=r.status, detail="image fetch failed") return await r.read(), r.headers.get("Content-Type") or media_type_from_url(url) async def create_webp(post_id, image_url, phpsessid, page=0, kind="t"): cleanup_thumbs() scale_tag = int(WEBP_SCALE * 100) out = THUMB_DIR / f"{post_id}_p{page}_{kind}{scale_tag}.webp" if out.exists(): os.utime(out, None) return out data, _ = await fetch_pixiv_bytes(image_url, phpsessid) if not data: raise HTTPException(status_code=404, detail="image not found") image = Image.open(io.BytesIO(data)) image = image.resize( (max(int(image.width * WEBP_SCALE), 1), max(int(image.height * WEBP_SCALE), 1)) ) if image.mode not in ("RGB", "RGBA"): image = image.convert("RGB") image.save(out, "WEBP", quality=WEBP_QUALITY) return out def image_links(post_id, url): page = page_num_from_url(url) suffix = f"?p={page}" pid = quote(str(post_id), safe="") webp_url = f"/api/i/{pid}/v{suffix}" return { "image_url": webp_url, "preview_url": webp_url, "download_url": f"/api/i/{pid}/o{suffix}", "full_image_url": webp_url, "page": page, } async def bg_search_task(search_id, url, pages, mode, phpsessid): ACTIVE_TASKS[search_id] = { "type": "search", "phase": "searching", "total": pages, "done": 0, } await discord_notify(f"`{search_id}` started") try: post_ids, _, _ = await pixiv_search_live(url, pages, mode, phpsessid, search_id) await discord_notify(f"`{search_id}` completed - {len(post_ids)} posts found") except Exception as e: await discord_notify(f"`{search_id}` failed: {e}") finally: await finish_task(search_id) async def bg_user_task(search_id, user_id, phpsessid): ACTIVE_TASKS[search_id] = { "type": "user_search", "phase": "searching", "total": 1, "done": 0, } await discord_notify(f"`{search_id}` started (user {user_id})") try: result = await pixiv_user_posts(user_id, phpsessid) post_ids = list(dict.fromkeys(result["post_ids"])) ACTIVE_TASKS[search_id]["done"] = 1 if not post_ids: await discord_notify(f"`{search_id}` completed - no posts, not saved") return await save_search(search_id, post_ids) already = await get_scanned_post_ids(post_ids) to_scan = [pid for pid in post_ids if pid not in already] if to_scan: ACTIVE_TASKS[search_id].update( {"phase": "scanning", "total": len(to_scan), "done": 0} ) await run_scan(to_scan, phpsessid, task_id=search_id, save_live=True) await discord_notify( f"`{search_id}` completed - {len(post_ids)} posts from user {user_id}" ) except Exception as e: await discord_notify(f"`{search_id}` failed: {e}") finally: await finish_task(search_id) async def bg_user_batch_task(jobs, phpsessid): await asyncio.gather( *(bg_user_task(search_id, user_id, phpsessid) for search_id, user_id in jobs) ) async def bg_scan_task(search_id, post_ids, phpsessid): ACTIVE_TASKS[search_id] = { "type": "scan", "phase": "scanning", "total": len(post_ids), "done": 0, } await discord_notify(f"`{search_id}` scan started ({len(post_ids)} posts)") try: results = await run_scan(post_ids, phpsessid, task_id=search_id, save_live=True) found = sum(1 for _, url, _ in results if url) await discord_notify( f"`{search_id}` scan completed - {found}/{len(post_ids)} have exif" ) except Exception as e: await discord_notify(f"`{search_id}` scan failed: {e}") finally: await finish_task(search_id) async def bg_search_and_scan_task(search_id, url, pages, mode, phpsessid): ACTIVE_TASKS[search_id] = { "type": "search+scan", "phase": "searching", "total": pages, "done": 0, } await discord_notify(f"`{search_id}` search+scan started") try: post_ids, _, _ = await pixiv_search_live(url, pages, mode, phpsessid, search_id) await discord_notify( f"`{search_id}` search done - {len(post_ids)} posts, scanning..." ) already = await get_scanned_post_ids(post_ids) to_scan = [pid for pid in post_ids if pid not in already] if to_scan: ACTIVE_TASKS[search_id].update( {"phase": "scanning", "total": len(to_scan), "done": 0} ) results = await run_scan( to_scan, phpsessid, task_id=search_id, save_live=True ) found = sum(1 for _, url, _ in results if url) await discord_notify( f"`{search_id}` scan completed - {found}/{len(to_scan)} new exif" ) else: await discord_notify(f"`{search_id}` all {len(post_ids)} already scanned") except Exception as e: await discord_notify(f"`{search_id}` failed: {e}") finally: await finish_task(search_id) class SearchRequest(BaseModel): url: str pages: int = 30 mode: str = "ai" action: str = "search" class UserSearchRequest(BaseModel): user_ids: list action: str = "search" class ScanRequest(BaseModel): search_id: str class RenameRequest(BaseModel): new_id: str @app.on_event("startup") async def startup(): if not TURSO_DB_URL: print("WARN: TURSO_DB_URL not set, skipping DB init") return try: await init_db() except Exception as e: print(f"WARN: DB init failed ({e}), will retry on first request") @app.post("/api/submit") async def submit_search(req: SearchRequest, bg: BackgroundTasks): search_id = base26_time() phpsessid = PHPSESSID bg.add_task( bg_search_and_scan_task, search_id, req.url, req.pages, req.mode, phpsessid ) return {"id": search_id, "status": "started"} @app.post("/api/submit_users") async def submit_users(req: UserSearchRequest, bg: BackgroundTasks): phpsessid = PHPSESSID user_ids = list(dict.fromkeys(int(u) for u in req.user_ids)) names = await pixiv_user_names(user_ids, phpsessid) jobs = [(user_search_id(uid, names.get(uid)), uid) for uid in user_ids] bg.add_task(bg_user_batch_task, jobs, phpsessid) return {"ids": [search_id for search_id, _ in jobs], "status": "started"} @app.post("/api/scan") async def scan_search(req: ScanRequest, bg: BackgroundTasks): phpsessid = PHPSESSID if req.search_id in ACTIVE_TASKS: return {"status": "active", **ACTIVE_TASKS[req.search_id]} resp = await turso_execute( [ { "sql": "SELECT post_ids FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": req.search_id}], } ] ) results = resp.get("results") or [] if not results or "response" not in results[0]: return {"error": "not found"} rows = results[0]["response"].get("result", {}).get("rows", []) if not rows: return {"error": "not found"} post_ids = json.loads(rows[0][0].get("value", "[]")) already = await get_scanned_post_ids(post_ids) to_scan = [pid for pid in post_ids if pid not in already] if not to_scan: return {"status": "already_scanned", "count": len(post_ids)} bg.add_task(bg_scan_task, req.search_id, to_scan, phpsessid) return {"status": "scanning", "total": len(post_ids), "to_scan": len(to_scan)} @app.get("/api/searches") async def list_searches(page: int = 1): page = max(page, 1) offset = (page - 1) * SEARCH_PAGE_SIZE resp = await turso_execute( [ { "sql": "SELECT COUNT(*) FROM pi_searches WHERE post_ids != '[]'" }, { "sql": "SELECT id, post_ids FROM pi_searches WHERE post_ids != '[]' " "ORDER BY id DESC LIMIT ? OFFSET ?", "args": [ {"type": "integer", "value": str(SEARCH_PAGE_SIZE)}, {"type": "integer", "value": str(offset)}, ], }, ] ) results = resp.get("results") or [] if len(results) < 2 or "response" not in results[1]: return {"items": [], "total": 0, "page": page, "pages": 1} count_rows = results[0].get("response", {}).get("result", {}).get("rows", []) total = int(count_rows[0][0].get("value", "0")) if count_rows else 0 rows = results[1]["response"].get("result", {}).get("rows", []) search_posts = [] all_post_ids = [] seen = set() for row in rows: post_ids = json.loads(row[1].get("value", "[]")) search_posts.append(post_ids) for pid in post_ids: if pid not in seen: seen.add(pid) all_post_ids.append(pid) scanned = await get_scanned_post_ids(all_post_ids) items = [] for row, post_ids in zip(rows, search_posts): search_id = row[0].get("value") items.append( { "id": search_id, "created_at": base26_to_time(search_id), "found_exif": sum( 1 for pid in post_ids if scanned.get(pid, {}).get("exif_type") ), "total_searched": len(post_ids), } ) return { "items": items, "total": total, "page": page, "page_size": SEARCH_PAGE_SIZE, "pages": max((total + SEARCH_PAGE_SIZE - 1) // SEARCH_PAGE_SIZE, 1), } @app.get("/api/search/{search_id}") async def get_search(search_id: str): resp = await turso_execute( [ { "sql": "SELECT id, post_ids FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": search_id}], } ] ) results = resp.get("results") or [] if not results or "response" not in results[0]: return {"error": "not found"} rows = results[0]["response"].get("result", {}).get("rows", []) if not rows: return {"error": "not found"} row = rows[0] post_ids = json.loads(row[1].get("value", "[]")) scanned = await get_scanned_post_ids(post_ids) return { "id": row[0].get("value"), "post_ids": post_ids, "created_at": base26_to_time(row[0].get("value")), "scanned": scanned, } @app.get("/api/results/{search_id}") async def get_results(search_id: str, page: int = 1, exif_types: str = ""): resp = await turso_execute( [ { "sql": "SELECT post_ids FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": search_id}], } ] ) results = resp.get("results") or [] if not results or "response" not in results[0]: return {"error": "not found"} rows = results[0]["response"].get("result", {}).get("rows", []) if not rows: return {"error": "not found"} post_ids = json.loads(rows[0][0].get("value", "[]")) scanned = await get_scanned_post_ids(post_ids) allowed = [ int(x) for x in exif_types.split(",") if x.isdigit() and int(x) in (*EXIF_TYPE_TO_CODE.values(), 0) ] source = exif_items(post_ids, scanned, allowed if exif_types != "" else None) page = max(page, 1) total = len(source) start = (page - 1) * PAGE_SIZE items = source[start : start + PAGE_SIZE] return { "search_id": search_id, "items": items, "total": total, "page": page, "page_size": PAGE_SIZE, "pages": max((total + PAGE_SIZE - 1) // PAGE_SIZE, 1), "raw_total": len(post_ids), "scanned_count": len(scanned), } @app.get("/api/i/{post_id}/t") async def get_image_thumb(post_id: str, p=0): p = int(p or 0) image_url = await get_pixiv_image_url(post_id, p, "thumb", PHPSESSID) path = await create_webp(post_id, image_url, PHPSESSID, p, "t") return FileResponse( path, media_type="image/webp", headers={"Cache-Control": f"public, max-age={THUMB_MAX_AGE}"}, ) @app.get("/api/i/{post_id}/v") async def get_image_preview(post_id: str, p=0): p = int(p or 0) image_url = await get_pixiv_image_url(post_id, p, "full", PHPSESSID) path = await create_webp(post_id, image_url, PHPSESSID, p, "v") return FileResponse( path, media_type="image/webp", headers={"Cache-Control": f"public, max-age={THUMB_MAX_AGE}"}, ) @app.get("/api/i/{post_id}/o") async def get_image_original(post_id: str, p=0): p = int(p or 0) image_url = await get_pixiv_image_url(post_id, p, "orig", PHPSESSID) data, content_type = await fetch_pixiv_bytes(image_url, PHPSESSID) filename = urlsplit(image_url).path.rsplit("/", 1)[-1] or f"{post_id}_p{p}.png" return Response( data, media_type=content_type, headers={ "Cache-Control": f"public, max-age={THUMB_MAX_AGE}", "Content-Disposition": f'attachment; filename="{filename}"', }, ) @app.get("/api/image/{post_id}/thumb") async def get_long_image_thumb(post_id: str, page: int = 0, p=None): return await get_image_thumb(post_id, page if p is None else p) @app.get("/api/image/{post_id}/full") async def get_long_image_full(post_id: str, page: int = 0, p=None): return await get_image_preview(post_id, page if p is None else p) @app.get("/api/thumb/{post_id}") async def get_thumb(post_id: str): return await get_image_thumb(post_id) @app.delete("/api/search/{search_id}") async def delete_search(search_id: str): await turso_execute( [ { "sql": "DELETE FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": search_id}], } ] ) return {"status": "deleted"} @app.patch("/api/search/{search_id}") async def rename_search(search_id: str, req: RenameRequest): resp = await turso_execute( [ { "sql": "SELECT post_ids FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": search_id}], } ] ) results = resp.get("results") or [] if not results or "response" not in results[0]: return {"error": "not found"} rows = results[0]["response"].get("result", {}).get("rows", []) if not rows: return {"error": "not found"} post_ids_val = rows[0][0].get("value", "[]") await turso_execute( [ { "sql": "DELETE FROM pi_searches WHERE id = ?", "args": [{"type": "text", "value": search_id}], }, { "sql": "INSERT INTO pi_searches (id, post_ids) VALUES (?, ?)", "args": [ {"type": "text", "value": req.new_id}, {"type": "text", "value": post_ids_val}, ], }, ] ) return {"status": "renamed", "new_id": req.new_id} @app.get("/api/progress") async def get_progress(): return [{"id": k, **v} for k, v in ACTIVE_TASKS.items()] @app.get("/api/events") async def events(request: Request): queue = asyncio.Queue(maxsize=16) TASK_EVENT_QUEUES.add(queue) async def stream(): try: while not await request.is_disconnected(): try: data = await asyncio.wait_for(queue.get(), timeout=25) yield f"data: {json.dumps(data)}\n\n" except asyncio.TimeoutError: yield ": keepalive\n\n" finally: TASK_EVENT_QUEUES.discard(queue) return StreamingResponse(stream(), media_type="text/event-stream") app.mount("/", StaticFiles(directory=FRONTEND_DIR, html=True), name="frontend")