# fastapi-ocr / ocr_api/ocr_service.py
# Uploaded by nagpalsumit247 - "Upload 4 files" (commit 9a34207, verified)
"""
OCR Service Module
Handles all OCR operations using PaddleOCR
"""
import os
import logging
from typing import Dict, List, Any, Tuple, Optional
import numpy as np
from PIL import Image
from paddleocr import PaddleOCR
import cv2
# Configure the root logger at import time so INFO-level messages are emitted.
# NOTE(review): basicConfig here is an import-time side effect; confirm the
# hosting application does not expect to configure logging itself.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by OCRService for progress and warning messages.
logger = logging.getLogger(__name__)
class OCRService:
    """
    Service class for OCR operations using PaddleOCR.
    Supports text detection, recognition, layout parsing, and angle classification.

    Throughout this class an "OCR line" is a two-element sequence
    ``[bbox, (text, confidence)]`` where ``bbox`` is either a polygon
    (sequence of ``[x, y]`` points) or a flat ``[x1, y1, x2, y2]`` rectangle.
    """

    # Configuration constants
    MIN_FONT_SIZE = 8  # Minimum font size in points
    MAX_FONT_SIZE = 72  # Maximum font size in points
    DEFAULT_HEADER_MAX_LENGTH = 50  # Max characters for header detection
    # Vertical grouping threshold as ratio of image height.
    # Not applied by the grouping code below, which uses the pixel threshold.
    DEFAULT_VERTICAL_THRESHOLD_RATIO = 0.05
    DEFAULT_VERTICAL_THRESHOLD_PX = 50  # Vertical distance (pixels) for proximity grouping
    DEFAULT_CONFIDENCE = 0.95  # Fallback when a line carries no confidence value

    def __init__(self, use_gpu: bool = False, lang: str = 'en'):
        """
        Initialize OCR Service

        Args:
            use_gpu: Whether to use GPU for processing
            lang: Language for OCR (default: 'en')
        """
        self.use_gpu = use_gpu
        self.lang = lang

        # Initialize PaddleOCR with all features enabled
        logger.info("Initializing PaddleOCR (GPU: %s, Language: %s)", use_gpu, lang)
        self.ocr_engine = PaddleOCR(
            use_angle_cls=True,  # Enable angle classification
            lang=lang,
            use_gpu=use_gpu,
            show_log=False,
            use_space_char=True
        )

        # Initialize structure parser for layout analysis.
        # The import is local so a paddleocr build without PPStructure still
        # yields a working service (structure_engine is then None).
        try:
            from paddleocr import PPStructure
            self.structure_engine = PPStructure(
                use_gpu=use_gpu,
                lang=lang,
                show_log=False,
                layout=True,  # Enable layout analysis
                table=False,  # We'll handle tables separately if needed
                ocr=False  # We'll use our own OCR
            )
        except ImportError:
            logger.warning("PPStructure not available, layout parsing will be limited")
            self.structure_engine = None

    def process_image(self, image_path: str) -> Dict[str, Any]:
        """
        Process an image and return structured OCR results

        Args:
            image_path: Path to the image file

        Returns:
            Dictionary containing structured OCR results
            (keys: "image_width", "image_height", "blocks")

        Raises:
            ValueError: If the image cannot be read from ``image_path``
        """
        # Load image (used here only to validate the file and get its size)
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Cannot read image from {image_path}")

        # Get image dimensions
        height, width = image.shape[:2]
        logger.info("Processing image: %sx%s", width, height)

        # Perform OCR
        ocr_result = self.ocr_engine.ocr(image_path, cls=True)

        # Perform layout analysis if available; it is best-effort, so a
        # failure only downgrades grouping to the proximity heuristic.
        layout_result = None
        if self.structure_engine:
            try:
                layout_result = self.structure_engine(image_path)
            except Exception as e:
                logger.warning("Layout analysis failed: %s", e)

        # Build structured response
        return self._build_structured_response(
            ocr_result,
            layout_result,
            width,
            height
        )

    def _build_structured_response(
        self,
        ocr_result: List,
        layout_result: Optional[List],
        width: int,
        height: int
    ) -> Dict[str, Any]:
        """
        Build structured JSON response from OCR results

        Args:
            ocr_result: Raw OCR result from PaddleOCR; only ``ocr_result[0]``
                (the list of OCR lines) is used
            layout_result: Layout analysis result, or None
            width: Image width
            height: Image height

        Returns:
            Structured dictionary with the image size and the list of blocks
        """
        blocks = []

        # Extract layout blocks if available
        layout_blocks = self._extract_layout_blocks(layout_result) if layout_result else []

        # Process OCR results (an empty or None first entry means no text)
        if ocr_result and ocr_result[0]:
            # Group lines into blocks based on layout or proximity
            if layout_blocks:
                blocks = self._group_lines_by_layout(ocr_result[0], layout_blocks)
            else:
                blocks = self._group_lines_by_proximity(ocr_result[0])

        return {
            "image_width": width,
            "image_height": height,
            "blocks": blocks
        }

    def _extract_layout_blocks(self, layout_result: List) -> List[Dict]:
        """
        Extract layout blocks from structure parser result

        Only dict items that carry a 'type' key are kept; each is reduced to
        its 'type' and 'bbox' (defaulting to ``[0, 0, 0, 0]``).
        """
        return [
            {
                'type': item.get('type', 'paragraph'),
                'bbox': item.get('bbox', [0, 0, 0, 0])
            }
            for item in layout_result
            if isinstance(item, dict) and 'type' in item
        ]

    def _group_lines_by_layout(
        self,
        ocr_lines: List,
        layout_blocks: List[Dict]
    ) -> List[Dict]:
        """
        Group OCR lines into layout blocks

        A line belongs to the first layout block that contains the line's
        center point, so each line appears in exactly one block. Lines that
        fall in no layout block are collected into a trailing "paragraph"
        block. Line ids always refer to the line's index in ``ocr_lines``.

        Args:
            ocr_lines: List of OCR lines
            layout_blocks: Dicts with 'type' and 'bbox' keys

        Returns:
            List of block dictionaries (see ``_create_block``)
        """
        # If no layout blocks, fall back to proximity grouping
        if not layout_blocks:
            return self._group_lines_by_proximity(ocr_lines)

        # Centers do not depend on the layout block, so compute them once
        centers = [self._get_bbox_center(line_data[0]) for line_data in ocr_lines]

        blocks = []
        assigned = set()  # original indices of lines already placed in a block

        # Assign lines to layout blocks
        for idx, layout_block in enumerate(layout_blocks):
            block_type = layout_block.get('type', 'paragraph')
            layout_bbox = layout_block.get('bbox', [0, 0, 0, 0])

            # Find not-yet-assigned lines whose center is within this block
            member_indices = [
                i for i, center in enumerate(centers)
                if i not in assigned and self._point_in_bbox(center, layout_bbox)
            ]
            if not member_indices:
                continue

            assigned.update(member_indices)
            blocks.append(self._create_block(
                block_id=f"block_{idx}",
                block_type=block_type,
                lines=[ocr_lines[i] for i in member_indices],
                line_indices=member_indices
            ))

        # Handle lines not assigned to any block
        leftover_indices = [i for i in range(len(ocr_lines)) if i not in assigned]
        if leftover_indices:
            blocks.append(self._create_block(
                # len(layout_blocks) can never collide with a layout block id,
                # even when some layout blocks were empty and skipped.
                block_id=f"block_{len(layout_blocks)}",
                block_type="paragraph",
                lines=[ocr_lines[i] for i in leftover_indices],
                line_indices=leftover_indices
            ))

        return blocks

    def _group_lines_by_proximity(
        self,
        ocr_lines: List,
        threshold: Optional[float] = None
    ) -> List[Dict]:
        """
        Group OCR lines into blocks based on spatial proximity
        Simple heuristic: group lines that are close vertically

        Lines are visited top to bottom (by center y). A line joins the
        current block when its center is less than ``threshold`` pixels below
        the previous line's center; otherwise it starts a new block.

        Args:
            ocr_lines: List of OCR lines
            threshold: Vertical distance threshold in pixels; defaults to
                ``DEFAULT_VERTICAL_THRESHOLD_PX``

        Returns:
            List of block dictionaries; line ids keep the original indices
        """
        if not ocr_lines:
            return []

        if threshold is None:
            threshold = self.DEFAULT_VERTICAL_THRESHOLD_PX

        # Sort lines by vertical position (top to bottom), remembering each
        # line's original index so line ids stay stable.
        sorted_lines = sorted(
            enumerate(ocr_lines),
            key=lambda x: self._get_bbox_center(x[1][0])[1]
        )

        groups = []  # each group is a list of (orig_idx, line_data)
        last_y = None
        for orig_idx, line_data in sorted_lines:
            center_y = self._get_bbox_center(line_data[0])[1]
            if groups and abs(center_y - last_y) < threshold:
                groups[-1].append((orig_idx, line_data))
            else:
                # First line, or too far from the previous one: start new block
                groups.append([(orig_idx, line_data)])
            last_y = center_y

        return [
            self._create_block(
                block_id=f"block_{number}",
                block_type=self._infer_block_type(group),
                lines=[entry[1] for entry in group],
                line_indices=[entry[0] for entry in group]
            )
            for number, group in enumerate(groups)
        ]

    def _infer_block_type(self, lines: List) -> str:
        """
        Infer block type based on content heuristics
        Uses simple rules: single short lines without periods are likely headers

        Args:
            lines: List of ``(orig_idx, line_data)`` pairs

        Returns:
            "header" or "paragraph"
        """
        if not lines:
            return "paragraph"

        # Get first line text
        first_line = lines[0][1]
        text = first_line[1][0] if len(first_line) > 1 else ""

        # Simple heuristics: single short lines without periods are likely headers
        is_single_line = len(lines) == 1
        is_short = len(text) < self.DEFAULT_HEADER_MAX_LENGTH
        if is_single_line and is_short and not text.endswith('.'):
            return "header"

        # Default to paragraph
        return "paragraph"

    def _create_block(
        self,
        block_id: str,
        block_type: str,
        lines: List,
        line_indices: Optional[List[int]] = None
    ) -> Dict:
        """
        Create a block structure from OCR lines

        Args:
            block_id: Identifier stored in the block
            block_type: Type label stored in the block
            lines: List of OCR lines
            line_indices: Index used for each line's "line_id"; defaults to
                positions within ``lines``

        Returns:
            Dict with "block_id", "block_type", "bounding_box" (enclosing all
            lines) and "lines"
        """
        if line_indices is None:
            line_indices = list(range(len(lines)))

        block_lines = []
        all_points = []

        for idx, line_data in zip(line_indices, lines):
            bbox = line_data[0]
            text_info = line_data[1]

            # Accept the (text, confidence) pair as a tuple or a list; a bare
            # value is treated as the text itself.
            if isinstance(text_info, (list, tuple)):
                text = text_info[0]
                confidence = text_info[1] if len(text_info) > 1 else self.DEFAULT_CONFIDENCE
            else:
                text = text_info
                confidence = self.DEFAULT_CONFIDENCE

            # Convert bbox to proper format
            line_bbox = self._normalize_bbox(bbox)
            all_points.extend(line_bbox)

            block_lines.append({
                "line_id": f"line_{idx}",
                "text": text,
                "bounding_box": line_bbox,
                # Estimate font size from bbox height
                "font_size_estimate": self._estimate_font_size(line_bbox),
                "words": self._extract_words_from_line(text, line_bbox, confidence)
            })

        return {
            "block_id": block_id,
            "block_type": block_type,
            # Block bounding box encloses the points of all lines
            "bounding_box": self._calculate_enclosing_bbox(all_points),
            "lines": block_lines
        }

    def _extract_words_from_line(
        self,
        text: str,
        line_bbox: List[List[int]],
        line_confidence: float
    ) -> List[Dict]:
        """
        Extract words from line and approximate their bounding boxes

        Words are the whitespace-separated tokens of ``text``. Each word's
        horizontal extent is the line width scaled by the word's actual
        character offsets in ``text``, so repeated or leading whitespace does
        not shift later words. Every word inherits the line confidence.

        Args:
            text: Recognized line text
            line_bbox: Normalized line polygon (list of [x, y])
            line_confidence: Confidence of the whole line

        Returns:
            List of word dicts with "word", "bounding_box", "confidence" and
            "characters"
        """
        words = text.split()
        if not words:
            return []

        # Calculate line dimensions
        x_coords = [p[0] for p in line_bbox]
        y_coords = [p[1] for p in line_bbox]
        line_x_start = min(x_coords)
        line_y_min = min(y_coords)
        line_width = max(x_coords) - line_x_start
        line_y_max = line_y_min + (max(y_coords) - line_y_min)

        # Total character count (including spaces); > 0 because words exist
        total_chars = len(text)

        word_list = []
        cursor = 0  # search position in text, just past the previous word
        for word in words:
            # Locate the word in the original text; split() tokens always
            # occur in order, so find() cannot fail here.
            start = text.find(word, cursor)
            end = start + len(word)
            cursor = end

            # Calculate word position proportionally
            word_x_start = line_x_start + int(line_width * (start / total_chars))
            word_x_end = line_x_start + int(line_width * (end / total_chars))

            # Create word bounding box (simplified rectangle)
            word_bbox = [
                [word_x_start, line_y_min],
                [word_x_end, line_y_min],
                [word_x_end, line_y_max],
                [word_x_start, line_y_max]
            ]

            word_list.append({
                "word": word,
                "bounding_box": word_bbox,
                "confidence": line_confidence,
                "characters": self._extract_characters_from_word(
                    word,
                    word_bbox,
                    line_confidence
                )
            })

        return word_list

    def _extract_characters_from_word(
        self,
        word: str,
        word_bbox: List[List[int]],
        confidence: float
    ) -> List[Dict]:
        """
        Extract individual characters and approximate their bounding boxes

        The word's width is divided evenly among its characters; every
        character inherits ``confidence``.

        Args:
            word: The word text
            word_bbox: Word polygon (list of [x, y])
            confidence: Confidence copied onto every character

        Returns:
            List of dicts with "char", "bounding_box" and "confidence"
        """
        if not word:
            return []

        x_coords = [p[0] for p in word_bbox]
        y_coords = [p[1] for p in word_bbox]
        word_x_start = min(x_coords)
        word_y_min = min(y_coords)
        word_width = max(x_coords) - word_x_start
        word_y_max = word_y_min + (max(y_coords) - word_y_min)

        num_chars = len(word)
        char_list = []
        for i, char in enumerate(word):
            # Calculate character position proportionally
            char_x_start = word_x_start + int(word_width * (i / num_chars))
            char_x_end = word_x_start + int(word_width * ((i + 1) / num_chars))

            char_list.append({
                "char": char,
                # Character bounding box as a rectangle
                "bounding_box": [
                    [char_x_start, word_y_min],
                    [char_x_end, word_y_min],
                    [char_x_end, word_y_max],
                    [char_x_start, word_y_max]
                ],
                "confidence": confidence
            })

        return char_list

    def _normalize_bbox(self, bbox: List) -> List[List[int]]:
        """
        Normalize bounding box to list of [x, y] coordinates

        A polygon of 2-element points is returned with integer coordinates;
        anything else is treated as ``[x1, y1, x2, y2]`` and expanded to its
        four corners (clockwise from top-left).
        """
        if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) == 2:
            # Already in correct format
            return [[int(p[0]), int(p[1])] for p in bbox]

        # Convert from flat rectangle format
        x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
        return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]

    def _estimate_font_size(self, bbox: List[List[int]]) -> int:
        """
        Estimate font size based on bounding box height
        Simple heuristic: height in pixels approximates font size in points
        Typical ratio: 1 point ≈ 1.333 pixels at 96 DPI

        Returns:
            Font size clamped to [MIN_FONT_SIZE, MAX_FONT_SIZE]
        """
        y_coords = [p[1] for p in bbox]
        height = max(y_coords) - min(y_coords)

        # Convert pixel height to approximate font size
        font_size = int(height * 0.75)

        # Clamp between reasonable font size bounds
        return max(self.MIN_FONT_SIZE, min(self.MAX_FONT_SIZE, font_size))

    def _calculate_enclosing_bbox(self, points: List[List[int]]) -> List[List[int]]:
        """
        Calculate the minimum enclosing bounding box for a set of points

        Returns the four corners of the axis-aligned rectangle (clockwise
        from top-left), or four ``[0, 0]`` points when ``points`` is empty.
        """
        if not points:
            return [[0, 0], [0, 0], [0, 0], [0, 0]]

        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]
        min_x, max_x = min(x_coords), max(x_coords)
        min_y, max_y = min(y_coords), max(y_coords)

        return [
            [min_x, min_y],
            [max_x, min_y],
            [max_x, max_y],
            [min_x, max_y]
        ]

    def _get_bbox_center(self, bbox: List) -> Tuple[float, float]:
        """
        Get center point of bounding box

        For a polygon this is the mean of its points; for a flat
        ``[x1, y1, x2, y2]`` rectangle it is the midpoint.
        """
        if isinstance(bbox[0], (list, tuple)):
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
        else:
            x_coords = [bbox[0], bbox[2]]
            y_coords = [bbox[1], bbox[3]]

        return (sum(x_coords) / len(x_coords), sum(y_coords) / len(y_coords))

    def _point_in_bbox(self, point: Tuple[float, float], bbox: List) -> bool:
        """
        Check if a point is inside a bounding box

        Accepts a flat ``[x1, y1, x2, y2]`` rectangle or a polygon of
        ``[x, y]`` points; a polygon is tested against its axis-aligned
        bounds. Edges count as inside. Any other shape yields False.
        """
        x, y = point

        if len(bbox) == 0:
            return False

        if isinstance(bbox[0], (list, tuple)):
            # Polygon format: compare against the enclosing rectangle
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
            return (min(x_coords) <= x <= max(x_coords)
                    and min(y_coords) <= y <= max(y_coords))

        if len(bbox) == 4:
            # [x1, y1, x2, y2] format
            return bbox[0] <= x <= bbox[2] and bbox[1] <= y <= bbox[3]

        return False