import logging
import os
import re
import shutil
import threading
import time
from typing import List, Optional

import requests
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from pydantic import BaseModel

from crawler.cache import clear_progress, load_progress
from crawler.crawl import run_crawl, stop_crawl
from db.config import BUCKET_DIR, LOCAL_DIR, init_storage, sync_to_bucket
from db.database import DB_PATH, engine, get_session, init_db
from db.models import Actress, ActressAlias, CrawlStatus, Label, Video, VideoActress
|
|
# Process-wide logging: INFO and above, each record prefixed with time, logger
# name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("sougouwiki-api")

app = FastAPI(version="1.1.0", title="Sougouwiki Professional API")

# Held while the start paths check/replace master_thread and while the worker
# clears it on exit, so at most one background worker is launched at a time.
worker_lock = threading.Lock()
# Thread currently running master_background_worker, or None when no worker has
# been started yet or the last one has finished.
master_thread: Optional[threading.Thread] = None
|
|
def backfill_labels_task():
    """Assign ``Video.label_id`` from a label name found in the video title.

    Walks every video whose ``label_id`` is NULL in ascending-id batches of 5000.
    For each titled video, the first ``（…）`` / ``(…)`` segment of the title is
    split on ``/`` or ``／``. Among the parts that exactly equal an existing
    ``Label.name``, the last one is assigned. Changes are committed once per batch.

    Errors are logged with a traceback and the uncommitted batch is rolled back;
    nothing is re-raised.
    """
    batch_size = 5000
    session = get_session()
    try:
        logger.info("Starting intelligent label backfill...")
        total_fixed = 0
        last_id = 0
        # title part -> Label.id (or None when no label has that name). Ids are
        # cached rather than ORM objects because each commit expires objects.
        label_ids = {}

        while True:
            # Keyset pagination on id: rows that cannot be matched keep label_id
            # NULL, so re-issuing "label_id IS NULL LIMIT n" would return the same
            # rows forever. Advancing past the last seen id reaches every row.
            target_videos = (
                session.query(Video)
                .filter(Video.label_id.is_(None), Video.id > last_id)
                .order_by(Video.id)
                .limit(batch_size)
                .all()
            )
            if not target_videos:
                break
            last_id = target_videos[-1].id

            chunk_fixed = 0
            for video in target_videos:
                if not video.title:
                    continue
                match = re.search(r"[\uff08(](.+?)[\uff09)]", video.title)
                if not match:
                    continue
                # The old class "[//]" listed the ASCII slash twice; presumably the
                # full-width slash U+FF0F was meant as the second separator.
                parts = [p.strip() for p in re.split(r"[/\uff0f]", match.group(1).strip())]

                best_label_id = None
                for part in parts:
                    if part not in label_ids:
                        label = session.query(Label).filter_by(name=part).first()
                        label_ids[part] = label.id if label is not None else None
                    if label_ids[part] is not None:
                        best_label_id = label_ids[part]  # later parts win
                if best_label_id is not None:
                    video.label_id = best_label_id
                    chunk_fixed += 1

            session.commit()
            total_fixed += chunk_fixed

        logger.info(f"Label backfill finished. Total fixed: {total_fixed}")
    except Exception as e:
        logger.exception(f"Backfill error: {e}")
        session.rollback()
    finally:
        session.close()
