import os import threading import requests import shutil import time import re import logging from typing import List, Optional from fastapi import FastAPI, Query, BackgroundTasks, HTTPException from fastapi.responses import RedirectResponse, HTMLResponse from pydantic import BaseModel from db.database import get_session, init_db, DB_PATH, engine from db.config import sync_to_bucket, BUCKET_DIR, LOCAL_DIR, init_storage from db.models import Actress, ActressAlias, Video, VideoActress, Label, CrawlStatus from crawler.crawl import run_crawl, stop_crawl from crawler.cache import load_progress, clear_progress logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("sougouwiki-api") app = FastAPI(title="Sougouwiki Professional API", version="1.1.0") worker_lock = threading.Lock() master_thread: Optional[threading.Thread] = None def backfill_labels_task(): session = get_session() try: logger.info("Starting intelligent label backfill...") # Process in chunks. Don't use offset because updating label_id # removes records from the 'label_id == None' result set. limit = 5000 total_fixed = 0 while True: # Always take the top N records that still need fixing target_videos = session.query(Video).filter(Video.label_id == None).limit(limit).all() if not target_videos: break chunk_fixed = 0 for video in target_videos: if not video.title: continue match = re.search(r"[\uff08(](.+?)[\uff09)]", video.title) if not match: # Mark as 'checked' somehow or just ignore? # If we don't change label_id, it will stay in the set. # To prevent infinite loop on unmatchable titles, we can set label_id to a 'default' 0 if needed # but usually SeesaaWiki titles without brackets just don't have labels. # For now, we'll just skip them in this pass by ensuring we move the "window" # if no match is found, but that's hard without offset. # Better: If no match or no label found, we must mark it processed. continue raw_labels = match.group(1).strip() parts = [p.strip() for p in re.split(r"[//]", raw_labels)] best_label = None for part in parts: label = session.query(Label).filter_by(name=part).first() if label: best_label = label if best_label: video.label_id = best_label.id chunk_fixed += 1 else: # No known label found in the database for these strings # We should NOT leave label_id as None or we'll loop forever # Let's use a special "unlinked" state or just skip this logic's limitation pass session.commit() total_fixed += chunk_fixed # Since we can't easily mark "tried but failed" without a new column, # and offset is broken, we'll use a slightly different approach: # We'll process ALL videos in chunks regardless of label_id, but only once. break # Exit this loop and use the better one below except Exception as e: logger.error(f"Backfill error: {e}") session.rollback() finally: session.close() # Global state for monitoring backfill_status = {"is_running": False, "last_id": 0, "total_fixed": 0, "total_scanned": 0} from sqlalchemy import func def improved_backfill_task(): global backfill_status session = get_session() try: backfill_status["is_running"] = True logger.info("Starting improved full-table label scan...") batch_size = 5000 last_id = 0 total_fixed = 0 total_scanned = 0 # Diagnostic: print a few labels to log sample_labels = session.query(Label).limit(5).all() logger.info(f"DB Label samples: {[l.name for l in sample_labels]}") while True: videos = session.query(Video).filter(Video.id > last_id).order_by(Video.id).limit(batch_size).all() if not videos: break for video in videos: last_id = video.id total_scanned += 1 backfill_status["last_id"] = last_id backfill_status["total_scanned"] = total_scanned if video.label_id is not None: continue if not video.title: continue # More robust regex for any kind of brackets match = re.search(r"[\uff08\uff09\(\)\[\]\u3010\u3011](.+?)[\uff08\uff09\(\)\[\]\u3010\u3011]", video.title) if not match: # Try another one: anything at the end of the title in brackets match = re.search(r"[\uff08\(\[](.*?)[\uff09\)\ frontline]]", video.title) if not match: continue raw_content = match.group(1).strip() parts = [p.strip() for p in re.split(r"[//|]", raw_content)] best_label = None for part in parts: if not part: continue # Case-insensitive and whitespace-insensitive match l_obj = session.query(Label).filter(func.lower(Label.name) == func.lower(part)).first() if l_obj: best_label = l_obj break # Stop at the FIRST valid label (Manufacturer-first) if best_label: video.label_id = best_label.id total_fixed += 1 backfill_status["total_fixed"] = total_fixed session.commit() if total_scanned % 10000 == 0: logger.info(f"Scan progress: Scanned {total_scanned}, Fixed {total_fixed}") logger.info(f"Full scan finished. Total fixed: {total_fixed}") if total_fixed > 0: sync_to_bucket() except Exception as e: logger.error(f"Improved backfill error: {e}") session.rollback() finally: backfill_status["is_running"] = False session.close() @app.get("/api/admin/backfill/status", include_in_schema=False) def get_backfill_status(): return backfill_status @app.get("/api/admin/labels/search", include_in_schema=False) def search_label_samples(q: str = ""): session = get_session() try: query = session.query(Label) if q: query = query.filter(Label.name.like(f"%{q}%")) labels = query.limit(50).all() return [{"id": l.id, "name": l.name} for l in labels] finally: session.close() def master_background_worker(max_pages: int): global master_thread try: init_db() improved_backfill_task() logger.info(f"Starting crawler for {max_pages} pages...") run_crawl(max_pages=max_pages) sync_to_bucket() except Exception as e: logger.error(f"Worker error: {e}") finally: with worker_lock: master_thread = None def sync_heartbeat(): while True: time.sleep(120) sync_to_bucket() @app.on_event("startup") async def startup_event(): global master_thread init_storage() threading.Thread(target=sync_heartbeat, daemon=True).start() with worker_lock: if master_thread is None or not master_thread.is_alive(): master_thread = threading.Thread(target=master_background_worker, args=(342,)) master_thread.start() @app.get("/api/admin/db/export", include_in_schema=False) def export_database(): """Allows downloading the current production database file.""" if os.path.exists(DB_PATH): return FileResponse( DB_PATH, filename="sougouwiki_production.db", media_type="application/x-sqlite3" ) raise HTTPException(404, "Database file not found") @app.get("/", response_class=HTMLResponse, include_in_schema=False) def root_ui(): return """ Sougouwiki Hub

