# main.py - FastAPI Web Service for Advanced Multi-Language OCR
# Integrates main6_pix2text.py functionality with REST API endpoints

import os
import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List

import uvicorn
from fastapi import FastAPI, File, UploadFile, HTTPException, status
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# Import our OCR functionality
from main6_pix2text import extract_all_text_advanced_pix2text, initialize_pix2text

# Import evaluation functionality
from eval import evaluate_ocr_accuracy, clean_control_characters

# Initialize FastAPI app
app = FastAPI(
    title="Advanced Multi-Language OCR API",
    description="""
🔍 **Advanced OCR System for Multi-Language Text Extraction**

This API provides sophisticated text extraction from PDF documents containing:
- **English** text
- **Bangla** (Bengali) text
- **Mathematical expressions** and formulas

## Features
- Upload PDF files for processing
- Intelligent content classification
- Pix2Text integration for advanced math extraction
- Character-by-character analysis
- Comprehensive extraction reports

## Usage
1. Upload a PDF using the `/extract` endpoint
2. Get extracted text with detailed analysis
3. Files are saved with organized naming convention
""",
    version="1.0.0",
    contact={
        "name": "Advanced OCR System",
        "url": "https://github.com/ashfaqbracu/aaladinai",
    },
)

# Add CORS middleware with proper encoding support
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Exception handler for JSON decode errors
@app.exception_handler(RequestValidationError)
@app.exception_handler(422)
async def json_decode_error_handler(request, exc):
    """
    Handle JSON decode errors that occur due to control characters in request body.

    The handler is registered twice on purpose:

    * ``RequestValidationError`` is what FastAPI raises when the request body
      cannot be parsed or validated; a status-code handler alone never sees it.
    * ``422`` keeps covering explicitly raised ``HTTPException(status_code=422)``.

    Args:
        request: The incoming request (unused, required by the handler signature).
        exc: Either a ``RequestValidationError`` or an ``HTTPException`` with
            status code 422.

    Returns:
        A 400 ``JSONResponse`` with a cleaning hint when the error is a JSON
        decode error, otherwise a 422 ``JSONResponse`` carrying the original
        error detail.
    """
    if isinstance(exc, RequestValidationError):
        detail = exc.errors()
    else:
        # HTTPException carries ``detail``; fall back to str() for anything else.
        detail = getattr(exc, "detail", str(exc))

    if "JSON decode error" in str(detail):
        return JSONResponse(
            status_code=400,
            content={
                "error": "Invalid characters in request",
                "message": "The request contains invalid control characters. Please ensure your text data is properly encoded and does not contain control characters.",
                "suggestion": "Try cleaning your input text to remove any control characters before sending the request.",
                "details": str(detail),
            },
        )

    # For other 422 errors, return the original error. Re-raising from inside
    # an exception handler would surface as a 500 instead of the intended 422.
    return JSONResponse(
        status_code=422,
        content={"detail": jsonable_encoder(detail)},
        headers=getattr(exc, "headers", None),
    )


# Create directories
def create_directories():
    """Create necessary directories for file storage."""
    directories = ["documents", "extracted"]
    for directory in directories:
        Path(directory).mkdir(exist_ok=True)
        print(f"✅ Created/verified directory: {directory}")


# Initialize directories on startup
create_directories()


# Pydantic models for API responses
class ExtractionResult(BaseModel):
    """Model for OCR extraction results."""

    success: bool = Field(description="Whether the extraction was successful")
    filename: str = Field(description="Name of the processed file")
    extracted_text: str = Field(description="Full extracted text content")
    text_file_path: str = Field(description="Path to the saved text file")
    json_file_path: str = Field(description="Path to the detailed JSON results")
    analysis_file_path: str = Field(description="Path to the analysis report")
    extraction_summary: Dict[str, Any] = Field(
        description="Summary of extraction statistics"
    )
    processing_time: float = Field(description="Time taken for processing (seconds)")
    error_message: Optional[str] = Field(
        default=None, description="Error message if extraction failed"
    )