|
|
| |
# Shared progress record of the full-table label scan; mutated in place by the
# scanning thread and read by GET /api/admin/backfill/status.
backfill_status = dict(is_running=False, last_id=0, total_fixed=0, total_scanned=0)
|
|
| from sqlalchemy import func |
|
|
def improved_backfill_task():
    """Fill in missing ``Video.label_id`` values by scanning the whole videos table.

    Videos are read in ascending-id batches of 5000 (keyset pagination on ``id``).
    For each video whose ``label_id`` is NULL and whose title is non-empty, every
    bracketed title segment (``（…）``, ``(…)``, ``[…]`` or ``【…】``) is split on
    ``/``, ``／`` and ``|``. The first part equal to an existing ``Label.name``,
    compared through SQL ``lower()``, is assigned.

    Progress is published in the module-level ``backfill_status`` dict, changes are
    committed after every batch, and ``sync_to_bucket()`` runs once at the end if
    anything was fixed.

    Errors are logged with a traceback and roll back the uncommitted batch. They are
    not re-raised, so callers cannot distinguish a failed scan from a finished one
    except through the log.
    """
    # One bracketed segment: an opening bracket, bracket-free content, a closing
    # bracket. Mixed pairs such as "（…)" are still accepted (the old fallback was
    # lenient too), but a match can no longer run from a closing bracket to the
    # next opening one, and every segment is tried instead of only the first.
    bracket_re = re.compile(
        r"[\uff08(\[\u3010]([^\uff08\uff09()\[\]\u3010\u3011]+)[\uff09)\]\u3011]"
    )
    # ASCII slash, full-width slash (U+FF0F) and pipe separate multiple labels.
    separator_re = re.compile(r"[/\uff0f|]")
    batch_size = 5000

    session = get_session()
    # title part -> Label.id (or None when no label matches). Ids, not ORM
    # objects, are cached because objects are expired by every commit.
    label_ids = {}

    def lookup_label_id(part):
        """Return the id of the label whose lower-cased name equals *part*'s, or None."""
        if part not in label_ids:
            label = (
                session.query(Label)
                .filter(func.lower(Label.name) == func.lower(part))
                .first()
            )
            label_ids[part] = label.id if label is not None else None
        return label_ids[part]

    def match_label_id(title):
        """Return the first label id found in *title*'s bracketed segments, or None."""
        for segment in bracket_re.finditer(title):
            for raw_part in separator_re.split(segment.group(1)):
                part = raw_part.strip()
                if not part:
                    continue
                label_id = lookup_label_id(part)
                if label_id is not None:
                    return label_id
        return None

    try:
        # Reset every counter so the status endpoint never shows numbers from a
        # previous run (total_fixed is otherwise only written when a video is fixed).
        backfill_status.update(is_running=True, last_id=0, total_fixed=0, total_scanned=0)
        logger.info("Starting improved full-table label scan...")
        last_id = 0
        total_fixed = 0
        total_scanned = 0

        sample_labels = session.query(Label).limit(5).all()
        logger.info(f"DB Label samples: {[lbl.name for lbl in sample_labels]}")

        while True:
            videos = (
                session.query(Video)
                .filter(Video.id > last_id)
                .order_by(Video.id)
                .limit(batch_size)
                .all()
            )
            if not videos:
                break

            for video in videos:
                last_id = video.id
                total_scanned += 1
                backfill_status["last_id"] = last_id
                backfill_status["total_scanned"] = total_scanned

                if video.label_id is not None or not video.title:
                    continue
                label_id = match_label_id(video.title)
                if label_id is not None:
                    video.label_id = label_id
                    total_fixed += 1
                    backfill_status["total_fixed"] = total_fixed

            session.commit()
            if total_scanned % 10000 == 0:
                logger.info(f"Scan progress: Scanned {total_scanned}, Fixed {total_fixed}")

        logger.info(f"Full scan finished. Total fixed: {total_fixed}")
        if total_fixed > 0:
            sync_to_bucket()
    except Exception as e:
        logger.exception(f"Improved backfill error: {e}")
        session.rollback()
    finally:
        backfill_status["is_running"] = False
        session.close()
|
|
@app.get("/api/admin/backfill/status", include_in_schema=False)
def get_backfill_status():
    """Return a snapshot of the label-backfill progress counters.

    A shallow copy is returned so the response is serialized from a single
    point-in-time view, even while the backfill thread keeps updating the
    shared ``backfill_status`` dict.
    """
    return dict(backfill_status)
