Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import json | |
| import shutil | |
| import asyncio | |
| import re | |
| from typing import List | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse | |
| import google.generativeai as genai | |
| from google.api_core import exceptions as google_exceptions | |
| from pypdf import PdfReader, PdfWriter | |
# --- IMPORT PROMPTS ---
try:
    from prompts import prompts
    SYSTEM_PROMPT = prompts.get("VERIFY_BATCH", "")
except ImportError:
    # Fallback system prompt when the local prompts module is unavailable.
    SYSTEM_PROMPT = "You are a Subtitle QA Specialist. Compare PDF vs SRT."

app = FastAPI()

# Scratch directory for uploaded PDFs and the per-chunk PDF files.
TEMP_DIR = os.environ.get('TMPDIR', '/tmp/verifier_uploads')
# FIX: exist_ok=True avoids the check-then-create race of the original
# `if not os.path.exists(...): os.makedirs(...)` pattern.
os.makedirs(TEMP_DIR, exist_ok=True)
# --- 1. SRT SPLITTER ---
def parse_and_split_srt(srt_content: str, chunk_size: int) -> List[str]:
    """Split SRT text into chunks of `chunk_size` subtitle blocks.

    Assumption: 1 PDF page = 1 SRT block (strict alignment), so a chunk of
    `chunk_size` blocks pairs with a PDF chunk of `chunk_size` pages.

    Args:
        srt_content: Raw SRT file contents (any newline convention).
        chunk_size: Number of subtitle blocks per chunk; must be >= 1.

    Returns:
        List of chunk strings, each containing up to `chunk_size` blocks
        joined by blank lines. Empty input yields an empty list.

    Raises:
        ValueError: if chunk_size < 1. (FIX: the original crashed inside
        range() for 0 and silently returned [] for negative sizes.)
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    # Normalize CRLF / CR newlines to LF before splitting on blank lines.
    content = srt_content.replace('\r\n', '\n').replace('\r', '\n')
    # Subtitle blocks are separated by one or more blank lines.
    raw_blocks = re.split(r'\n\s*\n', content.strip())
    blocks = [b for b in raw_blocks if b.strip()]
    return [
        "\n\n".join(blocks[i:i + chunk_size])
        for i in range(0, len(blocks), chunk_size)
    ]
# --- 2. PDF SPLITTER ---
def split_pdf_sync(pdf_path: str, chunk_size: int) -> List[str]:
    """Split the PDF at `pdf_path` into chunks of `chunk_size` pages.

    Each chunk is written to TEMP_DIR as
    `<stem>_part_<start>_<stop>.pdf`; the list of chunk paths is returned.
    """
    source = PdfReader(pdf_path)
    page_count = len(source.pages)
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    paths: List[str] = []
    for start in range(0, page_count, chunk_size):
        stop = min(start + chunk_size, page_count)
        writer = PdfWriter()
        for page_index in range(start, stop):
            writer.add_page(source.pages[page_index])
        out_path = os.path.join(TEMP_DIR, f"{stem}_part_{start}_{stop}.pdf")
        with open(out_path, "wb") as handle:
            writer.write(handle)
        paths.append(out_path)
    return paths
# --- 3. WORKER LOGIC ---
async def api_worker(
    worker_id: int,
    api_key: str,
    queue: asyncio.Queue,
    final_results: list,
    model_name: str
):
    """Drain `queue` of (pdf_chunk_path, srt_chunk_text) pairs via Gemini.

    Each worker runs on its own API key. Parsed JSON result items are
    appended to the shared `final_results` list (safe: single event loop,
    no await between read and write). On a 429 the item is re-queued and
    this worker backs off for 30s.
    """
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(model_name=model_name)
    while True:
        try:
            # get_nowait() so the worker exits cleanly once the queue drains.
            queue_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        pdf_path, srt_chunk_content = queue_item
        filename = os.path.basename(pdf_path)
        # FIX: the original printed a literal "(unknown)" here although
        # `filename` was computed; restore the intended placeholder.
        print(f"[Worker {worker_id}] Processing: {filename}")
        file_ref = None
        try:
            # A. Upload the PDF chunk and poll until Gemini finishes ingesting.
            file_ref = genai.upload_file(path=pdf_path)
            while file_ref.state.name == "PROCESSING":
                await asyncio.sleep(1)
                file_ref = genai.get_file(file_ref.name)
            if file_ref.state.name == "FAILED":
                raise Exception("Gemini File Processing Failed")
            # B. Generate the comparison report as JSON.
            prompt = f"""
{SYSTEM_PROMPT}
**TASK:**
The attached PDF pages correspond EXACTLY to the SRT lines provided below.
Verify the text in the PDF matches the text in the SRT.
**MATCHING SRT CHUNK:**
```
{srt_chunk_content}
```
"""
            response = model.generate_content(
                [file_ref, prompt],
                generation_config={"response_mime_type": "application/json"}
            )
            # C. Parse the JSON payload; accept list or wrapped-object shapes.
            response_json = json.loads(response.text)
            items = []
            if isinstance(response_json, list):
                items = response_json
            elif "corrections" in response_json:
                items = response_json["corrections"]
            elif "items" in response_json:
                items = response_json["items"]
            final_results.extend(items)
            # Best-effort remote cleanup; FIX: bare `except:` narrowed.
            try:
                genai.delete_file(file_ref.name)
            except Exception:
                pass
            # Clean up the local chunk file on success.
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        except google_exceptions.ResourceExhausted:
            # OPTIMIZED 429 LOGIC
            print(f"[Worker {worker_id}] 429 Rate Limit. Re-queueing {filename}...")
            # FIX: delete any already-uploaded remote copy before the retry
            # re-uploads it, otherwise it leaks on the Gemini side.
            if file_ref is not None:
                try:
                    genai.delete_file(file_ref.name)
                except Exception:
                    pass
            # 1. Put the chunk back for another worker.
            await queue.put(queue_item)
            # 2. Sleep THIS worker.
            await asyncio.sleep(30)
            # 3. Skip task_done() because we re-queued.
            continue
        except Exception as e:
            print(f"[Worker {worker_id}] Error: {e}")
            # FIX: record which chunk failed (original emitted "(unknown)").
            final_results.append({"error": f"Error in {filename}: {str(e)}"})
            # FIX: the remote file leaked on failure in the original.
            if file_ref is not None:
                try:
                    genai.delete_file(file_ref.name)
                except Exception:
                    pass
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        queue.task_done()
# --- 4. MAIN ENDPOINT ---
# FIX: the original had no route decorator, so this endpoint was never
# registered with FastAPI. NOTE(review): "/verify_batch" is an assumed
# path — confirm against the frontend in static/.
@app.post("/verify_batch")
async def verify_batch(
    pdf: UploadFile = File(...),
    srt: UploadFile = File(...),
    api_keys: str = Form(...),
    pages_per_request: int = Form(10),
    model_name: str = Form("gemini-1.5-flash")
):
    """Verify a PDF against an SRT by chunking both and fanning out workers.

    One worker per API key (newline-separated in `api_keys`). Returns the
    aggregated corrections sorted by 'id' plus any per-chunk system errors.
    """
    keys_list = [k.strip() for k in api_keys.split('\n') if k.strip()]
    if not keys_list:
        raise HTTPException(status_code=400, detail="No API Keys provided")
    session_id = str(int(time.time()))
    pdf_path = os.path.join(TEMP_DIR, f"{session_id}_{pdf.filename}")
    try:
        with open(pdf_path, "wb") as f:
            shutil.copyfileobj(pdf.file, f)
        full_srt_content = (await srt.read()).decode('utf-8')
        # Step 1: split both inputs into aligned chunks.
        pdf_chunks = split_pdf_sync(pdf_path, pages_per_request)
        srt_chunks = parse_and_split_srt(full_srt_content, pages_per_request)
        # Step 2: pair chunks up; extra chunks on either side are dropped.
        queue = asyncio.Queue()
        limit = min(len(pdf_chunks), len(srt_chunks))
        for i in range(limit):
            queue.put_nowait((pdf_chunks[i], srt_chunks[i]))
        # Step 3: launch one worker per API key.
        final_results = []
        workers = [
            asyncio.create_task(api_worker(i, key, queue, final_results, model_name))
            for i, key in enumerate(keys_list)
        ]
        # Step 4: wait for every worker to drain the queue.
        await asyncio.gather(*workers)
        # Step 5: separate clean corrections from per-chunk error records.
        clean_corrections = []
        system_errors = []
        for item in final_results:
            if "error" in item:
                system_errors.append(item["error"])
            else:
                clean_corrections.append(item)
        try:
            clean_corrections.sort(key=lambda x: int(x.get('id', 0)) if x.get('id') else 0)
        except (TypeError, ValueError):
            # FIX: only swallow sort-key failures, not every exception
            # (the original used a bare `except:`).
            pass
        return {
            "status": "success",
            "total_chunks": limit,
            "corrections": clean_corrections,
            "system_errors": system_errors
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        # Always remove the full uploaded PDF; chunk files are the
        # workers' responsibility.
        if os.path.exists(pdf_path):
            os.remove(pdf_path)
| app.mount("/", StaticFiles(directory="static", html=True), name="static") |