import asyncio
import json
import os
import re
import shutil
import time
from typing import List

from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
from pypdf import PdfReader, PdfWriter

# --- IMPORT PROMPTS ---
try:
    from prompts import prompts
    SYSTEM_PROMPT = prompts.get("VERIFY_BATCH", "")
except ImportError:
    # Fallback so the service still boots without the optional prompts package.
    SYSTEM_PROMPT = "You are a Subtitle QA Specialist. Compare PDF vs SRT."

app = FastAPI()

# Scratch directory for uploaded PDFs and their per-chunk splits.
TEMP_DIR = os.environ.get('TMPDIR', '/tmp/verifier_uploads')
# exist_ok=True is race-free, unlike the exists()+makedirs() two-step.
os.makedirs(TEMP_DIR, exist_ok=True)


# --- 1. SRT SPLITTER ---
def parse_and_split_srt(srt_content: str, chunk_size: int) -> List[str]:
    """Split raw SRT text into chunks of ``chunk_size`` subtitle blocks.

    Blocks are the blank-line-separated SRT entries. Alignment assumption:
    1 PDF page corresponds to 1 SRT block, so chunking both files with the
    same size keeps PDF chunks and SRT chunks in lockstep.

    Args:
        srt_content: Full SRT file contents (any newline convention).
        chunk_size: Number of SRT blocks per chunk.

    Returns:
        List of chunk strings, each containing up to ``chunk_size`` blocks
        re-joined with blank lines. Empty input yields an empty list.
    """
    # Normalize CRLF / lone CR so the blank-line split works uniformly.
    content = srt_content.replace('\r\n', '\n').replace('\r', '\n')
    raw_blocks = re.split(r'\n\s*\n', content.strip())
    blocks = [b for b in raw_blocks if b.strip()]

    srt_chunks = []
    total_blocks = len(blocks)
    for i in range(0, total_blocks, chunk_size):
        chunk_blocks = blocks[i : i + chunk_size]
        srt_chunks.append("\n\n".join(chunk_blocks))
    return srt_chunks


# --- 2. PDF SPLITTER ---
def split_pdf_sync(pdf_path: str, chunk_size: int) -> List[str]:
    """Split a PDF into ``chunk_size``-page files written under TEMP_DIR.

    Args:
        pdf_path: Path of the source PDF on disk.
        chunk_size: Maximum pages per output chunk.

    Returns:
        Paths of the chunk PDFs, in page order. Synchronous (CPU/disk bound);
        callers on the event loop accept the blocking cost.
    """
    reader = PdfReader(pdf_path)
    total_pages = len(reader.pages)
    chunk_paths = []
    base_name = os.path.splitext(os.path.basename(pdf_path))[0]

    for i in range(0, total_pages, chunk_size):
        writer = PdfWriter()
        end_page = min(i + chunk_size, total_pages)
        for page_num in range(i, end_page):
            writer.add_page(reader.pages[page_num])
        # Encode the page span in the filename for traceability.
        chunk_name = f"{base_name}_part_{i}_{end_page}.pdf"
        chunk_path = os.path.join(TEMP_DIR, chunk_name)
        with open(chunk_path, "wb") as f:
            writer.write(f)
        chunk_paths.append(chunk_path)
    return chunk_paths