|
|
@app.get("/api/admin/labels/search", include_in_schema=False)
def search_label_samples(q: str = ""):
    """List up to 50 labels as ``{"id", "name"}`` dicts.

    Args:
        q: Optional substring; when non-empty, only labels whose name matches
            SQL ``LIKE '%q%'`` are returned.
    """
    session = get_session()
    try:
        label_query = session.query(Label)
        if q:
            label_query = label_query.filter(Label.name.like(f"%{q}%"))
        return [dict(id=row.id, name=row.name) for row in label_query.limit(50).all()]
    finally:
        session.close()
|
|
def master_background_worker(max_pages: int):
    """Run one maintenance cycle: init DB, label backfill, crawl, bucket sync.

    Used as a ``threading.Thread`` target. Any exception is logged together with
    its traceback instead of escaping the thread. On exit, the module-level
    ``master_thread`` is always cleared under ``worker_lock`` so a new run can
    be started.

    Args:
        max_pages: Passed through to ``run_crawl(max_pages=...)``.
    """
    global master_thread
    try:
        init_db()
        improved_backfill_task()
        logger.info(f"Starting crawler for {max_pages} pages...")
        run_crawl(max_pages=max_pages)
        sync_to_bucket()
    except Exception as e:
        # logger.exception keeps the traceback, which a bare message would lose.
        logger.exception(f"Worker error: {e}")
    finally:
        with worker_lock:
            master_thread = None
|
|
def sync_heartbeat(interval: float = 120):
    """Call ``sync_to_bucket()`` every *interval* seconds, forever.

    Args:
        interval: Seconds to sleep before each sync attempt (default 120).
    """
    while True:
        time.sleep(interval)
        try:
            sync_to_bucket()
        except Exception:
            # Keep the loop alive: an uncaught error would end this thread and
            # silently stop every future periodic sync.
            logger.exception("Periodic bucket sync failed; retrying in %s s", interval)
|
|
@app.on_event("startup")
async def startup_event():
    """Prepare storage and launch the background threads on application start.

    Starts the periodic bucket-sync thread as a daemon. Then, unless a worker is
    already alive, starts ``master_background_worker`` for 342 pages.
    """
    global master_thread
    init_storage()
    heartbeat = threading.Thread(target=sync_heartbeat, daemon=True)
    heartbeat.start()
    with worker_lock:
        worker_alive = master_thread is not None and master_thread.is_alive()
        if not worker_alive:
            master_thread = threading.Thread(target=master_background_worker, args=(342,))
            master_thread.start()
|
|
@app.get("/api/admin/db/export", include_in_schema=False)
def export_database():
    """Allows downloading the current production database file.

    Returns:
        A ``FileResponse`` streaming ``DB_PATH`` as ``sougouwiki_production.db``.

    Raises:
        HTTPException: 404 when no file exists at ``DB_PATH``.
    """
    # FileResponse was previously used without being imported, so every export
    # failed with a NameError; it is imported at the top of the module.
    # NOTE(review): the file is streamed as-is while the crawler may still be
    # writing to it, so an export taken mid-write may be inconsistent. Consider
    # the sqlite3 backup API if exports must be consistent.
    if not os.path.exists(DB_PATH):
        raise HTTPException(404, "Database file not found")
    return FileResponse(
        DB_PATH,
        filename="sougouwiki_production.db",
        media_type="application/x-sqlite3",
    )
