Spaces:
Sleeping
Sleeping
| """ | |
| OCR Service Module | |
| Handles all OCR operations using PaddleOCR | |
| """ | |
| import os | |
| import logging | |
| from typing import Dict, List, Any, Tuple, Optional | |
| import numpy as np | |
| from PIL import Image | |
| from paddleocr import PaddleOCR | |
| import cv2 | |
# Module-level logging setup: configure the root logger at INFO on import
# (a no-op if the root logger already has handlers) and create this module's
# named logger, which the OCR service uses for progress and warning messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class OCRService:
    """
    Service class for OCR operations using PaddleOCR.

    Supports text detection, recognition, layout parsing, and angle
    classification. The public entry point is :meth:`process_image`, which
    returns a nested dictionary of blocks -> lines -> words -> characters.
    Word and character boxes are *approximations* derived by splitting the
    line box proportionally to character counts; PaddleOCR itself only
    reports line-level boxes here.
    """

    # Configuration constants
    MIN_FONT_SIZE = 8  # Minimum font size in points
    MAX_FONT_SIZE = 72  # Maximum font size in points
    DEFAULT_HEADER_MAX_LENGTH = 50  # Max characters for header detection
    # Vertical grouping threshold as ratio of image height.
    # NOTE: declared for adaptive grouping but not applied yet; proximity
    # grouping currently uses the fixed pixel threshold below.
    DEFAULT_VERTICAL_THRESHOLD_RATIO = 0.05
    DEFAULT_VERTICAL_THRESHOLD_PX = 50  # Fixed vertical grouping threshold in pixels
    DEFAULT_CONFIDENCE = 0.95  # Used when a line carries no confidence score
    PIXELS_TO_POINTS = 0.75  # 1 px ~= 0.75 pt at 96 DPI

    def __init__(self, use_gpu: bool = False, lang: str = 'en'):
        """
        Initialize OCR Service

        Args:
            use_gpu: Whether to use GPU for processing
            lang: Language for OCR (default: 'en')
        """
        self.use_gpu = use_gpu
        self.lang = lang

        # Initialize PaddleOCR with all features enabled
        logger.info("Initializing PaddleOCR (GPU: %s, Language: %s)", use_gpu, lang)
        self.ocr_engine = PaddleOCR(
            use_angle_cls=True,  # Enable angle classification
            lang=lang,
            use_gpu=use_gpu,
            show_log=False,
            use_space_char=True
        )

        # Initialize structure parser for layout analysis. It is optional:
        # when it cannot be imported, blocks are grouped by proximity instead.
        try:
            from paddleocr import PPStructure
            self.structure_engine = PPStructure(
                use_gpu=use_gpu,
                lang=lang,
                show_log=False,
                layout=True,  # Enable layout analysis
                table=False,  # We'll handle tables separately if needed
                ocr=False  # We'll use our own OCR
            )
        except ImportError:
            logger.warning("PPStructure not available, layout parsing will be limited")
            self.structure_engine = None

    def process_image(self, image_path: str) -> Dict[str, Any]:
        """
        Process an image and return structured OCR results

        Args:
            image_path: Path to the image file

        Returns:
            Dictionary with "image_width", "image_height" and "blocks"

        Raises:
            ValueError: If the image cannot be read from ``image_path``
        """
        # Load image (only used to validate the file and read its dimensions)
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Cannot read image from {image_path}")

        # Get image dimensions
        height, width = image.shape[:2]
        logger.info("Processing image: %sx%s", width, height)

        # Perform OCR
        ocr_result = self.ocr_engine.ocr(image_path, cls=True)

        # Perform layout analysis if available. This is best-effort: a
        # failure here must not prevent returning the plain OCR result.
        layout_result = None
        if self.structure_engine:
            try:
                layout_result = self.structure_engine(image_path)
            except Exception as e:
                logger.warning("Layout analysis failed: %s", e)

        # Build structured response
        return self._build_structured_response(
            ocr_result,
            layout_result,
            width,
            height
        )

    def _build_structured_response(
        self,
        ocr_result: List,
        layout_result: Optional[List],
        width: int,
        height: int
    ) -> Dict[str, Any]:
        """
        Build structured JSON response from OCR results

        Args:
            ocr_result: Raw OCR result from PaddleOCR (only the first
                element, i.e. the first page/image, is used)
            layout_result: Layout analysis result, or None
            width: Image width
            height: Image height

        Returns:
            Structured dictionary with image size and the list of blocks
        """
        blocks: List[Dict] = []

        # Extract layout blocks if available
        layout_blocks = self._extract_layout_blocks(layout_result) if layout_result else []

        # Process OCR results (empty/None result -> no blocks)
        if ocr_result and ocr_result[0]:
            # Group lines into blocks based on layout or proximity
            if layout_blocks:
                blocks = self._group_lines_by_layout(ocr_result[0], layout_blocks)
            else:
                blocks = self._group_lines_by_proximity(ocr_result[0])

        return {
            "image_width": width,
            "image_height": height,
            "blocks": blocks
        }

    def _extract_layout_blocks(self, layout_result: List) -> List[Dict]:
        """Extract layout blocks (type + bbox) from structure parser result.

        Items that are not dicts or have no 'type' key are ignored.
        """
        return [
            {
                'type': item.get('type', 'paragraph'),
                'bbox': item.get('bbox', [0, 0, 0, 0])
            }
            for item in layout_result
            if isinstance(item, dict) and 'type' in item
        ]

    def _group_lines_by_layout(
        self,
        ocr_lines: List,
        layout_blocks: List[Dict]
    ) -> List[Dict]:
        """Group OCR lines into layout blocks.

        A line belongs to the first layout block that contains the centre of
        its bounding box. Lines matching no layout block are collected into
        one trailing "paragraph" block. Line ids always refer to the line's
        position in ``ocr_lines`` so they are unique across all blocks.

        Args:
            ocr_lines: OCR lines as ``[bbox, (text, confidence)]`` entries
            layout_blocks: Dicts with 'type' and 'bbox' keys

        Returns:
            List of block dictionaries
        """
        # If no layout blocks, fall back to proximity grouping
        if not layout_blocks:
            return self._group_lines_by_proximity(ocr_lines)

        blocks: List[Dict] = []
        assigned: set = set()  # indices into ocr_lines already placed in a block

        # Assign lines to layout blocks
        for idx, layout_block in enumerate(layout_blocks):
            block_type = layout_block.get('type', 'paragraph')
            layout_bbox = layout_block.get('bbox', [0, 0, 0, 0])

            # Find not-yet-assigned lines whose centre lies in this block
            members = [
                line_idx
                for line_idx, line_data in enumerate(ocr_lines)
                if line_idx not in assigned
                and self._point_in_bbox(self._get_bbox_center(line_data[0]), layout_bbox)
            ]
            if not members:
                continue

            assigned.update(members)
            blocks.append(self._create_block(
                block_id=f"block_{idx}",
                block_type=block_type,
                lines=[ocr_lines[i] for i in members],
                line_indices=members
            ))

        # Handle lines not assigned to any block
        leftover = [i for i in range(len(ocr_lines)) if i not in assigned]
        if leftover:
            # Layout block ids use the layout index, so the first id that can
            # never collide with them is len(layout_blocks).
            blocks.append(self._create_block(
                block_id=f"block_{len(layout_blocks)}",
                block_type="paragraph",
                lines=[ocr_lines[i] for i in leftover],
                line_indices=leftover
            ))

        return blocks

    def _group_lines_by_proximity(self, ocr_lines: List) -> List[Dict]:
        """
        Group OCR lines into blocks based on spatial proximity

        Simple heuristic: lines are sorted top to bottom and a new block is
        started whenever the vertical distance between the centres of two
        consecutive lines reaches the threshold.

        Args:
            ocr_lines: OCR lines as ``[bbox, (text, confidence)]`` entries

        Returns:
            List of block dictionaries (empty when there are no lines)
        """
        if not ocr_lines:
            return []

        # Vertical distance threshold in pixels for grouping
        threshold = self.DEFAULT_VERTICAL_THRESHOLD_PX

        # Sort lines by vertical position (top to bottom), remembering each
        # line's original index so line ids stay stable.
        sorted_lines = sorted(
            enumerate(ocr_lines),
            key=lambda x: self._get_bbox_center(x[1][0])[1]
        )

        groups: List[List[Tuple[int, Any]]] = []
        last_y: Optional[float] = None
        for entry in sorted_lines:
            center_y = self._get_bbox_center(entry[1][0])[1]
            # First line, or too far below the previous one: start new block
            if last_y is None or abs(center_y - last_y) >= threshold:
                groups.append([])
            groups[-1].append(entry)
            last_y = center_y

        return [
            self._create_block(
                block_id=f"block_{number}",
                block_type=self._infer_block_type(group),
                lines=[line_data for _, line_data in group],
                line_indices=[orig_idx for orig_idx, _ in group]
            )
            for number, group in enumerate(groups)
        ]

    def _infer_block_type(self, lines: List) -> str:
        """
        Infer block type based on content heuristics

        Uses simple rules: single short lines without periods are likely headers

        Args:
            lines: ``(original_index, line_data)`` pairs of one block

        Returns:
            "header" or "paragraph"
        """
        if not lines:
            return "paragraph"

        # Get first line text
        first_line = lines[0][1]
        text = first_line[1][0] if len(first_line) > 1 else ""

        # Simple heuristics: single short lines without periods are likely headers
        is_single_line = len(lines) == 1
        is_short = len(text) < self.DEFAULT_HEADER_MAX_LENGTH
        if is_single_line and is_short and not text.endswith('.'):
            return "header"

        # Default to paragraph
        return "paragraph"

    def _create_block(
        self,
        block_id: str,
        block_type: str,
        lines: List,
        line_indices: Optional[List[int]] = None
    ) -> Dict:
        """Create a block structure from OCR lines.

        Args:
            block_id: Identifier stored in the block
            block_type: Block type label stored in the block
            lines: OCR lines as ``[bbox, (text, confidence)]`` entries
            line_indices: Index used for each line's "line_id"; defaults to
                ``0..len(lines)-1``

        Returns:
            Block dictionary with id, type, enclosing box and line entries
        """
        if line_indices is None:
            line_indices = list(range(len(lines)))

        block_lines = []
        all_points = []
        for idx, line_data in zip(line_indices, lines):
            bbox = line_data[0]
            text_info = line_data[1]

            # The recognition result is normally a (text, confidence) pair;
            # accept a list as well as a tuple, and a bare string too.
            if isinstance(text_info, (list, tuple)):
                text = text_info[0]
                confidence = text_info[1] if len(text_info) > 1 else self.DEFAULT_CONFIDENCE
            else:
                text = text_info
                confidence = self.DEFAULT_CONFIDENCE

            # Convert bbox to proper format
            line_bbox = self._normalize_bbox(bbox)
            all_points.extend(line_bbox)

            block_lines.append({
                "line_id": f"line_{idx}",
                "text": text,
                "bounding_box": line_bbox,
                # Estimate font size from bbox height
                "font_size_estimate": self._estimate_font_size(line_bbox),
                "words": self._extract_words_from_line(text, line_bbox, confidence)
            })

        return {
            "block_id": block_id,
            "block_type": block_type,
            # Block bounding box encloses all of its lines
            "bounding_box": self._calculate_enclosing_bbox(all_points),
            "lines": block_lines
        }

    def _extract_words_from_line(
        self,
        text: str,
        line_bbox: List[List[int]],
        line_confidence: float
    ) -> List[Dict]:
        """
        Extract words from line and approximate their bounding boxes

        Each word gets a horizontal slice of the line's axis-aligned box,
        proportional to the word's actual character offsets within ``text``
        (so repeated or leading whitespace does not shift later words).
        Every word inherits the line confidence.

        Args:
            text: Recognised line text
            line_bbox: Line box as a list of [x, y] points
            line_confidence: Confidence assigned to every word/character

        Returns:
            List of word dictionaries (empty for blank text)
        """
        words = text.split()
        if not words:
            return []

        # Calculate line dimensions
        x_coords = [p[0] for p in line_bbox]
        y_coords = [p[1] for p in line_bbox]
        line_x_start = min(x_coords)
        line_y_min = min(y_coords)
        line_width = max(x_coords) - line_x_start
        line_y_max = line_y_min + (max(y_coords) - line_y_min)

        # Total character count (including spaces); > 0 because words exist
        total_chars = len(text)

        word_list = []
        cursor = 0  # search position in text, just past the previous word
        for word in words:
            # split() yields the words in order, so each is found at/after cursor
            start = text.find(word, cursor)
            end = start + len(word)
            cursor = end

            # Calculate word position proportionally
            word_x_start = line_x_start + int(line_width * (start / total_chars))
            word_x_end = line_x_start + int(line_width * (end / total_chars))

            # Create word bounding box (simplified rectangle)
            word_bbox = [
                [word_x_start, line_y_min],
                [word_x_end, line_y_min],
                [word_x_end, line_y_max],
                [word_x_start, line_y_max]
            ]

            word_list.append({
                "word": word,
                "bounding_box": word_bbox,
                "confidence": line_confidence,
                "characters": self._extract_characters_from_word(
                    word,
                    word_bbox,
                    line_confidence
                )
            })

        return word_list

    def _extract_characters_from_word(
        self,
        word: str,
        word_bbox: List[List[int]],
        confidence: float
    ) -> List[Dict]:
        """
        Extract individual characters and approximate their bounding boxes

        The word box is divided into equal-width slices, one per character.

        Args:
            word: Word text
            word_bbox: Word box as a list of [x, y] points
            confidence: Confidence assigned to every character

        Returns:
            List of character dictionaries (empty for an empty word)
        """
        if not word:
            return []

        x_coords = [p[0] for p in word_bbox]
        y_coords = [p[1] for p in word_bbox]
        word_x_start = min(x_coords)
        word_y_min = min(y_coords)
        word_width = max(x_coords) - word_x_start
        word_y_max = word_y_min + (max(y_coords) - word_y_min)

        num_chars = len(word)
        char_list = []
        for i, char in enumerate(word):
            # Calculate character position proportionally
            char_x_start = word_x_start + int(word_width * (i / num_chars))
            char_x_end = word_x_start + int(word_width * ((i + 1) / num_chars))

            char_list.append({
                "char": char,
                # Character bounding box
                "bounding_box": [
                    [char_x_start, word_y_min],
                    [char_x_end, word_y_min],
                    [char_x_end, word_y_max],
                    [char_x_start, word_y_max]
                ],
                "confidence": confidence
            })

        return char_list

    @staticmethod
    def _is_point_sequence(bbox: Any) -> bool:
        """Return True when ``bbox`` holds points (sized items) rather than
        flat scalar coordinates. Works for lists, tuples and array rows."""
        first = bbox[0]
        return hasattr(first, '__len__') and not isinstance(first, (str, bytes))

    def _normalize_bbox(self, bbox: List) -> List[List[int]]:
        """Normalize bounding box to list of integer [x, y] coordinates.

        Accepts either a sequence of 2-element points or a flat
        ``[x1, y1, x2, y2]`` box, which is expanded to its four corners
        (top-left, top-right, bottom-right, bottom-left).
        """
        if self._is_point_sequence(bbox) and len(bbox[0]) == 2:
            # Already in point format
            return [[int(p[0]), int(p[1])] for p in bbox]

        # Convert from flat [x1, y1, x2, y2]
        x1, y1, x2, y2 = int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])
        return [[x1, y1], [x2, y1], [x2, y2], [x1, y2]]

    def _estimate_font_size(self, bbox: List[List[int]]) -> int:
        """
        Estimate font size based on bounding box height

        Simple heuristic: height in pixels approximates font size in points
        Typical ratio: 1 point ≈ 1.333 pixels at 96 DPI

        Returns:
            Font size clamped to [MIN_FONT_SIZE, MAX_FONT_SIZE]
        """
        y_coords = [p[1] for p in bbox]
        height = max(y_coords) - min(y_coords)

        # Convert pixel height to approximate font size
        font_size = int(height * self.PIXELS_TO_POINTS)

        # Clamp between reasonable font size bounds
        return max(self.MIN_FONT_SIZE, min(self.MAX_FONT_SIZE, font_size))

    def _calculate_enclosing_bbox(self, points: List[List[int]]) -> List[List[int]]:
        """Calculate the axis-aligned box enclosing a set of points.

        Returns four corners (top-left, top-right, bottom-right,
        bottom-left); an all-zero box when ``points`` is empty.
        """
        if not points:
            return [[0, 0], [0, 0], [0, 0], [0, 0]]

        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]
        min_x, max_x = min(x_coords), max(x_coords)
        min_y, max_y = min(y_coords), max(y_coords)

        return [
            [min_x, min_y],
            [max_x, min_y],
            [max_x, max_y],
            [min_x, max_y]
        ]

    def _get_bbox_center(self, bbox: List) -> Tuple[float, float]:
        """Get center point of bounding box.

        For point-format boxes this is the mean of the points; for a flat
        ``[x1, y1, x2, y2]`` box it is the midpoint of the two corners.
        """
        if self._is_point_sequence(bbox):
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
        else:
            x_coords = [bbox[0], bbox[2]]
            y_coords = [bbox[1], bbox[3]]

        return (sum(x_coords) / len(x_coords), sum(y_coords) / len(y_coords))

    def _point_in_bbox(self, point: Tuple[float, float], bbox: List) -> bool:
        """Check if a point is inside a bounding box (edges inclusive).

        Supports flat ``[x1, y1, x2, y2]`` boxes and point-format boxes; a
        point-format box is tested against its axis-aligned extent. Any
        other shape (including an empty box) yields False.
        """
        if len(bbox) == 0:
            return False

        if self._is_point_sequence(bbox):
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
        elif len(bbox) == 4:
            # [x1, y1, x2, y2] format
            x_coords = [bbox[0], bbox[2]]
            y_coords = [bbox[1], bbox[3]]
        else:
            return False

        x, y = point
        return bool(
            min(x_coords) <= x <= max(x_coords)
            and min(y_coords) <= y <= max(y_coords)
        )