class HealthCheck(BaseModel):
    """Model for health check response."""

    status: str = Field(description="Service status")
    message: str = Field(description="Status message")
    pix2text_available: bool = Field(description="Whether Pix2Text is available")
    directories: Dict[str, bool] = Field(description="Directory availability status")

class FileInfo(BaseModel):
    """Model for file information."""

    filename: str = Field(description="Name of the file")
    size: int = Field(description="File size in bytes")
    content_type: str = Field(description="MIME content type")
    upload_time: str = Field(description="Upload timestamp")


class EvaluationRequest(BaseModel):
    """Model for OCR evaluation request with optional name."""

    evaluation_name: Optional[str] = Field(
        default=None, description="Optional name for the evaluation"
    )


class EvaluationResult(BaseModel):
    """Model for OCR evaluation results."""

    success: bool = Field(description="Whether the evaluation was successful")
    evaluation_name: Optional[str] = Field(
        default=None, description="Name of the evaluation"
    )
    overall_accuracy: float = Field(description="Overall accuracy score (0-100)")
    similarity_score: float = Field(description="Text similarity score (0-100)")
    character_metrics: Dict[str, Any] = Field(
        description="Character-level accuracy metrics"
    )
    word_metrics: Dict[str, Any] = Field(description="Word-level accuracy metrics")
    line_metrics: Dict[str, Any] = Field(description="Line-level accuracy metrics")
    language_specific: Dict[str, Any] = Field(
        description="Language-specific accuracy metrics"
    )
    text_statistics: Dict[str, Any] = Field(description="Text comparison statistics")
    detailed_diff: List[Dict[str, str]] = Field(
        description="Detailed diff showing changes"
    )
    evaluation_summary: Dict[str, Any] = Field(
        description="Summary with grade and recommendations"
    )
    processing_time: float = Field(description="Time taken for evaluation (seconds)")
    error_message: Optional[str] = Field(
        default=None, description="Error message if evaluation failed"
    )


