# newtest/app.py — subtitle PDF-vs-SRT batch verifier
# (updated by bigbossmonster, commit ba37bc5, verified)
import os
import time
import json
import shutil
import asyncio
import re
from typing import List
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
import google.generativeai as genai
from google.api_core import exceptions as google_exceptions
from pypdf import PdfReader, PdfWriter
# --- IMPORT PROMPTS ---
# Load the batch-verification system prompt from the project's prompts module;
# fall back to a minimal built-in prompt when that module is absent.
try:
    from prompts import prompts
except ImportError:
    SYSTEM_PROMPT = "You are a Subtitle QA Specialist. Compare PDF vs SRT."
else:
    SYSTEM_PROMPT = prompts.get("VERIFY_BATCH", "")
app = FastAPI()

# Scratch directory for uploaded PDFs and the generated per-chunk files.
TEMP_DIR = os.environ.get('TMPDIR', '/tmp/verifier_uploads')
# FIX: exist_ok=True replaces the exists()-then-makedirs() pattern, which is
# race-prone when several processes start at once.
os.makedirs(TEMP_DIR, exist_ok=True)
# --- 1. SRT SPLITTER ---
def parse_and_split_srt(srt_content: str, chunk_size: int) -> List[str]:
    """
    Split raw SRT text into chunks of at most `chunk_size` subtitle blocks.

    Assumption: 1 PDF page = 1 SRT block (strict alignment), so chunking by
    `chunk_size` blocks mirrors splitting the PDF by `chunk_size` pages.

    Returns a list of chunk strings; blocks within a chunk are rejoined with
    a blank line, matching the SRT block separator.
    """
    # Normalise all line endings to '\n' so the blank-line split is reliable.
    normalised = srt_content.replace('\r\n', '\n').replace('\r', '\n')
    # SRT blocks are separated by one or more blank lines.
    blocks = [b for b in re.split(r'\n\s*\n', normalised.strip()) if b.strip()]
    return [
        "\n\n".join(blocks[start:start + chunk_size])
        for start in range(0, len(blocks), chunk_size)
    ]
# --- 2. PDF SPLITTER ---
def split_pdf_sync(pdf_path: str, chunk_size: int) -> List[str]:
    """
    Split the PDF at `pdf_path` into sub-PDFs of at most `chunk_size` pages.

    Each chunk is written to TEMP_DIR as '<base>_part_<start>_<end>.pdf'.
    Returns the chunk file paths in page order. Runs synchronously (pypdf
    is blocking), hence the `_sync` suffix.
    """
    reader = PdfReader(pdf_path)
    page_count = len(reader.pages)
    stem = os.path.splitext(os.path.basename(pdf_path))[0]
    chunk_paths: List[str] = []
    for start in range(0, page_count, chunk_size):
        stop = min(start + chunk_size, page_count)
        writer = PdfWriter()
        for page_index in range(start, stop):
            writer.add_page(reader.pages[page_index])
        out_path = os.path.join(TEMP_DIR, f"{stem}_part_{start}_{stop}.pdf")
        with open(out_path, "wb") as handle:
            writer.write(handle)
        chunk_paths.append(out_path)
    return chunk_paths
# --- 3. WORKER LOGIC ---
async def api_worker(
    worker_id: int,
    api_key: str,
    queue: asyncio.Queue,
    final_results: list,
    model_name: str
):
    """
    Drain (pdf_chunk_path, srt_chunk_text) pairs from `queue` and verify
    each pair with Gemini, extending `final_results` with parsed items.

    Each worker owns one API key. On a 429 (ResourceExhausted) the item is
    re-queued for another worker and this worker sleeps 30s before pulling
    again. The worker exits once the queue is empty.
    """
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(model_name=model_name)
    while True:
        try:
            # FIX: Use get_nowait() to stop correctly when empty
            queue_item = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        # Unpack tuple
        pdf_path, srt_chunk_content = queue_item
        filename = os.path.basename(pdf_path)
        # FIX: restore the garbled "(unknown)" placeholder to {filename}.
        print(f"[Worker {worker_id}] Processing: {filename}")
        try:
            # A. Upload the chunk and poll until Gemini finishes ingesting it.
            file_ref = genai.upload_file(path=pdf_path)
            while file_ref.state.name == "PROCESSING":
                await asyncio.sleep(1)
                file_ref = genai.get_file(file_ref.name)
            if file_ref.state.name == "FAILED":
                raise Exception("Gemini File Processing Failed")
            # B. Ask the model to compare the PDF pages against the SRT chunk.
            prompt = f"""
{SYSTEM_PROMPT}
**TASK:**
The attached PDF pages correspond EXACTLY to the SRT lines provided below.
Verify the text in the PDF matches the text in the SRT.
**MATCHING SRT CHUNK:**
```
{srt_chunk_content}
```
"""
            response = model.generate_content(
                [file_ref, prompt],
                generation_config={"response_mime_type": "application/json"}
            )
            # C. Parse the JSON reply; tolerate the three shapes the model
            # emits (bare list, {"corrections": [...]}, {"items": [...]}).
            response_json = json.loads(response.text)
            items = []
            if isinstance(response_json, list):
                items = response_json
            elif "corrections" in response_json:
                items = response_json["corrections"]
            elif "items" in response_json:
                items = response_json["items"]
            final_results.extend(items)
            # Best-effort remote cleanup; a failed delete must not fail the task.
            # FIX: narrowed bare `except:` to `except Exception:` so that
            # CancelledError/KeyboardInterrupt are not swallowed.
            try:
                genai.delete_file(file_ref.name)
            except Exception:
                pass
            # Clean up local file on success
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        except google_exceptions.ResourceExhausted:
            # OPTIMIZED 429 LOGIC
            # FIX: restore the garbled "(unknown)" placeholder to {filename}.
            print(f"[Worker {worker_id}] 429 Rate Limit. Re-queueing {filename}...")
            # 1. Put chunk back for another worker
            await queue.put(queue_item)
            # 2. Sleep THIS worker
            await asyncio.sleep(30)
            # 3. Skip task_done() because we re-queued
            continue
        except Exception as e:
            print(f"[Worker {worker_id}] Error: {e}")
            # FIX: restore the garbled "(unknown)" placeholder to {filename}.
            final_results.append({"error": f"Error in {filename}: {str(e)}"})
            if os.path.exists(pdf_path):
                os.remove(pdf_path)
        queue.task_done()
