paddleocr / app.py
sidoutcome's picture
Update app.py
54daefe verified
import os, io, stat, logging, sys, asyncio
from typing import Any, Dict, Iterable, List, Tuple, Union
from fastapi import FastAPI, UploadFile, File, Form, Header, HTTPException, Security
from fastapi.security import APIKeyHeader
from fastapi.responses import JSONResponse
from PIL import Image, ImageEnhance, ImageFilter
import numpy as np
# Configure logging to stdout for HuggingFace Spaces
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Also set root logger to DEBUG
logging.getLogger().setLevel(logging.DEBUG)
# API Key Authentication
API_KEY = os.environ.get("API_KEY", None) # Set this in HuggingFace Spaces Secrets
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def verify_api_key(api_key: str = Security(api_key_header)):
"""Verify API key if authentication is enabled."""
if API_KEY is None:
# No API key configured - allow all requests
logger.warning("API_KEY not set - endpoint is unprotected!")
return None
if api_key is None:
logger.warning("Request missing API key")
raise HTTPException(
status_code=401,
detail="Missing API Key. Include 'X-API-Key' header."
)
if api_key != API_KEY:
logger.warning(f"Invalid API key attempt: {api_key[:10]}...")
raise HTTPException(
status_code=403,
detail="Invalid API Key"
)
return api_key
# -----------------------------------------------------------------------------
# Writable caches (HF/Docker safe) & clear thread envs (suppress OpenBLAS warn)
# -----------------------------------------------------------------------------
os.environ.setdefault("HOME", "/tmp")
os.environ.setdefault("TMPDIR", "/tmp")
os.environ.setdefault("XDG_CACHE_HOME", "/tmp/.cache")
os.environ.setdefault("PADDLE_HOME", "/tmp/.paddle")
os.environ.setdefault("PADDLEX_HOME", "/tmp/.paddlex")
for d in [
os.environ["XDG_CACHE_HOME"],
os.path.join(os.environ["XDG_CACHE_HOME"], "paddle"),
os.environ["PADDLE_HOME"],
os.path.join(os.environ["PADDLEX_HOME"], "temp"),
]:
try:
os.makedirs(d, exist_ok=True)
os.chmod(d, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
except Exception:
pass
# Unset any inherited BLAS/OMP thread caps BEFORE importing paddle/paddleocr
for v in ("OMP_NUM_THREADS", "OPENBLAS_NUM_THREADS", "MKL_NUM_THREADS", "NUMEXPR_NUM_THREADS"):
os.environ.pop(v, None)
logger.info("Environment setup complete. Cache directories configured.")
logger.info(f"PADDLE_HOME: {os.environ['PADDLE_HOME']}")
logger.info(f"XDG_CACHE_HOME: {os.environ['XDG_CACHE_HOME']}")
from paddleocr import PaddleOCR # import AFTER env cleanup
logger.info("PaddleOCR module imported successfully")
# =============================================================================
# THREAD-SAFE OCR POOL - NEW IMPLEMENTATION
# =============================================================================
class OCRPool:
"""
Thread-safe pool of PaddleOCR instances per language.
This class manages multiple PaddleOCR instances (one per language) and
ensures thread-safe access. It uses asyncio locks to prevent race conditions
when multiple concurrent requests arrive.
Features:
- Lazy initialization: Creates instances only when needed
- Thread-safe: Uses locks to prevent concurrent access issues
- GPU serialization: Ensures only one OCR operation runs at a time
- Language caching: Keeps models in memory for fast switching
"""
def __init__(self):
self._instances: Dict[str, PaddleOCR] = {}
self._pool_lock = asyncio.Lock() # Protects instance creation
self._gpu_lock = asyncio.Lock() # Serializes GPU access
logger.info("OCRPool initialized")
async def get_ocr(self, lang: str = "en") -> PaddleOCR:
"""
Get or create OCR instance for the specified language.
This method is thread-safe and uses double-checked locking to minimize
lock contention. If an instance already exists, it's returned immediately.
Otherwise, a new instance is created under lock protection.
Args:
lang: Language code (e.g., "en", "fr", "es", "zh")
Returns:
PaddleOCR instance configured for the specified language
"""
# Fast path: instance already exists (no lock needed)
if lang in self._instances:
logger.debug(f"Using cached OCR instance for language: {lang}")
return self._instances[lang]
# Slow path: need to create instance (acquire lock)
async with self._pool_lock:
# Double-check after acquiring lock (another request may have created it)
if lang in self._instances:
logger.debug(f"OCR instance for {lang} created by another request")
return self._instances[lang]
logger.info(f"Creating new OCR instance for language: {lang}")
try:
self._instances[lang] = PaddleOCR(
use_angle_cls=True,
lang=lang,
use_gpu=True,
gpu_mem=500 # GPU memory limit in MB
)
logger.info(f"✓ OCR instance created successfully for {lang}")
except Exception as e:
logger.error(f"Failed to create OCR instance for {lang}: {e}")
raise
return self._instances[lang]
async def run_ocr(self, lang: str, image_array: np.ndarray) -> List:
"""
Run OCR on an image array with GPU serialization.
This method ensures that only one OCR operation runs at a time on the GPU.
Even though we cache multiple language models, GPU operations are serialized
to prevent contention and maximize throughput on single-GPU systems.
Args:
lang: Language code for OCR
image_array: Numpy array of the image (HxWx3, RGB)
Returns:
PaddleOCR results (list of detections per page)
"""
# Get the OCR instance for this language
ocr = await self.get_ocr(lang)
# Serialize GPU access (only one OCR operation at a time)
async with self._gpu_lock:
logger.debug(f"Running OCR on GPU with {lang} model...")
# PaddleOCR is synchronous, so we run it directly
# (in production, you might want to use run_in_executor for CPU-heavy tasks)
results = ocr.ocr(image_array, cls=True)
logger.debug(f"OCR completed for {lang}")
return results
def get_stats(self) -> dict:
"""Get statistics about the OCR pool."""
return {
"cached_languages": list(self._instances.keys()),
"total_instances": len(self._instances),
}
# Initialize global OCR pool (this object itself is never reassigned, so it's safe)
ocr_pool = OCRPool()
logger.info("Global OCR pool created")
# =============================================================================
# FASTAPI APP INITIALIZATION
# =============================================================================
app = FastAPI(
title="PaddleOCR 2.8 API (GPU-Accelerated)",
version="2.8.1-gpu-threadsafe",
root_path="/",
docs_url="/docs",
openapi_url="/openapi.json"
)
logger.info("FastAPI app initialized")
@app.on_event("startup")
async def startup_event():
"""Log when application starts up."""
logger.info("="*50)
logger.info("PaddleOCR GPU API APPLICATION STARTED")
logger.info("PaddleOCR Version: 2.8.1 (Thread-Safe)")
logger.info("CUDA Version: 11.8")
logger.info("Source: PyPI (fast downloads)")
logger.info("Thread Safety: ENABLED (OCRPool)")
logger.info("="*50)
logger.info("Available endpoints:")
logger.info(" GET / - Health check")
logger.info(" GET /test - Test endpoint")
logger.info(" GET /stats - OCR pool statistics")
logger.info(" GET /docs - API documentation")
logger.info(" POST /ocr - OCR processing (thread-safe)")
logger.info("="*50)
# =============================================================================
# HELPER FUNCTIONS (unchanged, already thread-safe)
# =============================================================================
def _is_number(x: Any) -> bool:
"""Check if a value can be converted to float."""
try:
float(x)
return True
except Exception:
return False
def _is_point(pt: Any) -> bool:
"""Check if pt is a valid 2D point [x, y]."""
return (
isinstance(pt, (list, tuple)) and
len(pt) == 2 and
_is_number(pt[0]) and
_is_number(pt[1])
)
def _is_quad(box: Any) -> bool:
"""Check if box is a valid quadrilateral (4 points)."""
return (
isinstance(box, (list, tuple)) and
len(box) == 4 and
all(_is_point(p) for p in box)
)
def _coerce_box(box: Any) -> Union[List[List[float]], None]:
"""Try to coerce various box formats into a standard quad; return None if impossible."""
# Convert numpy array to list first
if isinstance(box, np.ndarray):
box = box.tolist()
# Already a proper quad?
if _is_quad(box):
return [[float(p[0]), float(p[1])] for p in box]
# Some variants: dict with 'points' or 'box'
if isinstance(box, dict):
for k in ("points", "box", "polygon"):
if k in box and _is_quad(box[k]):
return [[float(p[0]), float(p[1])] for p in box[k]]
# Some models may output rect [x_min, y_min, x_max, y_max]
if (
isinstance(box, (list, tuple)) and
len(box) == 4 and
all(_is_number(v) for v in box)
):
x1, y1, x2, y2 = map(float, box)
return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]
# Anything else: give up
return None
def _format_as_markdown(results: List[dict]) -> str:
"""Format OCR results as clean, readable markdown with table detection."""
if not results:
return ""
# Sort by Y position (top to bottom), then X position (left to right)
sorted_results = sorted(results, key=lambda x: (
min(p[1] for p in x["box"]), # Y position
min(p[0] for p in x["box"]) # X position
))
# Group into rows based on Y position
rows = []
current_row = []
last_y = None
y_threshold = 15 # Pixels - items within this are on same line
for item in sorted_results:
box = item["box"]
y_center = sum(p[1] for p in box) / 4
x_min = min(p[0] for p in box)
x_max = max(p[0] for p in box)
text = item["text"].strip()
if not text:
continue
# Check if we're on a new line
if last_y is None or abs(y_center - last_y) > y_threshold:
# Save previous line
if current_row:
rows.append(current_row)
current_row = [{
"text": text,
"x_min": x_min,
"x_max": x_max,
"x_center": (x_min + x_max) / 2,
"y_center": y_center
}]
last_y = y_center
else:
# Same line - add to current row
current_row.append({
"text": text,
"x_min": x_min,
"x_max": x_max,
"x_center": (x_min + x_max) / 2,
"y_center": y_center
})
# Don't forget the last row
if current_row:
rows.append(current_row)
# Sort items within each row by X position
for row in rows:
row.sort(key=lambda x: x["x_min"])
# Detect tables
markdown = []
i = 0
while i < len(rows):
row = rows[i]
# Only consider table if row has 2+ columns
if len(row) >= 2:
# Look ahead for similar column structure
table_rows = _detect_table(rows[i:])
if len(table_rows) >= 3: # Need at least 3 rows to be a table
# Format as table
markdown.append("") # Spacing before table
_add_table_to_markdown(table_rows, markdown)
markdown.append("") # Spacing after table
i += len(table_rows)
continue
# Not a table - format as regular text
line_text = " ".join(item["text"] for item in row)
# Format based on content
if not line_text.strip():
i += 1
continue
# Title (first line if short enough)
if i == 0 and len(line_text) < 100:
markdown.append(f"# {line_text}")
markdown.append("")
# Section headers (short lines with colons or all caps)
elif (len(line_text) < 60 and
(line_text.endswith(':') or line_text.isupper())):
if markdown:
markdown.append("") # Spacing before header
markdown.append(f"**{line_text}**")
markdown.append("")
# Numbered items
elif (len(line_text) <= 3 and
any(line_text.startswith(str(n)) for n in range(1, 20))):
markdown.append(f"\n{line_text}")
# Regular paragraph
else:
markdown.append(line_text)
i += 1
return "\n".join(markdown).strip()
def _detect_table(rows: List[List[dict]]) -> List[List[dict]]:
"""
Detect if rows form a table by checking for consistent column alignment.
Returns the rows that form a table (empty if not a table).
"""
if len(rows) < 3: # Need at least 3 rows for a table
return []
first_row = rows[0]
if len(first_row) < 2: # Need at least 2 columns
return []
# Extract column X positions from first row
col_positions = [item["x_center"] for item in first_row]
num_cols = len(col_positions)
table_rows = [first_row]
col_threshold = 40 # Pixels - columns must align within this
# Check subsequent rows for alignment
for row in rows[1:]:
if len(row) < 2: # Skip single-column rows
break
# Check if this row's columns align with the first row
if _row_aligns_with_columns(row, col_positions, col_threshold):
table_rows.append(row)
else:
# Stop at first non-aligning row
break
# Stop checking after 20 rows (max table size)
if len(table_rows) >= 20:
break
# Only return as table if we found at least 3 aligned rows
return table_rows if len(table_rows) >= 3 else []
def _row_aligns_with_columns(row: List[dict], col_positions: List[float], threshold: float) -> bool:
"""Check if a row's columns align with expected column positions."""
if len(row) != len(col_positions):
# Allow rows with fewer columns (merged cells)
if len(row) > len(col_positions):
return False
# Check if each item in the row aligns with a column position
for item in row:
item_x = item["x_center"]
# Find closest column position
min_distance = min(abs(item_x - col_x) for col_x in col_positions)
if min_distance > threshold:
return False
return True
def _add_table_to_markdown(table_rows: List[List[dict]], markdown: List[str]):
"""Add a formatted markdown table to the markdown list."""
if not table_rows:
return
# Determine max columns
max_cols = max(len(row) for row in table_rows)
# Format each row
for row_idx, row in enumerate(table_rows):
# Pad row to max columns
row_texts = [item["text"] for item in row]
while len(row_texts) < max_cols:
row_texts.append("")
# Add row
markdown.append("| " + " | ".join(row_texts) + " |")
# Add separator after first row (header)
if row_idx == 0:
markdown.append("| " + " | ".join(["---"] * max_cols) + " |")
# =============================================================================
# API ENDPOINTS
# =============================================================================
@app.get("/")
def health_check():
"""Health check endpoint - HuggingFace Spaces checks this."""
logger.info("Health check endpoint called")
stats = ocr_pool.get_stats()
return JSONResponse({
"status": "ok",
"engine": "PaddleOCR 2.8.1 (GPU-Accelerated, Thread-Safe)",
"version": "2.8.1-threadsafe",
"paddlepaddle_version": "2.6.2",
"cuda_version": "11.8",
"source": "PyPI",
"lang_default": "en",
"gpu_enabled": True,
"thread_safe": True,
"ocr_pool": stats,
"endpoints": {
"health": "/",
"ocr": "/ocr",
"stats": "/stats",
"docs": "/docs",
"test": "/test"
},
"cache": {
"XDG_CACHE_HOME": os.environ["XDG_CACHE_HOME"],
"PADDLE_HOME": os.environ["PADDLE_HOME"],
"PADDLEX_HOME": os.environ["PADDLEX_HOME"],
},
})
@app.get("/test")
def test_endpoint():
"""Simple test endpoint to verify routing."""
logger.info("Test endpoint called")
return JSONResponse({
"message": "Test endpoint works! (GPU mode, thread-safe)",
"timestamp": "2025-01-08",
"thread_safe": True
})
@app.get("/stats")
def stats_endpoint():
"""Get OCR pool statistics."""
logger.info("Stats endpoint called")
stats = ocr_pool.get_stats()
return JSONResponse({
"ocr_pool": stats,
"thread_safe": True,
"gpu_serialization": "enabled"
})
@app.post("/ocr")
async def ocr_endpoint(
file: UploadFile = File(...),
lang: str = Form("en"),
confidence_threshold: float = Form(0.4),
api_key: str = Security(verify_api_key),
):
"""
OCR endpoint for text detection and recognition (THREAD-SAFE).
This endpoint is fully thread-safe and can handle concurrent requests
with different languages without race conditions. Each language gets
its own cached OCR instance, and GPU access is serialized to prevent
contention.
Args:
file: Image file to process
lang: Language code (default: "en")
confidence_threshold: Minimum confidence score (0.0-1.0, default: 0.4)
api_key: API key for authentication (required if API_KEY is set)
Returns:
JSON with detected text, confidence scores, bounding boxes, and formatted markdown
"""
logger.info(f"[THREAD-SAFE] OCR request - filename: {file.filename}, lang: {lang}, threshold: {confidence_threshold}")
try:
# PHASE 1: Image preprocessing (can run in parallel, no shared state)
logger.debug("Reading image file...")
contents = await file.read()
logger.debug(f"Image file read - size: {len(contents)} bytes")
img = Image.open(io.BytesIO(contents)).convert("RGB")
logger.debug(f"Image opened - dimensions: {img.size}, mode: {img.mode}")
# Optimal preprocessing for OCR text detection
logger.debug("Applying OCR preprocessing...")
img = ImageEnhance.Contrast(img).enhance(1.2)
img = ImageEnhance.Sharpness(img).enhance(1.2)
arr = np.array(img)
logger.debug(f"Image converted to array - shape: {arr.shape}, dtype: {arr.dtype}")
# Ensure HxWx3 format
if arr.ndim == 2:
logger.debug("Converting grayscale to RGB")
arr = np.stack([arr, arr, arr], axis=-1)
elif arr.ndim == 3 and arr.shape[2] == 4:
logger.debug("Removing alpha channel")
arr = arr[:, :, :3]
logger.debug(f"Final array shape: {arr.shape}")
# PHASE 2: OCR execution (thread-safe via OCRPool)
logger.info(f"Running thread-safe OCR with language: {lang}")
results = await ocr_pool.run_ocr(lang, arr)
logger.info("OCR processing complete")
if not results or results is None:
logger.warning("No results returned from OCR")
return JSONResponse({
"results": [],
"markdown": "",
"summary": {
"total_detections": 0,
"average_confidence": 0
}
})
# PHASE 3: Result processing (no shared state, thread-safe)
out = []
detection_count = 0
skipped_count = 0
logger.debug("Processing OCR results...")
for page_idx, page_result in enumerate(results):
# Skip None pages
if page_result is None:
logger.debug(f"Page {page_idx}: No text detected")
continue
if not isinstance(page_result, list):
logger.warning(f"Page {page_idx}: Unexpected type {type(page_result)}, skipping")
skipped_count += 1
continue
logger.debug(f"Page {page_idx}: Processing {len(page_result)} detections")
for line_idx, line in enumerate(page_result):
if not (isinstance(line, (list, tuple)) and len(line) >= 2):
logger.warning(f"Page {page_idx}, Line {line_idx}: Invalid format")
skipped_count += 1
continue
box_raw = line[0]
info = line[1]
box = _coerce_box(box_raw)
if box is None:
logger.warning(f"Page {page_idx}, Line {line_idx}: Could not coerce box")
skipped_count += 1
continue
# Extract text and confidence
if isinstance(info, (list, tuple)) and len(info) >= 1:
text = str(info[0])
conf = None
if len(info) >= 2 and _is_number(info[1]):
conf = float(info[1])
else:
text, conf = str(info), None
# Skip empty text or low confidence
if not text.strip():
skipped_count += 1
continue
if conf is not None and conf < confidence_threshold:
skipped_count += 1
logger.debug(f"Skipping low confidence ({conf:.3f}): {text[:30]}")
continue
out.append({"text": text, "confidence": conf, "box": box})
detection_count += 1
logger.info(f"Results: {detection_count} detections, {skipped_count} skipped")
# Generate formatted markdown
markdown_text = _format_as_markdown(out)
logger.debug("Markdown generated")
return JSONResponse({
"results": out,
"markdown": markdown_text,
"summary": {
"total_detections": len(out),
"average_confidence": sum(item["confidence"] for item in out if item["confidence"]) / len(out) if out else 0
}
})
except Exception as e:
logger.error(f"Error processing OCR request: {str(e)}", exc_info=True)
return JSONResponse(
{
"error": str(e),
"results": [],
"markdown": "",
"summary": {
"total_detections": 0,
"average_confidence": 0
}
},
status_code=500
)