""" OCR Service Module Handles all OCR operations using PaddleOCR """ import os import logging from typing import Dict, List, Any, Tuple, Optional import numpy as np from PIL import Image from paddleocr import PaddleOCR import cv2 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class OCRService: """ Service class for OCR operations using PaddleOCR. Supports text detection, recognition, layout parsing, and angle classification. """ # Configuration constants MIN_FONT_SIZE = 8 # Minimum font size in points MAX_FONT_SIZE = 72 # Maximum font size in points DEFAULT_HEADER_MAX_LENGTH = 50 # Max characters for header detection DEFAULT_VERTICAL_THRESHOLD_RATIO = 0.05 # Vertical grouping threshold as ratio of image height def __init__(self, use_gpu: bool = False, lang: str = 'en'): """ Initialize OCR Service Args: use_gpu: Whether to use GPU for processing lang: Language for OCR (default: 'en') """ self.use_gpu = use_gpu self.lang = lang # Initialize PaddleOCR with all features enabled logger.info(f"Initializing PaddleOCR (GPU: {use_gpu}, Language: {lang})") self.ocr_engine = PaddleOCR( use_angle_cls=True, # Enable angle classification lang=lang, use_gpu=use_gpu, show_log=False, use_space_char=True ) # Initialize structure parser for layout analysis try: from paddleocr import PPStructure self.structure_engine = PPStructure( use_gpu=use_gpu, lang=lang, show_log=False, layout=True, # Enable layout analysis table=False, # We'll handle tables separately if needed ocr=False # We'll use our own OCR ) except ImportError: logger.warning("PPStructure not available, layout parsing will be limited") self.structure_engine = None def process_image(self, image_path: str) -> Dict[str, Any]: """ Process an image and return structured OCR results Args: image_path: Path to the image file Returns: Dictionary containing structured OCR results """ # Load image image = cv2.imread(image_path) if image is None: raise ValueError(f"Cannot read image from {image_path}") # Get image dimensions height, width = image.shape[:2] logger.info(f"Processing image: {width}x{height}") # Perform OCR ocr_result = self.ocr_engine.ocr(image_path, cls=True) # Perform layout analysis if available layout_result = None if self.structure_engine: try: layout_result = self.structure_engine(image_path) except Exception as e: logger.warning(f"Layout analysis failed: {e}") # Build structured response structured_result = self._build_structured_response( ocr_result, layout_result, width, height ) return structured_result def _build_structured_response( self, ocr_result: List, layout_result: Optional[List], width: int, height: int ) -> Dict[str, Any]: """ Build structured JSON response from OCR results Args: ocr_result: Raw OCR result from PaddleOCR layout_result: Layout analysis result width: Image width height: Image height Returns: Structured dictionary matching required schema """ blocks = [] # Extract layout blocks if available layout_blocks = self._extract_layout_blocks(layout_result) if layout_result else [] # Process OCR results if ocr_result and ocr_result[0]: # Group lines into blocks based on layout or proximity if layout_blocks: blocks = self._group_lines_by_layout(ocr_result[0], layout_blocks) else: blocks = self._group_lines_by_proximity(ocr_result[0]) return { "image_width": width, "image_height": height, "blocks": blocks } def _extract_layout_blocks(self, layout_result: List) -> List[Dict]: """Extract layout blocks from structure parser result""" blocks = [] for item in layout_result: if isinstance(item, dict) and 'type' in item: blocks.append({ 'type': item.get('type', 'paragraph'), 'bbox': item.get('bbox', [0, 0, 0, 0]) }) return blocks def _group_lines_by_layout( self, ocr_lines: List, layout_blocks: List[Dict] ) -> List[Dict]: """Group OCR lines into layout blocks""" blocks = [] # If no layout blocks, fall back to proximity grouping if not layout_blocks: return self._group_lines_by_proximity(ocr_lines) # Assign lines to layout blocks for idx, layout_block in enumerate(layout_blocks): block_type = layout_block.get('type', 'paragraph') layout_bbox = layout_block.get('bbox', [0, 0, 0, 0]) # Find lines that belong to this block block_lines = [] for line_data in ocr_lines: line_bbox = line_data[0] line_center = self._get_bbox_center(line_bbox) # Check if line center is within layout block if self._point_in_bbox(line_center, layout_bbox): block_lines.append(line_data) if block_lines: blocks.append(self._create_block( block_id=f"block_{idx}", block_type=block_type, lines=block_lines )) # Handle lines not assigned to any block assigned_lines = set() for block in blocks: for line in block['lines']: assigned_lines.add(line['line_id']) unassigned_lines = [ line for i, line in enumerate(ocr_lines) if f"line_{i}" not in assigned_lines ] if unassigned_lines: blocks.append(self._create_block( block_id=f"block_{len(blocks)}", block_type="paragraph", lines=unassigned_lines )) return blocks def _group_lines_by_proximity(self, ocr_lines: List) -> List[Dict]: """ Group OCR lines into blocks based on spatial proximity Simple heuristic: group lines that are close vertically """ if not ocr_lines: return [] # Get image height for adaptive threshold (if not available, use fixed threshold) # Calculate threshold as a percentage of image height for better adaptability # For now, use a reasonable fixed threshold that works for most documents threshold = 50 # Vertical distance threshold in pixels for grouping # Sort lines by vertical position (top to bottom) sorted_lines = sorted( enumerate(ocr_lines), key=lambda x: self._get_bbox_center(x[1][0])[1] ) for orig_idx, line_data in sorted_lines: bbox = line_data[0] center_y = self._get_bbox_center(bbox)[1] if last_y is None or abs(center_y - last_y) < threshold: current_block_lines.append((orig_idx, line_data)) else: # Start new block if current_block_lines: blocks.append(self._create_block( block_id=f"block_{len(blocks)}", block_type=self._infer_block_type(current_block_lines), lines=[line[1] for line in current_block_lines], line_indices=[line[0] for line in current_block_lines] )) current_block_lines = [(orig_idx, line_data)] last_y = center_y # Add last block if current_block_lines: blocks.append(self._create_block( block_id=f"block_{len(blocks)}", block_type=self._infer_block_type(current_block_lines), lines=[line[1] for line in current_block_lines], line_indices=[line[0] for line in current_block_lines] )) return blocks def _infer_block_type(self, lines: List) -> str: """ Infer block type based on content heuristics Uses simple rules: single short lines without periods are likely headers """ if not lines: return "paragraph" # Get first line text first_line = lines[0][1] text = first_line[1][0] if len(first_line) > 1 else "" # Simple heuristics: single short lines without periods are likely headers if len(lines) == 1: if len(text) < self.DEFAULT_HEADER_MAX_LENGTH and not text.endswith('.'): return "header" # Default to paragraph return "paragraph" def _create_block( self, block_id: str, block_type: str, lines: List, line_indices: Optional[List[int]] = None ) -> Dict: """Create a block structure from OCR lines""" if line_indices is None: line_indices = list(range(len(lines))) block_lines = [] all_points = [] for idx, line_data in zip(line_indices, lines): bbox = line_data[0] text_tuple = line_data[1] text = text_tuple[0] if isinstance(text_tuple, tuple) else text_tuple confidence = text_tuple[1] if isinstance(text_tuple, tuple) and len(text_tuple) > 1 else 0.95 # Convert bbox to proper format line_bbox = self._normalize_bbox(bbox) all_points.extend(line_bbox) # Estimate font size from bbox height font_size = self._estimate_font_size(line_bbox) # Process words words = self._extract_words_from_line(text, line_bbox, confidence) block_lines.append({ "line_id": f"line_{idx}", "text": text, "bounding_box": line_bbox, "font_size_estimate": font_size, "words": words }) # Calculate block bounding box from all lines block_bbox = self._calculate_enclosing_bbox(all_points) return { "block_id": block_id, "block_type": block_type, "bounding_box": block_bbox, "lines": block_lines } def _extract_words_from_line( self, text: str, line_bbox: List[List[int]], line_confidence: float ) -> List[Dict]: """ Extract words from line and approximate their bounding boxes """ words = text.split() if not words: return [] # Calculate line dimensions x_coords = [p[0] for p in line_bbox] y_coords = [p[1] for p in line_bbox] line_width = max(x_coords) - min(x_coords) line_height = max(y_coords) - min(y_coords) line_x_start = min(x_coords) line_y_min = min(y_coords) # Calculate total character count (including spaces) total_chars = len(text) word_list = [] char_position = 0 for word in words: # Calculate word position proportionally word_start_ratio = char_position / total_chars if total_chars > 0 else 0 word_end_ratio = (char_position + len(word)) / total_chars if total_chars > 0 else 0 word_x_start = line_x_start + int(line_width * word_start_ratio) word_x_end = line_x_start + int(line_width * word_end_ratio) # Create word bounding box (simplified rectangle) word_bbox = [ [word_x_start, line_y_min], [word_x_end, line_y_min], [word_x_end, line_y_min + line_height], [word_x_start, line_y_min + line_height] ] # Extract characters characters = self._extract_characters_from_word( word, word_bbox, line_confidence ) word_list.append({ "word": word, "bounding_box": word_bbox, "confidence": line_confidence, "characters": characters }) # Move position forward (word + space) char_position += len(word) + 1 return word_list def _extract_characters_from_word( self, word: str, word_bbox: List[List[int]], confidence: float ) -> List[Dict]: """ Extract individual characters and approximate their bounding boxes """ if not word: return [] x_coords = [p[0] for p in word_bbox] y_coords = [p[1] for p in word_bbox] word_width = max(x_coords) - min(x_coords) word_height = max(y_coords) - min(y_coords) word_x_start = min(x_coords) word_y_min = min(y_coords) char_list = [] num_chars = len(word) for i, char in enumerate(word): # Calculate character position proportionally char_start_ratio = i / num_chars char_end_ratio = (i + 1) / num_chars char_x_start = word_x_start + int(word_width * char_start_ratio) char_x_end = word_x_start + int(word_width * char_end_ratio) # Create character bounding box char_bbox = [ [char_x_start, word_y_min], [char_x_end, word_y_min], [char_x_end, word_y_min + word_height], [char_x_start, word_y_min + word_height] ] char_list.append({ "char": char, "bounding_box": char_bbox, "confidence": confidence }) return char_list def _normalize_bbox(self, bbox: List) -> List[List[int]]: """Normalize bounding box to list of [x, y] coordinates""" if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) == 2: # Already in correct format return [[int(p[0]), int(p[1])] for p in bbox] else: # Convert from other formats return [[int(bbox[0]), int(bbox[1])], [int(bbox[2]), int(bbox[1])], [int(bbox[2]), int(bbox[3])], [int(bbox[0]), int(bbox[3])]] def _estimate_font_size(self, bbox: List[List[int]]) -> int: """ Estimate font size based on bounding box height Simple heuristic: height in pixels approximates font size in points Typical ratio: 1 point ≈ 1.333 pixels at 96 DPI """ y_coords = [p[1] for p in bbox] height = max(y_coords) - min(y_coords) # Convert pixel height to approximate font size font_size = int(height * 0.75) # Clamp between reasonable font size bounds return max(self.MIN_FONT_SIZE, min(self.MAX_FONT_SIZE, font_size)) def _calculate_enclosing_bbox(self, points: List[List[int]]) -> List[List[int]]: """Calculate the minimum enclosing bounding box for a set of points""" if not points: return [[0, 0], [0, 0], [0, 0], [0, 0]] x_coords = [p[0] for p in points] y_coords = [p[1] for p in points] min_x, max_x = min(x_coords), max(x_coords) min_y, max_y = min(y_coords), max(y_coords) return [ [min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y] ] def _get_bbox_center(self, bbox: List) -> Tuple[float, float]: """Get center point of bounding box""" if isinstance(bbox[0], (list, tuple)): x_coords = [p[0] for p in bbox] y_coords = [p[1] for p in bbox] else: x_coords = [bbox[0], bbox[2]] y_coords = [bbox[1], bbox[3]] return (sum(x_coords) / len(x_coords), sum(y_coords) / len(y_coords)) def _point_in_bbox(self, point: Tuple[float, float], bbox: List) -> bool: """Check if a point is inside a bounding box""" x, y = point if len(bbox) == 4 and not isinstance(bbox[0], (list, tuple)): # [x1, y1, x2, y2] format return bbox[0] <= x <= bbox[2] and bbox[1] <= y <= bbox[3] return False