# --- 4. MAIN ENDPOINT ---
@app.post("/verify_batch")
async def verify_batch(
    pdf: UploadFile = File(...),
    srt: UploadFile = File(...),
    api_keys: str = Form(...),
    pages_per_request: int = Form(10),
    model_name: str = Form("gemini-1.5-flash")
):
    """
    Verify an uploaded PDF against an uploaded SRT in parallel chunks.

    Splits both files into aligned chunks of `pages_per_request`, queues the
    pairs, and runs one api_worker per API key. Returns the merged,
    id-sorted corrections plus any per-chunk system errors. On unexpected
    failure, replies 500 with the error text; the uploaded PDF is always
    removed from TEMP_DIR.
    """
    keys_list = [k.strip() for k in api_keys.split('\n') if k.strip()]
    if not keys_list:
        raise HTTPException(status_code=400, detail="No API Keys provided")
    session_id = str(int(time.time()))
    pdf_path = os.path.join(TEMP_DIR, f"{session_id}_{pdf.filename}")
    try:
        with open(pdf_path, "wb") as f:
            shutil.copyfileobj(pdf.file, f)
        full_srt_content = (await srt.read()).decode('utf-8')
        # Step 1: Split Both Files
        pdf_chunks = split_pdf_sync(pdf_path, pages_per_request)
        srt_chunks = parse_and_split_srt(full_srt_content, pages_per_request)
        # Step 2: Fill Queue — only matched PDF/SRT pairs are processed.
        queue = asyncio.Queue()
        limit = min(len(pdf_chunks), len(srt_chunks))
        for i in range(limit):
            queue.put_nowait((pdf_chunks[i], srt_chunks[i]))
        # FIX: unmatched PDF chunk files beyond `limit` were never deleted
        # and accumulated in TEMP_DIR; remove them here.
        for leftover in pdf_chunks[limit:]:
            if os.path.exists(leftover):
                os.remove(leftover)
        # Step 3: Launch one worker per API key.
        final_results = []
        workers = [
            asyncio.create_task(api_worker(i, key, queue, final_results, model_name))
            for i, key in enumerate(keys_list)
        ]
        # Step 4: Wait for all workers to drain the queue.
        await asyncio.gather(*workers)
        # Step 5: Aggregate — separate model corrections from system errors.
        clean_corrections = []
        system_errors = []
        for item in final_results:
            if "error" in item:
                system_errors.append(item["error"])
            else:
                clean_corrections.append(item)
        # Best-effort ordering by numeric id; unparsable ids keep arrival
        # order. FIX: narrowed the bare `except:` to the two exceptions
        # int() can actually raise, so real bugs are no longer swallowed.
        try:
            clean_corrections.sort(key=lambda x: int(x.get('id', 0)) if x.get('id') else 0)
        except (TypeError, ValueError):
            pass
        return {
            "status": "success",
            "total_chunks": limit,
            "corrections": clean_corrections,
            "system_errors": system_errors
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        if os.path.exists(pdf_path):
            os.remove(pdf_path)
# Serve the frontend from ./static at the site root; html=True serves
# index.html for directory requests. Mounted after the API route so
# /verify_batch stays reachable.
app.mount("/", StaticFiles(directory="static", html=True), name="static")