# --- 3. WORKER LOGIC ---
async def api_worker(
    worker_id: int,
    api_key: str,
    queue: asyncio.Queue,
    final_results: list,
    model_name: str,
):
    """Drain (pdf_chunk_path, srt_chunk) pairs from ``queue`` with one API key.

    Each worker configures the SDK for its own key, uploads a PDF chunk,
    asks the model to verify it against the matching SRT chunk, and appends
    the parsed correction items (or an error record) to ``final_results``.
    Exits when the queue is empty.
    """
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(model_name=model_name)

    while True:
        try:
            # FIX: get_nowait() lets the worker stop cleanly once the queue drains.
            queue_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            break

        pdf_path, srt_chunk_content = queue_item
        filename = os.path.basename(pdf_path)
        # BUGFIX: message previously printed the literal "(unknown)" —
        # the computed `filename` was never interpolated.
        print(f"[Worker {worker_id}] Processing: {filename}")

        try:
            # A. Upload the chunk and poll until server-side processing ends.
            file_ref = genai.upload_file(path=pdf_path)
            while file_ref.state.name == "PROCESSING":
                await asyncio.sleep(1)
                file_ref = genai.get_file(file_ref.name)
            if file_ref.state.name == "FAILED":
                raise Exception("Gemini File Processing Failed")

            # B. Generate: force a JSON reply so it can be parsed below.
            prompt = f"""
{SYSTEM_PROMPT}

**TASK:** The attached PDF pages correspond EXACTLY to the SRT lines provided below. Verify the text in the PDF matches the text in the SRT.

**MATCHING SRT CHUNK:**
```
{srt_chunk_content}
```
"""
            response = model.generate_content(
                [file_ref, prompt],
                generation_config={"response_mime_type": "application/json"},
            )

            # C. Parse the reply, tolerating the common wrapper shapes.
            response_json = json.loads(response.text)
            items = []
            if isinstance(response_json, list):
                items = response_json
            elif "corrections" in response_json:
                items = response_json["corrections"]
            elif "items" in response_json:
                items = response_json["items"]
            final_results.extend(items)

            # Best-effort remote cleanup; failure here must not fail the chunk.
            try:
                genai.delete_file(file_ref.name)
            except Exception:  # was a bare except: — keep best-effort, but not SystemExit
                pass

            # Local chunk file is no longer needed on success.
            if os.path.exists(pdf_path):
                os.remove(pdf_path)

        except google_exceptions.ResourceExhausted:
            # 429: hand the chunk back to the pool, then cool this key down.
            # NOTE(review): if every key is rate-limited the chunk loops forever;
            # a retry cap would change behavior, so only flagging it here.
            print(f"[Worker {worker_id}] 429 Rate Limit. Re-queueing {filename}...")
            await queue.put(queue_item)
            await asyncio.sleep(30)
            # Skip task_done() because the item was re-queued, not completed.
            continue
        except Exception as e:
            print(f"[Worker {worker_id}] Error: {e}")
            # BUGFIX: error record previously said "(unknown)" instead of the chunk name.
            final_results.append({"error": f"Error in {filename}: {str(e)}"})
            if os.path.exists(pdf_path):
                os.remove(pdf_path)

        queue.task_done()


# --- 4. MAIN ENDPOINT ---
@app.post("/verify_batch")
async def verify_batch(
    pdf: UploadFile = File(...),
    srt: UploadFile = File(...),
    api_keys: str = Form(...),
    pages_per_request: int = Form(10),
    model_name: str = Form("gemini-1.5-flash"),
):
    """Split a PDF/SRT pair into aligned chunks and verify them in parallel.

    Form fields:
        pdf / srt: The files to compare.
        api_keys: Newline-separated Gemini API keys; one worker per key.
        pages_per_request: Pages (and SRT blocks) per model call.
        model_name: Gemini model to use.

    Returns a JSON payload with sorted corrections and any system errors;
    unexpected failures produce a 500 with the error message.
    """
    keys_list = [k.strip() for k in api_keys.split('\n') if k.strip()]
    if not keys_list:
        raise HTTPException(status_code=400, detail="No API Keys provided")

    session_id = str(int(time.time()))
    pdf_path = os.path.join(TEMP_DIR, f"{session_id}_{pdf.filename}")
    try:
        with open(pdf_path, "wb") as f:
            shutil.copyfileobj(pdf.file, f)
        full_srt_content = (await srt.read()).decode('utf-8')

        # Step 1: Split both files with the same chunk size to keep alignment.
        pdf_chunks = split_pdf_sync(pdf_path, pages_per_request)
        srt_chunks = parse_and_split_srt(full_srt_content, pages_per_request)

        # Step 2: Fill the queue; any unmatched tail chunks are dropped.
        queue = asyncio.Queue()
        limit = min(len(pdf_chunks), len(srt_chunks))
        for i in range(limit):
            queue.put_nowait((pdf_chunks[i], srt_chunks[i]))

        # Step 3: Launch one worker per API key; they share the queue.
        final_results = []
        workers = []
        for i, key in enumerate(keys_list):
            worker_task = asyncio.create_task(
                api_worker(i, key, queue, final_results, model_name)
            )
            workers.append(worker_task)

        # Step 4: Wait for all workers to drain the queue.
        await asyncio.gather(*workers)

        # Step 5: Separate model corrections from system error records.
        clean_corrections = []
        system_errors = []
        for item in final_results:
            # BUGFIX: only dicts carry error records. The old `"error" in item`
            # did a substring test on string items and then crashed on
            # item["error"], turning the whole request into a 500.
            if isinstance(item, dict) and "error" in item:
                system_errors.append(item["error"])
            else:
                clean_corrections.append(item)

        try:
            clean_corrections.sort(
                key=lambda x: int(x.get('id', 0)) if x.get('id') else 0
            )
        except (TypeError, ValueError, AttributeError):
            # Model output may lack numeric ids; ordering is best-effort.
            pass

        return {
            "status": "success",
            "total_chunks": limit,
            "corrections": clean_corrections,
            "system_errors": system_errors,
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        # Always remove the uploaded source PDF; chunk files are cleaned by workers.
        if os.path.exists(pdf_path):
            os.remove(pdf_path)


# Serve the frontend; mounted last so /verify_batch keeps precedence.
app.mount("/", StaticFiles(directory="static", html=True), name="static")