|
|
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
def root_ui():
    """Serve the single-page admin dashboard.

    The page polls ``/api/admin/crawl/status`` every 5 seconds and shows the JSON
    response. The Restart button POSTs to ``/api/admin/crawl/start`` and then
    refreshes the status.
    """
    # The status JSON embeds load_progress() data, whose contents are not visible
    # here and may include arbitrary text. It is written with textContent, not
    # innerHTML, so it can never be interpreted as markup.
    return """
<html>
<head><title>Sougouwiki Hub</title></head>
<body style="font-family:sans-serif; padding:40px; background:#f6f8fa;">
<h1>Sougouwiki Manager</h1>
<div style="background:white; border:1px solid #e1e4e8; padding:20px; border-radius:6px;">
<p id="st">Checking status...</p>
<button onclick="restart()">Restart</button>
<div id="stats" style="margin-top:20px;"><pre id="stats-json">Loading...</pre></div>
</div>
<script>
async function update() {
    try {
        const r = await fetch('/api/admin/crawl/status');
        const d = await r.json();
        document.getElementById('st').innerText = d.is_running ? '🟢 Running' : '💤 Idle';
        document.getElementById('stats-json').textContent = JSON.stringify(d, null, 2);
    } catch (err) {
        document.getElementById('st').innerText = 'Status unavailable: ' + err;
    }
}
async function restart() {
    try {
        await fetch('/api/admin/crawl/start', {method:'POST'});
    } finally {
        update();
    }
}
setInterval(update, 5000); update();
</script>
</body>
</html>
"""
|
|
@app.get("/api/admin/crawl/status")
def get_system_status():
    """Report worker liveness, crawl progress and table row counts.

    Returns:
        ``{"is_running", "progress", "db_stats"}``. Here ``progress`` is whatever
        ``load_progress()`` returns, and ``db_stats`` holds the row counts of
        CrawlStatus, Actress and Video. On any error the exception is logged
        and ``{"error": <message>}`` is returned instead.
    """
    # Read the global once: the worker sets master_thread to None in its finally
    # block. Reading it twice could see a Thread and then None, which raised
    # AttributeError and turned a normal poll into an error response.
    worker = master_thread
    session = get_session()
    try:
        return {
            "is_running": worker is not None and worker.is_alive(),
            "progress": load_progress(),
            "db_stats": {
                "pages": session.query(CrawlStatus).count(),
                "actresses": session.query(Actress).count(),
                "videos": session.query(Video).count(),
            },
        }
    except Exception as e:
        logger.exception("Status query failed")
        return {"error": str(e)}
    finally:
        session.close()
|
|
@app.post("/api/admin/crawl/start")
def api_manual_start(max_pages: int = 342):
    """Launch ``master_background_worker(max_pages)`` unless a worker is alive.

    Returns:
        ``{"message": "Started"}`` when a new worker thread was started, otherwise
        ``{"message": "Already running"}``.
    """
    global master_thread
    with worker_lock:
        if master_thread is None or not master_thread.is_alive():
            master_thread = threading.Thread(target=master_background_worker, args=(max_pages,))
            master_thread.start()
            return {"message": "Started"}
    return {"message": "Already running"}
|
|
@app.post("/api/admin/db/reset")
def api_system_reset():
    """Delete the local and bucket databases and crawl progress, then re-initialize.

    Returns:
        ``{"message": "Reset done"}`` on success.

    Raises:
        HTTPException: 409 while a background worker thread is alive, because
            deleting the database under a running crawl/backfill would lose or
            misdirect its writes. 500 with the error text if any reset step fails.
    """
    # Hold worker_lock for the whole reset so no worker can start half-way through.
    with worker_lock:
        if master_thread is not None and master_thread.is_alive():
            raise HTTPException(409, "Background worker is running; retry after it finishes")
        try:
            # Close pooled connections before deleting the file. On POSIX an open
            # SQLite connection keeps an unlinked file alive, so a pooled connection
            # could otherwise keep serving the deleted database after the reset.
            # NOTE(review): assumes `engine` is the SQLAlchemy Engine behind
            # get_session()/init_db() -- confirm in db.database.
            engine.dispose()
            # Also remove SQLite side files (present only in some journal modes) so a
            # stale journal/WAL is never paired with the newly created database.
            for path in (DB_PATH, f"{DB_PATH}-wal", f"{DB_PATH}-shm", f"{DB_PATH}-journal"):
                if os.path.exists(path):
                    os.remove(path)
            bucket_db = os.path.join(BUCKET_DIR, "sougouwiki.db")
            if os.path.exists(bucket_db):
                os.remove(bucket_db)
            clear_progress()
            init_storage()
            init_db()
            return {"message": "Reset done"}
        except Exception as e:
            raise HTTPException(500, str(e)) from e
