# Author: agentsay
# Commit: "Update app.py" (2295e56, verified)
import fitz # PyMuPDF
import pytesseract
import camelot
import re
import shutil
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from pdf2image import convert_from_path
import os
import uuid
import tempfile
from typing import List
from loguru import logger
import logging
# Configure logging
# Send log records at INFO level and above to "app.log"; rotation="500 MB" asks
# loguru to roll the file over once it reaches that size.
logger.add("app.log", rotation="500 MB", level="INFO")
# ASGI application object. The route decorator below registers on it, and the
# uvicorn entry point at the bottom of the file refers to it as "app:app".
app = FastAPI(title="Policy Verification API", version="1.0.0")
# ========== Core Utilities ==========
def extract_text_from_pdf(pdf_path: str, *, min_text_chars: int = 50) -> str:
    """
    Extracts text from a PDF using PyMuPDF; falls back to OCR if text is insufficient.

    Args:
        pdf_path (str): Path to the PDF file.
        min_text_chars (int): Minimum length (after stripping surrounding
            whitespace) the embedded text must have; anything shorter triggers
            the OCR fallback. Defaults to 50.

    Returns:
        str: Text from the PDF's text layer, or the OCR result on fallback.

    Raises:
        RuntimeError: If PyMuPDF cannot read the PDF, or if the OCR fallback
            fails (in which case the error raised by ``ocr_pdf`` propagates
            unchanged).
    """
    try:
        logger.info(f"Extracting text from PDF: {pdf_path}")
        # The context manager closes the document even if a page fails to
        # render its text, so the file handle is never leaked.
        with fitz.open(pdf_path) as doc:
            text = "".join(page.get_text() for page in doc)
    except Exception as e:
        logger.error(f"Failed to extract text from PDF: {e}")
        raise RuntimeError(f"Error reading PDF with PyMuPDF: {e}") from e

    # The fallback lives outside the try block on purpose: ocr_pdf already logs
    # and raises its own RuntimeError, and re-wrapping it here would mislabel
    # an OCR failure as a PyMuPDF read error.
    if len(text.strip()) < min_text_chars:
        logger.warning("Insufficient text extracted, falling back to OCR")
        text = ocr_pdf(pdf_path)
    return text
def ocr_pdf(pdf_path: str) -> str:
    """
    Runs Tesseract OCR over the page images that pdf2image produces for a PDF.

    Args:
        pdf_path (str): Path to the PDF file.

    Returns:
        str: The per-image OCR output concatenated in order, with no separator
        added between images.

    Raises:
        RuntimeError: If rendering the PDF or running OCR fails.
    """
    try:
        logger.info(f"Performing OCR on PDF: {pdf_path}")
        page_images = convert_from_path(pdf_path)
        recognized = "".join(map(pytesseract.image_to_string, page_images))
        logger.info("OCR completed successfully")
        return recognized
    except Exception as err:
        logger.error(f"OCR failed: {err}")
        raise RuntimeError(f"OCR failed: {err}")
def find_uins(text: str) -> List[str]:
    """
    Collects the UINs that appear in the text directly after a "UIN" label.

    The label may be followed by an optional ":" or "-" and optional
    whitespace; matching is case-insensitive.

    Args:
        text (str): Text to scan.

    Returns:
        List[str]: Matched UINs with all whitespace removed, de-duplicated and
        kept in order of first appearance. Letter case is left as found.
    """
    labelled_uin = re.compile(
        r"UIN[:\-]?\s*(IRDAN\d+\s*[A-Z0-9]+V\d{2,})", flags=re.IGNORECASE
    )
    # A dict is used as an insertion-ordered set.
    ordered_unique = {}
    for candidate in labelled_uin.findall(text):
        compact = re.sub(r"\s+", "", candidate.strip())
        ordered_unique.setdefault(compact, None)
    unique_uins = list(ordered_unique)
    logger.info(f"Found {len(unique_uins)} unique UINs")
    return unique_uins