Sougouwiki Manager

Checking status...

Loading...
""" @app.get("/api/admin/crawl/status") def get_system_status(): session = get_session() try: return { "is_running": master_thread is not None and master_thread.is_alive(), "progress": load_progress(), "db_stats": { "pages": session.query(CrawlStatus).count(), "actresses": session.query(Actress).count(), "videos": session.query(Video).count() } } except Exception as e: return {"error": str(e)} finally: session.close() @app.post("/api/admin/crawl/start") def api_manual_start(max_pages: int = 342): global master_thread with worker_lock: if master_thread and master_thread.is_alive(): return {"message": "Already running"} master_thread = threading.Thread(target=master_background_worker, args=(max_pages,)) master_thread.start() return {"message": "Started"} @app.post("/api/admin/db/reset") def api_system_reset(): try: if os.path.exists(DB_PATH): os.remove(DB_PATH) bucket_db = os.path.join(BUCKET_DIR, "sougouwiki.db") if os.path.exists(bucket_db): os.remove(bucket_db) clear_progress() init_storage() init_db() return {"message": "Reset done"} except Exception as e: raise HTTPException(500, str(e)) @app.get("/api/actress") def search_actress(q: str = Query("")): session = get_session() try: if not q: return {"actresses": []} res = session.query(Actress).filter(Actress.name.like(f"%{q}%")).all() if not res: aliases = session.query(ActressAlias).filter(ActressAlias.alias_name.like(f"%{q}%")).all() res = list(set([a.actress for a in aliases if a.actress])) results = [] for a in res: videos = session.query(Video).join(VideoActress).filter(VideoActress.actress_id == a.id).all() alias_list = [al.alias_name for al in session.query(ActressAlias).filter_by(actress_id=a.id).all()] video_list = [] for v in sorted(videos, key=lambda x: x.release_date or "", reverse=True)[:20]: video_list.append({"dvd_id": v.dvd_id, "title": v.title}) results.append({ "id": a.id, "name": a.name, "name_kana": a.name_kana, "height": a.height, "bust": a.bust, "waist": a.waist, "hip": a.hip, "aliases": alias_list, "video_count": len(videos), "videos": video_list }) return {"actresses": results} finally: session.close() @app.get("/api/video") def search_video(dvd_id: str = Query(""), q: str = Query("")): session = get_session() try: query_val = dvd_id or q if not query_val: return {"videos": []} v_list = session.query(Video).filter(Video.dvd_id.like(f"%{query_val}%")).all() if not v_list: v_list = session.query(Video).filter(Video.title.like(f"%{query_val}%")).all() results = [] for v in v_list: va_rows = session.query(VideoActress, Actress).join(Actress, VideoActress.actress_id == Actress.id).filter(VideoActress.video_id == v.id).all() label = session.query(Label).filter_by(id=v.label_id).first() results.append({ "dvd_id": v.dvd_id, "title": v.title, "release_date": v.release_date, "cover_url": v.cover_url, "dmm_url": v.dmm_url, "prefix": v.prefix, "label": label.name if label else None, "actresses": [{"id": a.id, "name": a.name, "role_name": va.role_name} for va, a in va_rows] }) return {"videos": results} finally: session.close()