P2 / backend /app.py
q6's picture
Show ten explorer searches per page
19ce679
import asyncio
import io
import json
import os
import tempfile
import time
from pathlib import Path
from urllib.parse import parse_qs, parse_qsl, quote, unquote, urlencode, urlsplit
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
except ImportError:
pass
import aiohttp
import httpx
import numpy as np
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, Response, StreamingResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image
from pydantic import BaseModel
TURSO_DB_URL = os.getenv("TURSO_DB_URL", "").strip()
TURSO_AUTH_TOKEN = os.getenv("TURSO_AUTH_TOKEN_WRITE", "").strip()
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL", "").strip()
PHPSESSID = os.getenv("PHPSESSID", "")
IMG_BASE = "https://i.pximg.net/img-original/img/"
PIXIV_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:126.0) Gecko/20100101 Firefox/126.0",
"referer": "https://www.pixiv.net/",
}
AI_TAGS = {
"stablediffusion",
"ai-generated",
"novelai",
"novelaidiffusionai",
"aiart",
"ai",
"comfyui",
}
EXIF_TYPE_ORDER = ("novelai", "sd", "comfy", "mj", "celsys", "photoshop", "stealth")
EXIF_TYPE_TO_CODE = {name: idx + 1 for idx, name in enumerate(EXIF_TYPE_ORDER)}
POST_SCAN_LIMIT = 64
EXIF_RANGE_LIMIT = 96
FULL_IMAGE_LIMIT = 32
PAGE_SIZE = 60
SEARCH_PAGE_SIZE = 10
THUMB_MAX_AGE = 1800
THUMB_DIR = Path(tempfile.gettempdir()) / "pixif2-thumbs"
PAGE_URL_CACHE_MAX_AGE = 1800
WEBP_SCALE = 0.4
WEBP_QUALITY = 82
app = FastAPI()
ACTIVE_TASKS = {}
TASK_EVENT_QUEUES = set()
PAGE_URL_CACHE = {}
FRONTEND_DIR = os.path.join(os.getcwd(), "frontend")
def base26_time():
x = ""
n = int(time.time() * 100)
while n:
x = chr(97 + n % 26) + x
n //= 26
return x
def base26_to_time(value):
n = 0
for c in value:
if c < "a" or c > "z":
return 0
n = n * 26 + ord(c) - 97
return n // 100
def turso_url():
base = TURSO_DB_URL.rstrip("/")
if not base.startswith("http"):
base = "https://" + base
return base
async def turso_execute(stmts):
url = turso_url() + "/v2/pipeline"
headers = {
"Authorization": f"Bearer {TURSO_AUTH_TOKEN}",
"Content-Type": "application/json",
}
body = {
"requests": [{"type": "execute", "stmt": s} for s in stmts]
+ [{"type": "close"}]
}
async with httpx.AsyncClient(timeout=60) as client:
r = await client.post(url, json=body, headers=headers)
r.raise_for_status()
return r.json()
async def turso_batch(stmts):
url = turso_url() + "/v2/pipeline"
headers = {
"Authorization": f"Bearer {TURSO_AUTH_TOKEN}",
"Content-Type": "application/json",
}
body = {
"requests": [{"type": "execute", "stmt": s} for s in stmts]
+ [{"type": "close"}]
}
async with httpx.AsyncClient(timeout=120) as client:
r = await client.post(url, json=body, headers=headers)
r.raise_for_status()
return r.json()
async def init_db():
await turso_execute(
[
{
"sql": "CREATE TABLE IF NOT EXISTS pi_searches (id TEXT PRIMARY KEY, post_ids TEXT)"
},
{
"sql": "CREATE TABLE IF NOT EXISTS pi_scans (post_id TEXT PRIMARY KEY, url TEXT, exif_type INTEGER)"
},
]
)
for sql in (
"DROP INDEX IF EXISTS pi_searches_created_at_idx",
"DROP INDEX IF EXISTS pi_search_posts_search_pos_idx",
"DROP INDEX IF EXISTS pi_search_posts_post_idx",
"DROP TABLE IF EXISTS pi_search_posts",
"ALTER TABLE pi_searches DROP COLUMN api_url",
"ALTER TABLE pi_searches DROP COLUMN created_at",
):
try:
await turso_execute([{"sql": sql}])
except httpx.HTTPStatusError as e:
text = e.response.text.casefold()
if "no such column" not in text and "no such index" not in text:
raise
async def discord_notify(msg):
if not DISCORD_WEBHOOK_URL:
print("WARN: DISCORD_WEBHOOK_URL not set, skipping notify")
return
try:
async with aiohttp.ClientSession() as session:
async with session.post(
DISCORD_WEBHOOK_URL.rstrip("/") + "/webhook-forward",
json={"content": msg},
timeout=aiohttp.ClientTimeout(total=15),
) as r:
if r.status >= 400:
body = await r.text()
print(f"Discord webhook failed ({r.status}): {body}")
except Exception as e:
print(f"Discord webhook error: {repr(e)}")
async def publish_task_event(search_id):
data = {"id": search_id, "at": int(time.time())}
for queue in list(TASK_EVENT_QUEUES):
if queue.full():
try:
queue.get_nowait()
except asyncio.QueueEmpty:
pass
queue.put_nowait(data)
async def finish_task(search_id):
ACTIVE_TASKS.pop(search_id, None)
await publish_task_event(search_id)
def is_ai_post(post):
if post.get("aiType") == 2:
return True
tags = post.get("tags") or []
for tag in tags:
name = (
tag
if isinstance(tag, str)
else (tag.get("tag") or tag.get("name") or "")
if isinstance(tag, dict)
else ""
)
if name and name.casefold() in AI_TAGS:
return True
return False
def get_search_keywords(raw):
parts = urlsplit(raw)
path_parts = [unquote(p) for p in parts.path.split("/") if p]
if "tags" in path_parts:
idx = path_parts.index("tags") + 1
if idx < len(path_parts):
return path_parts[idx]
query = parse_qs(parts.query)
words = query.get("word") or query.get("q")
if words:
return words[0]
return raw.strip()
def get_search_params(raw, keywords):
params = []
for key, value in parse_qsl(urlsplit(raw).query, keep_blank_values=True):
if key in ("p", "q", "word", "type"):
continue
if key == "s_mode":
if value == "tag":
value = "s_tag"
elif value == "tag_full":
value = "s_tag_full"
params.append((key, value))
params.append(("word", keywords))
if not any(k == "s_mode" for k, _ in params):
params.append(("s_mode", "s_tag"))
return urlencode(params)
def get_search_api_url(raw, keywords):
encoded = quote(keywords, safe="")
params = get_search_params(raw, keywords)
return f"https://www.pixiv.net/ajax/search/artworks/{encoded}?{params}"
async def save_search(search_id, post_ids):
if not post_ids:
return
stmt = {
"sql": "INSERT OR REPLACE INTO pi_searches (id, post_ids) VALUES (?, ?)",
"args": [
{"type": "text", "value": search_id},
{"type": "text", "value": json.dumps(post_ids)},
],
}
await turso_execute([stmt])
def user_search_id(user_id, username):
label = str(username or user_id).strip() or str(user_id)
return f"{base26_time()}_{label}"
async def pixiv_user_name(user_id, session):
data = await fetch_page(session, f"https://www.pixiv.net/ajax/user/{user_id}")
body = data.get("body") or {}
return (body.get("name") or body.get("account") or "").strip()
async def pixiv_user_names(user_ids, phpsessid):
cookies = {"PHPSESSID": phpsessid}
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
async def load_name(uid):
try:
return uid, await pixiv_user_name(uid, session)
except Exception:
return uid, ""
return dict(await asyncio.gather(*(load_name(uid) for uid in user_ids)))
async def pixiv_search_live(url, pages, mode, phpsessid, search_id):
keywords = get_search_keywords(url)
api_url = get_search_api_url(url, keywords)
first_url = f"{api_url}&p=1"
cookies = {"PHPSESSID": phpsessid}
post_ids = []
seen = set()
done = 0
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
tasks = [fetch_page(session, f"{api_url}&p={p}") for p in range(1, pages + 1)]
for coro in asyncio.as_completed(tasks):
data = await coro
done += 1
if search_id in ACTIVE_TASKS:
ACTIVE_TASKS[search_id].update({"total": pages, "done": done})
if data.get("error"):
continue
body = data.get("body") or {}
posts = (body.get("illustManga") or {}).get("data") or []
if mode == "ai":
posts = [p for p in posts if is_ai_post(p)]
elif mode == "real":
posts = [p for p in posts if not is_ai_post(p)]
for post in posts:
pid = str(post.get("id") or "")
if pid and pid not in seen:
seen.add(pid)
post_ids.append(pid)
if done % 25 == 0:
await save_search(search_id, post_ids)
await save_search(search_id, post_ids)
return post_ids, keywords, first_url
async def pixiv_user_posts(user_id, phpsessid):
cookies = {"PHPSESSID": phpsessid}
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
data = await fetch_page(
session, f"https://www.pixiv.net/ajax/user/{user_id}/profile/all"
)
body = data.get("body") or {}
posts = list((body.get("illusts") or {}).keys())
username = ""
pickup = body.get("pickup") or []
if pickup:
username = (pickup[0] or {}).get("userName") or ""
if not username:
username = await pixiv_user_name(user_id, session)
return {"user_id": user_id, "post_ids": posts, "username": username}
async def fetch_page(session, url):
async with session.get(url) as r:
return await r.json()
async def get_post_pages(post_id, session):
key = str(post_id)
cached = PAGE_URL_CACHE.get(key)
now = time.time()
if cached and now - cached["time"] < PAGE_URL_CACHE_MAX_AGE:
return cached["pages"]
data = await fetch_page(session, f"https://www.pixiv.net/ajax/illust/{post_id}/pages")
pages = []
for page in data.get("body") or []:
urls = page.get("urls") or {}
pages.append(
{
"original": urls.get("original") or "",
"regular": urls.get("regular") or "",
"small": urls.get("small") or "",
"thumb_mini": urls.get("thumb_mini") or "",
}
)
PAGE_URL_CACHE[key] = {"time": now, "pages": pages}
return pages
def parse_png_metadata(data):
index = 8
while index < len(data):
if index + 8 > len(data):
break
chunk_len = int.from_bytes(data[index : index + 4], "big")
chunk_type = data[index + 4 : index + 8].decode("ascii", errors="ignore")
index += 8
if chunk_type in ("tEXt", "iTXt"):
content = data[index : index + chunk_len]
return (
content.replace(b"\0", b"") if chunk_type == "tEXt" else content.strip()
)
index += chunk_len + 4
return None
def determine_exif_type(metadata):
if metadata is None:
return None
if metadata == b"TitleAI generated image":
return "novelai"
if metadata.startswith(b"parameter"):
return "sd"
if b'{"' in metadata:
return "comfy"
if metadata.startswith(b"SoftwareCelsys"):
return "celsys"
return "photoshop"
def byteize(alpha):
alpha = alpha.T.reshape((-1,))
alpha = alpha[: (alpha.shape[0] // 8) * 8]
alpha = np.bitwise_and(alpha, 1)
alpha = alpha.reshape((-1, 8))
return np.packbits(alpha, axis=1)
def has_stealth_png_bytes(data):
try:
image = Image.open(io.BytesIO(data))
if "A" not in image.getbands():
return False
alpha = np.array(image.getchannel("A"))
arr = byteize(alpha).flatten()
magic = b"stealth_pngcomp"
return bytes(arr[: len(magic)]) == magic
except Exception:
return False
async def scan_post(post_id, session, post_sem, exif_sem, img_sem):
async with post_sem:
try:
pages = await get_post_pages(post_id, session)
image_urls = [p["original"] for p in pages if "png" in p["original"]]
for url in image_urls:
metadata = await get_exif_range(url, session, exif_sem)
exif_type = determine_exif_type(metadata)
if exif_type not in ("photoshop", "celsys", None):
code = EXIF_TYPE_TO_CODE.get(exif_type)
return post_id, url, code
for url in image_urls:
img_data = await fetch_image(session, url, img_sem)
if img_data and has_stealth_png_bytes(img_data):
return post_id, url, EXIF_TYPE_TO_CODE.get("stealth")
return post_id, None, None
except Exception:
return post_id, None, None
async def get_exif_range(url, session, sem):
hdrs = {"Referer": "https://www.pixiv.net/", "Range": "bytes=0-512"}
if sem:
async with sem:
async with session.get(url, headers=hdrs) as r:
data = await r.read()
else:
async with session.get(url, headers=hdrs) as r:
data = await r.read()
return parse_png_metadata(data)
async def fetch_image(session, url, sem):
if sem:
async with sem:
async with session.get(url) as r:
return await r.read()
async with session.get(url) as r:
return await r.read()
async def run_scan(post_ids, phpsessid, task_id=None, save_live=False):
post_sem = asyncio.Semaphore(POST_SCAN_LIMIT)
exif_sem = asyncio.Semaphore(EXIF_RANGE_LIMIT)
img_sem = asyncio.Semaphore(FULL_IMAGE_LIMIT)
cookies = {"PHPSESSID": phpsessid}
results = []
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
tasks = [
scan_post(pid, session, post_sem, exif_sem, img_sem) for pid in post_ids
]
pending = []
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
pending.append(result)
if task_id and task_id in ACTIVE_TASKS:
ACTIVE_TASKS[task_id]["done"] = len(results)
if save_live and len(pending) >= 20:
await save_scan_results(pending)
pending = []
if save_live and pending:
await save_scan_results(pending)
return results
async def save_scan_results(results):
stmts = []
for post_id, url, exif_type in results:
short_url = url.replace(IMG_BASE, "", 1) if url else ""
stmts.append(
{
"sql": "INSERT OR REPLACE INTO pi_scans (post_id, url, exif_type) VALUES (?, ?, ?)",
"args": [
{"type": "text", "value": str(post_id)},
{"type": "text", "value": short_url},
{"type": "integer", "value": str(exif_type)}
if exif_type
else {"type": "null"},
],
}
)
if stmts:
for i in range(0, len(stmts), 200):
await turso_batch(stmts[i : i + 200])
async def get_scanned_post_ids(post_ids):
if not post_ids:
return {}
chunks = [post_ids[i : i + 500] for i in range(0, len(post_ids), 500)]
scanned = {}
stmts = []
for chunk in chunks:
placeholders = ",".join("?" for _ in chunk)
stmts.append(
{
"sql": f"SELECT post_id, url, exif_type FROM pi_scans WHERE post_id IN ({placeholders})",
"args": [{"type": "text", "value": str(pid)} for pid in chunk],
}
)
resp = await turso_execute(stmts)
for result in resp.get("results") or []:
if "response" not in result:
continue
rows = result["response"].get("result", {}).get("rows", [])
for row in rows:
pid = row[0].get("value")
url_val = row[1].get("value") if row[1].get("type") != "null" else ""
et = row[2].get("value") if row[2].get("type") != "null" else None
scanned[pid] = {"url": url_val, "exif_type": int(et) if et else None}
return scanned
def exif_items(post_ids, scanned, exif_types=None):
allowed = set(exif_types) if exif_types is not None else None
items = []
for pid in post_ids:
s = scanned.get(pid)
if not s:
continue
exif_type = s.get("exif_type")
if exif_type is None and (allowed is None or 0 not in allowed):
continue
if exif_type is not None and allowed is not None and exif_type not in allowed:
continue
items.append(
{
"post_id": pid,
"url": s.get("url"),
"exif_type": exif_type,
"scanned": True,
**image_links(pid, s.get("url")),
}
)
return items
def cleanup_thumbs():
THUMB_DIR.mkdir(exist_ok=True)
now = time.time()
for path in THUMB_DIR.glob("*.webp"):
try:
if now - path.stat().st_atime > THUMB_MAX_AGE:
path.unlink()
except OSError:
pass
def page_num_from_url(url):
if not url:
return 0
name = url.rsplit("/", 1)[-1]
if "_p" not in name:
return 0
try:
return int(name.rsplit("_p", 1)[1].split(".", 1)[0])
except ValueError:
return 0
def media_type_from_url(url):
ext = urlsplit(url).path.rsplit(".", 1)[-1].casefold()
if ext in ("jpg", "jpeg"):
return "image/jpeg"
if ext == "png":
return "image/png"
if ext == "gif":
return "image/gif"
if ext == "webp":
return "image/webp"
return "application/octet-stream"
async def get_pixiv_image_url(post_id, page, size, phpsessid):
cookies = {"PHPSESSID": phpsessid}
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
pages = await get_post_pages(post_id, session)
if not pages:
raise HTTPException(status_code=404, detail="image not found")
page = min(max(page, 0), len(pages) - 1)
urls = pages[page]
if size in ("full", "orig"):
url = urls.get("original") or urls.get("regular") or urls.get("small")
else:
url = urls.get("regular") or urls.get("small") or urls.get("original")
if not url:
raise HTTPException(status_code=404, detail="image not found")
return url
async def fetch_pixiv_bytes(url, phpsessid):
cookies = {"PHPSESSID": phpsessid}
async with aiohttp.ClientSession(cookies=cookies, headers=PIXIV_HEADERS) as session:
async with session.get(url) as r:
if r.status >= 400:
raise HTTPException(status_code=r.status, detail="image fetch failed")
return await r.read(), r.headers.get("Content-Type") or media_type_from_url(url)
async def create_webp(post_id, image_url, phpsessid, page=0, kind="t"):
cleanup_thumbs()
scale_tag = int(WEBP_SCALE * 100)
out = THUMB_DIR / f"{post_id}_p{page}_{kind}{scale_tag}.webp"
if out.exists():
os.utime(out, None)
return out
data, _ = await fetch_pixiv_bytes(image_url, phpsessid)
if not data:
raise HTTPException(status_code=404, detail="image not found")
image = Image.open(io.BytesIO(data))
image = image.resize(
(max(int(image.width * WEBP_SCALE), 1), max(int(image.height * WEBP_SCALE), 1))
)
if image.mode not in ("RGB", "RGBA"):
image = image.convert("RGB")
image.save(out, "WEBP", quality=WEBP_QUALITY)
return out
def image_links(post_id, url):
page = page_num_from_url(url)
suffix = f"?p={page}"
pid = quote(str(post_id), safe="")
webp_url = f"/api/i/{pid}/v{suffix}"
return {
"image_url": webp_url,
"preview_url": webp_url,
"download_url": f"/api/i/{pid}/o{suffix}",
"full_image_url": webp_url,
"page": page,
}
async def bg_search_task(search_id, url, pages, mode, phpsessid):
ACTIVE_TASKS[search_id] = {
"type": "search",
"phase": "searching",
"total": pages,
"done": 0,
}
await discord_notify(f"`{search_id}` started")
try:
post_ids, _, _ = await pixiv_search_live(url, pages, mode, phpsessid, search_id)
await discord_notify(f"`{search_id}` completed - {len(post_ids)} posts found")
except Exception as e:
await discord_notify(f"`{search_id}` failed: {e}")
finally:
await finish_task(search_id)
async def bg_user_task(search_id, user_id, phpsessid):
ACTIVE_TASKS[search_id] = {
"type": "user_search",
"phase": "searching",
"total": 1,
"done": 0,
}
await discord_notify(f"`{search_id}` started (user {user_id})")
try:
result = await pixiv_user_posts(user_id, phpsessid)
post_ids = list(dict.fromkeys(result["post_ids"]))
ACTIVE_TASKS[search_id]["done"] = 1
if not post_ids:
await discord_notify(f"`{search_id}` completed - no posts, not saved")
return
await save_search(search_id, post_ids)
already = await get_scanned_post_ids(post_ids)
to_scan = [pid for pid in post_ids if pid not in already]
if to_scan:
ACTIVE_TASKS[search_id].update(
{"phase": "scanning", "total": len(to_scan), "done": 0}
)
await run_scan(to_scan, phpsessid, task_id=search_id, save_live=True)
await discord_notify(
f"`{search_id}` completed - {len(post_ids)} posts from user {user_id}"
)
except Exception as e:
await discord_notify(f"`{search_id}` failed: {e}")
finally:
await finish_task(search_id)
async def bg_user_batch_task(jobs, phpsessid):
await asyncio.gather(
*(bg_user_task(search_id, user_id, phpsessid) for search_id, user_id in jobs)
)
async def bg_scan_task(search_id, post_ids, phpsessid):
ACTIVE_TASKS[search_id] = {
"type": "scan",
"phase": "scanning",
"total": len(post_ids),
"done": 0,
}
await discord_notify(f"`{search_id}` scan started ({len(post_ids)} posts)")
try:
results = await run_scan(post_ids, phpsessid, task_id=search_id, save_live=True)
found = sum(1 for _, url, _ in results if url)
await discord_notify(
f"`{search_id}` scan completed - {found}/{len(post_ids)} have exif"
)
except Exception as e:
await discord_notify(f"`{search_id}` scan failed: {e}")
finally:
await finish_task(search_id)
async def bg_search_and_scan_task(search_id, url, pages, mode, phpsessid):
ACTIVE_TASKS[search_id] = {
"type": "search+scan",
"phase": "searching",
"total": pages,
"done": 0,
}
await discord_notify(f"`{search_id}` search+scan started")
try:
post_ids, _, _ = await pixiv_search_live(url, pages, mode, phpsessid, search_id)
await discord_notify(
f"`{search_id}` search done - {len(post_ids)} posts, scanning..."
)
already = await get_scanned_post_ids(post_ids)
to_scan = [pid for pid in post_ids if pid not in already]
if to_scan:
ACTIVE_TASKS[search_id].update(
{"phase": "scanning", "total": len(to_scan), "done": 0}
)
results = await run_scan(
to_scan, phpsessid, task_id=search_id, save_live=True
)
found = sum(1 for _, url, _ in results if url)
await discord_notify(
f"`{search_id}` scan completed - {found}/{len(to_scan)} new exif"
)
else:
await discord_notify(f"`{search_id}` all {len(post_ids)} already scanned")
except Exception as e:
await discord_notify(f"`{search_id}` failed: {e}")
finally:
await finish_task(search_id)
class SearchRequest(BaseModel):
url: str
pages: int = 30
mode: str = "ai"
action: str = "search"
class UserSearchRequest(BaseModel):
user_ids: list
action: str = "search"
class ScanRequest(BaseModel):
search_id: str
class RenameRequest(BaseModel):
new_id: str
@app.on_event("startup")
async def startup():
if not TURSO_DB_URL:
print("WARN: TURSO_DB_URL not set, skipping DB init")
return
try:
await init_db()
except Exception as e:
print(f"WARN: DB init failed ({e}), will retry on first request")
@app.post("/api/submit")
async def submit_search(req: SearchRequest, bg: BackgroundTasks):
search_id = base26_time()
phpsessid = PHPSESSID
bg.add_task(
bg_search_and_scan_task, search_id, req.url, req.pages, req.mode, phpsessid
)
return {"id": search_id, "status": "started"}
@app.post("/api/submit_users")
async def submit_users(req: UserSearchRequest, bg: BackgroundTasks):
phpsessid = PHPSESSID
user_ids = list(dict.fromkeys(int(u) for u in req.user_ids))
names = await pixiv_user_names(user_ids, phpsessid)
jobs = [(user_search_id(uid, names.get(uid)), uid) for uid in user_ids]
bg.add_task(bg_user_batch_task, jobs, phpsessid)
return {"ids": [search_id for search_id, _ in jobs], "status": "started"}
@app.post("/api/scan")
async def scan_search(req: ScanRequest, bg: BackgroundTasks):
phpsessid = PHPSESSID
if req.search_id in ACTIVE_TASKS:
return {"status": "active", **ACTIVE_TASKS[req.search_id]}
resp = await turso_execute(
[
{
"sql": "SELECT post_ids FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": req.search_id}],
}
]
)
results = resp.get("results") or []
if not results or "response" not in results[0]:
return {"error": "not found"}
rows = results[0]["response"].get("result", {}).get("rows", [])
if not rows:
return {"error": "not found"}
post_ids = json.loads(rows[0][0].get("value", "[]"))
already = await get_scanned_post_ids(post_ids)
to_scan = [pid for pid in post_ids if pid not in already]
if not to_scan:
return {"status": "already_scanned", "count": len(post_ids)}
bg.add_task(bg_scan_task, req.search_id, to_scan, phpsessid)
return {"status": "scanning", "total": len(post_ids), "to_scan": len(to_scan)}
@app.get("/api/searches")
async def list_searches(page: int = 1):
page = max(page, 1)
offset = (page - 1) * SEARCH_PAGE_SIZE
resp = await turso_execute(
[
{
"sql": "SELECT COUNT(*) FROM pi_searches WHERE post_ids != '[]'"
},
{
"sql": "SELECT id, post_ids FROM pi_searches WHERE post_ids != '[]' "
"ORDER BY id DESC LIMIT ? OFFSET ?",
"args": [
{"type": "integer", "value": str(SEARCH_PAGE_SIZE)},
{"type": "integer", "value": str(offset)},
],
},
]
)
results = resp.get("results") or []
if len(results) < 2 or "response" not in results[1]:
return {"items": [], "total": 0, "page": page, "pages": 1}
count_rows = results[0].get("response", {}).get("result", {}).get("rows", [])
total = int(count_rows[0][0].get("value", "0")) if count_rows else 0
rows = results[1]["response"].get("result", {}).get("rows", [])
search_posts = []
all_post_ids = []
seen = set()
for row in rows:
post_ids = json.loads(row[1].get("value", "[]"))
search_posts.append(post_ids)
for pid in post_ids:
if pid not in seen:
seen.add(pid)
all_post_ids.append(pid)
scanned = await get_scanned_post_ids(all_post_ids)
items = []
for row, post_ids in zip(rows, search_posts):
search_id = row[0].get("value")
items.append(
{
"id": search_id,
"created_at": base26_to_time(search_id),
"found_exif": sum(
1 for pid in post_ids if scanned.get(pid, {}).get("exif_type")
),
"total_searched": len(post_ids),
}
)
return {
"items": items,
"total": total,
"page": page,
"page_size": SEARCH_PAGE_SIZE,
"pages": max((total + SEARCH_PAGE_SIZE - 1) // SEARCH_PAGE_SIZE, 1),
}
@app.get("/api/search/{search_id}")
async def get_search(search_id: str):
resp = await turso_execute(
[
{
"sql": "SELECT id, post_ids FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": search_id}],
}
]
)
results = resp.get("results") or []
if not results or "response" not in results[0]:
return {"error": "not found"}
rows = results[0]["response"].get("result", {}).get("rows", [])
if not rows:
return {"error": "not found"}
row = rows[0]
post_ids = json.loads(row[1].get("value", "[]"))
scanned = await get_scanned_post_ids(post_ids)
return {
"id": row[0].get("value"),
"post_ids": post_ids,
"created_at": base26_to_time(row[0].get("value")),
"scanned": scanned,
}
@app.get("/api/results/{search_id}")
async def get_results(search_id: str, page: int = 1, exif_types: str = ""):
resp = await turso_execute(
[
{
"sql": "SELECT post_ids FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": search_id}],
}
]
)
results = resp.get("results") or []
if not results or "response" not in results[0]:
return {"error": "not found"}
rows = results[0]["response"].get("result", {}).get("rows", [])
if not rows:
return {"error": "not found"}
post_ids = json.loads(rows[0][0].get("value", "[]"))
scanned = await get_scanned_post_ids(post_ids)
allowed = [
int(x)
for x in exif_types.split(",")
if x.isdigit() and int(x) in (*EXIF_TYPE_TO_CODE.values(), 0)
]
source = exif_items(post_ids, scanned, allowed if exif_types != "" else None)
page = max(page, 1)
total = len(source)
start = (page - 1) * PAGE_SIZE
items = source[start : start + PAGE_SIZE]
return {
"search_id": search_id,
"items": items,
"total": total,
"page": page,
"page_size": PAGE_SIZE,
"pages": max((total + PAGE_SIZE - 1) // PAGE_SIZE, 1),
"raw_total": len(post_ids),
"scanned_count": len(scanned),
}
@app.get("/api/i/{post_id}/t")
async def get_image_thumb(post_id: str, p=0):
p = int(p or 0)
image_url = await get_pixiv_image_url(post_id, p, "thumb", PHPSESSID)
path = await create_webp(post_id, image_url, PHPSESSID, p, "t")
return FileResponse(
path,
media_type="image/webp",
headers={"Cache-Control": f"public, max-age={THUMB_MAX_AGE}"},
)
@app.get("/api/i/{post_id}/v")
async def get_image_preview(post_id: str, p=0):
p = int(p or 0)
image_url = await get_pixiv_image_url(post_id, p, "full", PHPSESSID)
path = await create_webp(post_id, image_url, PHPSESSID, p, "v")
return FileResponse(
path,
media_type="image/webp",
headers={"Cache-Control": f"public, max-age={THUMB_MAX_AGE}"},
)
@app.get("/api/i/{post_id}/o")
async def get_image_original(post_id: str, p=0):
p = int(p or 0)
image_url = await get_pixiv_image_url(post_id, p, "orig", PHPSESSID)
data, content_type = await fetch_pixiv_bytes(image_url, PHPSESSID)
filename = urlsplit(image_url).path.rsplit("/", 1)[-1] or f"{post_id}_p{p}.png"
return Response(
data,
media_type=content_type,
headers={
"Cache-Control": f"public, max-age={THUMB_MAX_AGE}",
"Content-Disposition": f'attachment; filename="{filename}"',
},
)
@app.get("/api/image/{post_id}/thumb")
async def get_long_image_thumb(post_id: str, page: int = 0, p=None):
return await get_image_thumb(post_id, page if p is None else p)
@app.get("/api/image/{post_id}/full")
async def get_long_image_full(post_id: str, page: int = 0, p=None):
return await get_image_preview(post_id, page if p is None else p)
@app.get("/api/thumb/{post_id}")
async def get_thumb(post_id: str):
return await get_image_thumb(post_id)
@app.delete("/api/search/{search_id}")
async def delete_search(search_id: str):
await turso_execute(
[
{
"sql": "DELETE FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": search_id}],
}
]
)
return {"status": "deleted"}
@app.patch("/api/search/{search_id}")
async def rename_search(search_id: str, req: RenameRequest):
resp = await turso_execute(
[
{
"sql": "SELECT post_ids FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": search_id}],
}
]
)
results = resp.get("results") or []
if not results or "response" not in results[0]:
return {"error": "not found"}
rows = results[0]["response"].get("result", {}).get("rows", [])
if not rows:
return {"error": "not found"}
post_ids_val = rows[0][0].get("value", "[]")
await turso_execute(
[
{
"sql": "DELETE FROM pi_searches WHERE id = ?",
"args": [{"type": "text", "value": search_id}],
},
{
"sql": "INSERT INTO pi_searches (id, post_ids) VALUES (?, ?)",
"args": [
{"type": "text", "value": req.new_id},
{"type": "text", "value": post_ids_val},
],
},
]
)
return {"status": "renamed", "new_id": req.new_id}
@app.get("/api/progress")
async def get_progress():
return [{"id": k, **v} for k, v in ACTIVE_TASKS.items()]
@app.get("/api/events")
async def events(request: Request):
queue = asyncio.Queue(maxsize=16)
TASK_EVENT_QUEUES.add(queue)
async def stream():
try:
while not await request.is_disconnected():
try:
data = await asyncio.wait_for(queue.get(), timeout=25)
yield f"data: {json.dumps(data)}\n\n"
except asyncio.TimeoutError:
yield ": keepalive\n\n"
finally:
TASK_EVENT_QUEUES.discard(queue)
return StreamingResponse(stream(), media_type="text/event-stream")
app.mount("/", StaticFiles(directory=FRONTEND_DIR, html=True), name="frontend")