|
|
@app.get("/api/actress")
def search_actress(q: str = Query("")):
    """Search actresses by partial name, falling back to alias names.

    Args:
        q: Substring matched with SQL ``LIKE '%q%'``. An empty value returns no
            results. Aliases are searched only when no actress name matches.

    Returns:
        ``{"actresses": [...]}``. Each entry holds profile fields, alias names, the
        total video count and up to 20 videos (``dvd_id``/``title``), sorted by
        ``release_date`` descending.
    """
    session = get_session()
    try:
        if not q:
            return {"actresses": []}
        actresses = session.query(Actress).filter(Actress.name.like(f"%{q}%")).all()
        if not actresses:
            aliases = (
                session.query(ActressAlias)
                .filter(ActressAlias.alias_name.like(f"%{q}%"))
                .all()
            )
            # dict.fromkeys de-duplicates while keeping query order. A set ordered
            # results by hash (object identity), so the order varied per request.
            actresses = list(dict.fromkeys(al.actress for al in aliases if al.actress))

        results = []
        for actress in actresses:
            videos = (
                session.query(Video)
                .join(VideoActress)
                .filter(VideoActress.actress_id == actress.id)
                .all()
            )
            alias_names = [
                al.alias_name
                for al in session.query(ActressAlias).filter_by(actress_id=actress.id).all()
            ]
            # Missing release dates are treated as "" and so sort last.
            # Assumes release_date is a string or None -- TODO confirm the column type.
            newest = sorted(videos, key=lambda v: v.release_date or "", reverse=True)[:20]
            results.append({
                "id": actress.id, "name": actress.name, "name_kana": actress.name_kana,
                "height": actress.height, "bust": actress.bust,
                "waist": actress.waist, "hip": actress.hip,
                "aliases": alias_names,
                "video_count": len(videos),
                "videos": [{"dvd_id": v.dvd_id, "title": v.title} for v in newest],
            })
        return {"actresses": results}
    finally:
        session.close()
|
|
@app.get("/api/video")
def search_video(dvd_id: str = Query(""), q: str = Query("")):
    """Search videos by DVD id, falling back to a title search.

    Args:
        dvd_id: Substring of ``Video.dvd_id`` to match (takes precedence over ``q``).
        q: Search term used when ``dvd_id`` is empty.

    Returns:
        ``{"videos": [...]}``. Each entry carries the video's fields, its label name
        (or None) and the credited actresses with role names. Titles are searched
        only when no ``dvd_id`` matched; an empty term returns no results.
    """
    session = get_session()
    try:
        term = dvd_id or q
        if not term:
            return {"videos": []}
        videos = session.query(Video).filter(Video.dvd_id.like(f"%{term}%")).all()
        if not videos:
            videos = session.query(Video).filter(Video.title.like(f"%{term}%")).all()

        # label_id -> label name. Many results share a label, so each is looked up
        # once, and unlabeled videos trigger no query at all.
        label_names = {}
        results = []
        for video in videos:
            credits = (
                session.query(VideoActress, Actress)
                .join(Actress, VideoActress.actress_id == Actress.id)
                .filter(VideoActress.video_id == video.id)
                .all()
            )
            label_name = None
            if video.label_id is not None:
                if video.label_id not in label_names:
                    label = session.query(Label).filter_by(id=video.label_id).first()
                    label_names[video.label_id] = label.name if label else None
                label_name = label_names[video.label_id]
            results.append({
                "dvd_id": video.dvd_id, "title": video.title, "release_date": video.release_date,
                "cover_url": video.cover_url, "dmm_url": video.dmm_url, "prefix": video.prefix,
                "label": label_name,
                "actresses": [
                    {"id": a.id, "name": a.name, "role_name": va.role_name}
                    for va, a in credits
                ],
            })
        return {"videos": results}
    finally:
        session.close()
|
|