# Utility functions
def get_safe_filename(filename: str) -> str:
    """Generate a safe filename with timestamp.

    Keeps only alphanumeric characters, ``-`` and ``_`` from the stem, then
    appends ``_<YYYYmmdd_HHMMSS>`` and the original extension.

    Args:
        filename: Client-supplied file name.

    Returns:
        The sanitized, timestamped file name. If nothing survives the
        sanitization the stem falls back to ``upload`` so the result never
        starts with a bare underscore.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    name, ext = os.path.splitext(filename)
    # Remove special characters (including spaces and path separators)
    safe_name = "".join(c for c in name if c.isalnum() or c in ("-", "_"))
    if not safe_name:
        safe_name = "upload"
    return f"{safe_name}_{timestamp}{ext}"


def get_extraction_filename(pdf_filename: str, file_type: str) -> str:
    """Generate extraction filename with convention: [pdf_filename]_extract.[extension]

    Args:
        pdf_filename: Name of the source PDF (its extension is dropped).
        file_type: One of ``"txt"``, ``"json"`` or ``"analysis"``; unknown
            values are treated like ``"txt"``.

    Returns:
        ``<base>_extract.txt``, ``<base>_extract.json`` or
        ``<base>_extract_analysis.json``.
    """
    base_name = os.path.splitext(pdf_filename)[0]
    # The analysis report gets its own suffix: mapping both "json" and
    # "analysis" to "<base>_extract.json" made the two outputs share one path,
    # so whichever was written last silently replaced the other.
    suffixes = {
        "txt": "_extract.txt",
        "json": "_extract.json",
        "analysis": "_extract_analysis.json",
    }
    return f"{base_name}{suffixes.get(file_type, '_extract.txt')}"


# API Routes
@app.get("/", response_class=HTMLResponse, tags=["Main"])
async def root():
    """
    🏠 **Main Route - Welcome Page**

    Returns a welcome page with information about the OCR API service.
    """
    html_content = """
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="utf-8">
        <title>Advanced Multi-Language OCR API</title>
    </head>
    <body>
        <h1>🔍 Advanced Multi-Language OCR API</h1>

        <p>Welcome to the most advanced OCR system for extracting text from PDFs containing mixed languages and mathematical expressions!</p>

        <h3>🌐 Multi-Language</h3>
        <p>English + Bangla + Math</p>

        <h3>🧠 AI-Powered</h3>
        <p>Pix2Text Integration</p>

        <h3>📊 Detailed Analysis</h3>
        <p>Character-level Classification</p>

        <h2>🚀 Key Features</h2>
        <ul>
            <li><strong>📄 PDF Processing:</strong> Upload PDFs and get comprehensive text extraction</li>
            <li><strong>🔤 Language Detection:</strong> Automatic classification of English, Bangla, and mathematical content</li>
            <li><strong>🧮 Mathematical Expressions:</strong> Advanced LaTeX and formula recognition using Pix2Text</li>
            <li><strong>📈 Detailed Reports:</strong> Character analysis, confidence scores, and extraction statistics</li>
            <li><strong>💾 Organized Storage:</strong> Automatic file organization with clear naming conventions</li>
            <li><strong>📊 Accuracy Evaluation:</strong> Upload text files to compare OCR results with baseline and get detailed accuracy metrics</li>
        </ul>

        <h2>📋 API Endpoints</h2>
        <p><code>GET /</code>: This welcome page</p>
        <p><code>GET /health</code>: Service health status</p>
        <p><code>POST /extract</code>: Upload PDF and extract text</p>
        <p><code>POST /eval</code>: Upload two text files to evaluate OCR accuracy</p>

        <h2>🔗 Quick Actions</h2>
        <p>
            <a href="/docs">📖 API Documentation (Swagger)</a>
            <a href="/redoc">📚 Alternative Docs (ReDoc)</a>
            <a href="/health">🏥 Health Check</a>
        </p>

        <footer>
            <p>Advanced Multi-Language OCR System<br>
            Powered by Tesseract, Pix2Text, and FastAPI<br>
            <a href="https://github.com/ashfaqbracu/aaladinai">GitHub Repository</a></p>
        </footer>
    </body>
    </html>
    """
    return html_content


@app.get("/health", response_model=HealthCheck, tags=["System"])
async def health_check():
    """
    🏥 **Health Check**

    Check the health status of the OCR service and its dependencies.
    """
    try:
        # Check if Pix2Text is available
        pix2text_model = initialize_pix2text()
        pix2text_available = pix2text_model is not None

        # Check directories
        directories_status = {
            "documents": Path("documents").exists(),
            "extracted": Path("extracted").exists(),
        }

        # Create directories if they don't exist
        for directory, present in directories_status.items():
            if not present:
                Path(directory).mkdir(exist_ok=True)
                directories_status[directory] = True

        return HealthCheck(
            status="healthy",
            message="OCR service is running and ready to process files",
            pix2text_available=pix2text_available,
            directories=directories_status,
        )
    except Exception as e:
        # Top-level boundary: report the failure instead of returning a 500.
        return HealthCheck(
            status="unhealthy",
            message=f"Service health check failed: {str(e)}",
            pix2text_available=False,
            directories={"documents": False, "extracted": False},
        )


@app.post("/extract", response_model=ExtractionResult, tags=["OCR Processing"])
async def extract_text_from_pdf(
    file: UploadFile = File(
        ..., description="PDF file to extract text from", media_type="application/pdf"
    ),
):
    """
    📄 **Extract Text from PDF**

    Upload a PDF file and extract text using advanced multi-language OCR.

    ## Process:
    1. **Upload**: PDF is saved to `documents/` folder
    2. **Processing**: Advanced OCR with language detection
    3. **Extraction**: Text, JSON, and analysis files generated
    4. **Storage**: Results saved to `extracted/` folder with naming convention

    ## Naming Convention:
    - Input: `document.pdf` → Output: `document_extract.txt`, `document_extract.json`

    ## Returns:
    - Complete extracted text
    - Detailed extraction metadata
    - Processing statistics and analysis
    """
    start_time = datetime.now()
    # UploadFile.filename is optional; normalise it once so a missing name is
    # rejected as a bad request instead of crashing on ``None.lower()``.
    original_name = file.filename or ""

    try:
        # Validate file type
        if file.content_type != "application/pdf":
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid file type. Expected PDF, got {file.content_type}",
            )

        if not original_name.lower().endswith(".pdf"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="File must have .pdf extension",
            )

        # Generate safe filename and save uploaded file
        safe_filename = get_safe_filename(original_name)
        documents_path = Path("documents") / safe_filename

        # Save uploaded file
        with open(documents_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        print(f"📁 File saved to: {documents_path}")

        # Generate output filenames using naming convention
        text_filename = get_extraction_filename(safe_filename, "txt")
        json_filename = get_extraction_filename(safe_filename, "json")
        analysis_filename = get_extraction_filename(safe_filename, "analysis")

        # Create full paths for extracted files
        text_path = Path("extracted") / text_filename
        json_path = Path("extracted") / json_filename
        analysis_path = Path("extracted") / analysis_filename

        print(f"🔄 Starting OCR processing for: {safe_filename}")

        # Process the PDF using our advanced OCR system
        # NOTE(review): this call is synchronous and runs on the event loop;
        # consider a thread pool if the OCR pipeline is safe to run there.
        extract_all_text_advanced_pix2text(
            pdf_path=str(documents_path),
            output_text_file=str(text_path),
            output_json_file=str(json_path),
            output_analysis_file=str(analysis_path),
        )

        # Read the extracted text
        with open(text_path, "r", encoding="utf-8") as f:
            extracted_text = f.read()

        # Read the analysis for summary
        with open(analysis_path, "r", encoding="utf-8") as f:
            analysis_data = json.load(f)

        # Calculate processing time
        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        print(f"✅ OCR processing completed in {processing_time:.2f} seconds")
        print(f"📊 Extracted {len(extracted_text)} characters")

        return ExtractionResult(
            success=True,
            filename=original_name,
            extracted_text=extracted_text,
            text_file_path=str(text_path),
            json_file_path=str(json_path),
            analysis_file_path=str(analysis_path),
            extraction_summary=analysis_data,
            processing_time=processing_time,
            error_message=None,
        )

    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        # Handle unexpected errors
        error_message = f"OCR processing failed: {str(e)}"
        print(f"❌ {error_message}")

        processing_time = (datetime.now() - start_time).total_seconds()

        return ExtractionResult(
            success=False,
            filename=original_name or "unknown",
            extracted_text="",
            text_file_path="",
            json_file_path="",
            analysis_file_path="",
            extraction_summary={},
            processing_time=processing_time,
            error_message=error_message,
        )


@app.post("/eval", response_model=EvaluationResult, tags=["OCR Evaluation"])
async def evaluate_ocr_extraction(
    extracted_file: UploadFile = File(
        ..., description="Text file containing OCR extracted text"
    ),
    baseline_file: UploadFile = File(
        ..., description="Text file containing ground truth baseline text"
    ),
    evaluation_name: Optional[str] = None,
):
    """
    📊 **Evaluate OCR Extraction Accuracy**

    Compare extracted text file with ground truth baseline file to measure OCR accuracy.

    ## Features:
    - **File-based input**: Upload two text files for comparison
    - **Character-level accuracy**: Precise character matching and edit distance
    - **Word-level accuracy**: Word matching and error rates
    - **Line-level accuracy**: Line comparison and similarity scores
    - **Language-specific metrics**: Separate accuracy for English, Bangla, and Math
    - **Overall grading**: Letter grade system (A+ to F)
    - **Detailed diff**: Character-by-character comparison
    - **Recommendations**: Suggestions for improving OCR accuracy

    ## Input:
    - `extracted_file`: Text file with OCR extracted content (.txt format)
    - `baseline_file`: Text file with ground truth content (.txt format)
    - `evaluation_name`: Optional name for the evaluation

    ## Output:
    - Comprehensive accuracy metrics
    - Performance grading
    - Detailed comparison analysis
    - Improvement recommendations
    """
    start_time = datetime.now()
    # UploadFile.filename is optional; a missing name fails the .txt check
    # below with a 400 instead of raising AttributeError.
    extracted_name = extracted_file.filename or ""
    baseline_name = baseline_file.filename or ""

    try:
        print(f"🔍 Starting OCR evaluation: {evaluation_name or 'Unnamed'}")

        # Validate file types
        if not extracted_name.lower().endswith(".txt"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Extracted file must be a .txt file",
            )

        if not baseline_name.lower().endswith(".txt"):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Baseline file must be a .txt file",
            )

        # Read file contents
        try:
            extracted_content = await extracted_file.read()
            baseline_content = await baseline_file.read()

            # Decode with UTF-8 and handle potential encoding issues
            extracted_text = extracted_content.decode("utf-8", errors="replace")
            baseline_text = baseline_content.decode("utf-8", errors="replace")

            print("📁 Files read successfully:")
            print(
                f" Extracted file: {extracted_name} ({len(extracted_text)} characters)"
            )
            print(
                f" Baseline file: {baseline_name} ({len(baseline_text)} characters)"
            )

        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Failed to read file contents: {str(e)}",
            )

        # Validate that baseline text is not empty
        if not baseline_text.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Baseline file cannot be empty",
            )

        # Clean input texts to prevent any issues
        # (clean_control_characters is imported at module level)
        extracted_text_clean = clean_control_characters(extracted_text)
        baseline_text_clean = clean_control_characters(baseline_text)

        print(
            f"📝 Text lengths after cleaning - Extracted: {len(extracted_text_clean)}, Baseline: {len(baseline_text_clean)}"
        )

        # Perform evaluation with cleaned texts
        evaluation_results = evaluate_ocr_accuracy(
            extracted_text=extracted_text_clean,
            baseline_text=baseline_text_clean,
        )

        # Check for evaluation errors
        if "error" in evaluation_results:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=evaluation_results["error"],
            )

        # Calculate processing time
        end_time = datetime.now()
        processing_time = (end_time - start_time).total_seconds()

        print(f"✅ Evaluation completed in {processing_time:.3f} seconds")
        print(f"📊 Overall accuracy: {evaluation_results['overall_accuracy']:.2f}%")
        print(f"🎯 Grade: {evaluation_results['evaluation_summary']['grade']}")

        return EvaluationResult(
            success=True,
            evaluation_name=evaluation_name,
            overall_accuracy=evaluation_results["overall_accuracy"],
            similarity_score=evaluation_results["similarity_score"],
            character_metrics=evaluation_results["character_metrics"],
            word_metrics=evaluation_results["word_metrics"],
            line_metrics=evaluation_results["line_metrics"],
            language_specific=evaluation_results["language_specific"],
            text_statistics=evaluation_results["text_statistics"],
            detailed_diff=evaluation_results["detailed_diff"],
            evaluation_summary=evaluation_results["evaluation_summary"],
            processing_time=processing_time,
            error_message=None,
        )

    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        # Handle unexpected errors
        error_message = f"Evaluation failed: {str(e)}"
        print(f"❌ {error_message}")

        processing_time = (datetime.now() - start_time).total_seconds()

        return EvaluationResult(
            success=False,
            evaluation_name=evaluation_name,
            overall_accuracy=0.0,
            similarity_score=0.0,
            character_metrics={},
            word_metrics={},
            line_metrics={},
            language_specific={},
            text_statistics={},
            detailed_diff=[],
            evaluation_summary={
                "grade": "F (Error)",
                "recommendations": [error_message],
            },
            processing_time=processing_time,
            error_message=error_message,
        )


# Development server configuration
if __name__ == "__main__":
    # Get port from the PORT environment variable (set by the hosting
    # platform); default to 7860 for local development.
    port = int(os.environ.get("PORT", 7860))

    print("🚀 Starting Advanced Multi-Language OCR API Server...")
    # Print the URLs for the port actually in use, not a hard-coded 8000.
    print(f"📖 API Documentation: http://localhost:{port}/docs")
    print(f"🏠 Main Page: http://localhost:{port}/")
    print(f"🏥 Health Check: http://localhost:{port}/health")

    uvicorn.run("main:app", host="0.0.0.0", port=port, log_level="info")