def extract_uin_list_from_pdf_db(pdf_db_path: str) -> List[str]:
    """
    Extracts UINs from a database-like PDF using Camelot.

    Every cell of every table Camelot detects (stream flavor, all pages) is
    scanned for UIN-shaped tokens; no "UIN" label is required here.

    Args:
        pdf_db_path (str): Path to the database PDF file.

    Returns:
        List[str]: UINs with all whitespace removed, de-duplicated and kept in
        order of first appearance. Letter case is left as found.

    Raises:
        RuntimeError: If table extraction or scanning fails; the original
            exception is attached as ``__cause__``.
    """
    # Both patterns are loop-invariant, so build them once instead of
    # resolving them again for every cell of every table.
    uin_pattern = re.compile(r"IRDAN\d+\s*[A-Z0-9]+V\d{2,}", flags=re.IGNORECASE)
    whitespace = re.compile(r"\s+")
    try:
        logger.info(f"Extracting UINs from DB PDF: {pdf_db_path}")
        tables = camelot.read_pdf(pdf_db_path, pages='all', flavor='stream')
        all_uins = []
        for table in tables:
            for row in table.df.values:
                for cell in row:
                    # str() guards against non-string cell values.
                    all_uins.extend(
                        whitespace.sub("", match)
                        for match in uin_pattern.findall(str(cell))
                    )
        # dict.fromkeys de-duplicates while preserving first-seen order.
        unique_uins = list(dict.fromkeys(all_uins))
        logger.info(f"Extracted {len(unique_uins)} unique UINs from DB")
        return unique_uins
    except Exception as e:
        logger.error(f"Error extracting UINs from DB PDF: {e}")
        raise RuntimeError(f"Error extracting UINs from DB PDF: {e}") from e
def match_found(policy_uins: List[str], db_uins: List[str]) -> bool:
    """
    Reports whether at least one policy UIN is present among the database UINs.

    Both sides are compared in a canonical form: spaces and hyphens removed,
    lower-cased.

    Args:
        policy_uins (List[str]): UINs found in the policy document.
        db_uins (List[str]): UINs found in the database document.

    Returns:
        bool: True as soon as one policy UIN matches, False if none do.
    """
    def _canonical(uin: str) -> str:
        return uin.replace(" ", "").replace("-", "").lower()

    known = set(map(_canonical, db_uins))
    # First policy UIN (in input order) whose canonical form is known.
    first_hit = next(
        (candidate for candidate in map(_canonical, policy_uins) if candidate in known),
        None,
    )
    if first_hit is None:
        logger.info("No UIN matches found")
        return False
    logger.info(f"UIN match found: {first_hit}")
    return True
# ========== FastAPI Endpoint ==========
@app.post("/verify-policy")
async def verify_policy(
    policy_file: UploadFile = File(...)
):
    """
    Verifies if a policy PDF's UIN exists in a database PDF.

    The uploaded file is copied into a temporary directory, its text is
    extracted (with OCR fallback), UINs are pulled out of that text and
    compared against the UINs listed in the server-side database PDF.

    Args:
        policy_file (UploadFile): The uploaded policy document; its filename
            must end in ".pdf" (case-insensitive).

    Returns:
        JSONResponse: 200 with ``{"match": <bool>, "policy-text": <str>}``.

    Raises:
        HTTPException: 400 if the upload is not named like a PDF; 500 if the
            database PDF is missing or any processing step fails.
    """
    # Imported locally so this handler carries its own dependency; fastapi is
    # already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    logger.info("Received policy verification request")
    db_file_path = "/app/list-of-products.pdf"

    # Validate uploaded policy file. The filename can be absent, and the
    # extension's case should not matter ("POLICY.PDF" is still a PDF).
    upload_name = policy_file.filename or ""
    if not upload_name.lower().endswith(".pdf"):
        logger.error("Invalid file type for policy")
        raise HTTPException(status_code=400, detail="Policy file must be a PDF.")
    if not os.path.exists(db_file_path):
        logger.error(f"Database PDF file not found at {db_file_path}")
        raise HTTPException(status_code=500, detail="Database file missing on server.")

    def _process_upload() -> dict:
        # Synchronous pipeline: disk copy, text extraction/OCR and table parsing.
        with tempfile.TemporaryDirectory() as tmpdirname:
            # Save uploaded policy file
            policy_path = os.path.join(tmpdirname, f"{uuid.uuid4()}_policy.pdf")
            logger.info(f"Saving policy file to: {policy_path}")
            with open(policy_path, "wb") as f:
                shutil.copyfileobj(policy_file.file, f)
            # Use static DB file path directly
            logger.info(f"Using DB file from: {db_file_path}")
            policy_text = extract_text_from_pdf(policy_path)
            policy_uins = find_uins(policy_text)
            db_uins = extract_uin_list_from_pdf_db(db_file_path)
            is_matched = match_found(policy_uins, db_uins)
            return {
                "match": is_matched,
                "policy-text": policy_text,
            }

    try:
        # The pipeline is blocking and CPU-heavy; running it directly in this
        # coroutine would stall the event loop for every other request.
        content = await run_in_threadpool(_process_upload)
        return JSONResponse(status_code=200, content=content)
    except Exception as e:
        logger.error(f"Verification failed: {e}")
        raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}") from e
# ========== Run Application ==========
if __name__ == "__main__":
    # Script entry point: serve the "app" object of module "app" with uvicorn.
    import uvicorn

    logger.info("Starting FastAPI server")
    server_options = {"host": "0.0.0.0", "port": 7860, "reload": True}
    uvicorn.run("app:app", **server_options)