File size: 17,412 Bytes
9a34207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
"""
OCR Service Module
Handles all OCR operations using PaddleOCR
"""

import os
import logging
from typing import Dict, List, Any, Tuple, Optional
import numpy as np
from PIL import Image
from paddleocr import PaddleOCR
import cv2

# Import-time side effect: configure root logging at INFO level, then create
# the module-level logger used by OCRService for progress and warning messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OCRService:
    """
    Service class for OCR operations using PaddleOCR.
    Supports text detection, recognition, layout parsing, and angle classification.
    """

    # Configuration constants
    MIN_FONT_SIZE = 8  # Minimum font size in points
    MAX_FONT_SIZE = 72  # Maximum font size in points
    DEFAULT_HEADER_MAX_LENGTH = 50  # Max characters for header detection
    DEFAULT_VERTICAL_THRESHOLD_RATIO = 0.05  # Vertical grouping threshold as ratio of image height

    def __init__(self, use_gpu: bool = False, lang: str = 'en'):
        """
        Create the PaddleOCR engine and, when importable, the layout parser.

        Args:
            use_gpu: Whether to use GPU for processing
            lang: Language for OCR (default: 'en')
        """
        self.use_gpu, self.lang = use_gpu, lang

        logger.info(f"Initializing PaddleOCR (GPU: {use_gpu}, Language: {lang})")

        # Recognition engine options; angle classification is switched on.
        recognizer_options = {
            "use_angle_cls": True,
            "lang": lang,
            "use_gpu": use_gpu,
            "show_log": False,
            "use_space_char": True,
        }
        self.ocr_engine = PaddleOCR(**recognizer_options)

        # Structure parser is configured for layout only (no table, no OCR).
        # An ImportError anywhere in this step disables layout parsing.
        try:
            from paddleocr import PPStructure
            structure_options = {
                "use_gpu": use_gpu,
                "lang": lang,
                "show_log": False,
                "layout": True,
                "table": False,
                "ocr": False,
            }
            self.structure_engine = PPStructure(**structure_options)
        except ImportError:
            logger.warning("PPStructure not available, layout parsing will be limited")
            self.structure_engine = None

    def process_image(self, image_path: str) -> Dict[str, Any]:
        """
        Run OCR (and layout analysis, when enabled) on a single image file.

        Args:
            image_path: Path to the image file

        Returns:
            Dictionary with the image dimensions and the grouped text blocks

        Raises:
            ValueError: If cv2.imread returns None for image_path
        """
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Cannot read image from {image_path}")

        # The decoded array is only used to read the image dimensions.
        height, width = image.shape[:2]
        logger.info(f"Processing image: {width}x{height}")

        # Both engines receive the file path rather than the decoded array.
        recognized = self.ocr_engine.ocr(image_path, cls=True)

        layout = None
        if self.structure_engine:
            try:
                layout = self.structure_engine(image_path)
            except Exception as e:
                # Layout analysis is best-effort: log and continue without it.
                logger.warning(f"Layout analysis failed: {e}")

        return self._build_structured_response(recognized, layout, width, height)

    def _build_structured_response(
        self,
        ocr_result: List,
        layout_result: Optional[List],
        width: int,
        height: int
    ) -> Dict[str, Any]:
        """
        Build structured JSON response from OCR results
        
        Args:
            ocr_result: Raw OCR result from PaddleOCR
            layout_result: Layout analysis result
            width: Image width
            height: Image height
            
        Returns:
            Structured dictionary matching required schema
        """
        blocks = []
        
        # Extract layout blocks if available
        layout_blocks = self._extract_layout_blocks(layout_result) if layout_result else []
        
        # Process OCR results
        if ocr_result and ocr_result[0]:
            # Group lines into blocks based on layout or proximity
            if layout_blocks:
                blocks = self._group_lines_by_layout(ocr_result[0], layout_blocks)
            else:
                blocks = self._group_lines_by_proximity(ocr_result[0])
        
        return {
            "image_width": width,
            "image_height": height,
            "blocks": blocks
        }

    def _extract_layout_blocks(self, layout_result: List) -> List[Dict]:
        """Extract layout blocks from structure parser result"""
        blocks = []
        for item in layout_result:
            if isinstance(item, dict) and 'type' in item:
                blocks.append({
                    'type': item.get('type', 'paragraph'),
                    'bbox': item.get('bbox', [0, 0, 0, 0])
                })
        return blocks

    def _group_lines_by_layout(
        self,
        ocr_lines: List,
        layout_blocks: List[Dict]
    ) -> List[Dict]:
        """Group OCR lines into layout blocks"""
        blocks = []
        
        # If no layout blocks, fall back to proximity grouping
        if not layout_blocks:
            return self._group_lines_by_proximity(ocr_lines)
        
        # Assign lines to layout blocks
        for idx, layout_block in enumerate(layout_blocks):
            block_type = layout_block.get('type', 'paragraph')
            layout_bbox = layout_block.get('bbox', [0, 0, 0, 0])
            
            # Find lines that belong to this block
            block_lines = []
            for line_data in ocr_lines:
                line_bbox = line_data[0]
                line_center = self._get_bbox_center(line_bbox)
                
                # Check if line center is within layout block
                if self._point_in_bbox(line_center, layout_bbox):
                    block_lines.append(line_data)
            
            if block_lines:
                blocks.append(self._create_block(
                    block_id=f"block_{idx}",
                    block_type=block_type,
                    lines=block_lines
                ))
        
        # Handle lines not assigned to any block
        assigned_lines = set()
        for block in blocks:
            for line in block['lines']:
                assigned_lines.add(line['line_id'])
        
        unassigned_lines = [
            line for i, line in enumerate(ocr_lines)
            if f"line_{i}" not in assigned_lines
        ]
        
        if unassigned_lines:
            blocks.append(self._create_block(
                block_id=f"block_{len(blocks)}",
                block_type="paragraph",
                lines=unassigned_lines
            ))
        
        return blocks

    def _group_lines_by_proximity(self, ocr_lines: List) -> List[Dict]:
        """
        Group OCR lines into blocks based on spatial proximity
        Simple heuristic: group lines that are close vertically
        """
        if not ocr_lines:
            return []
        
        # Get image height for adaptive threshold (if not available, use fixed threshold)
        # Calculate threshold as a percentage of image height for better adaptability
        # For now, use a reasonable fixed threshold that works for most documents
        threshold = 50  # Vertical distance threshold in pixels for grouping
        
        # Sort lines by vertical position (top to bottom)
        sorted_lines = sorted(
            enumerate(ocr_lines),
            key=lambda x: self._get_bbox_center(x[1][0])[1]
        )
        
        for orig_idx, line_data in sorted_lines:
            bbox = line_data[0]
            center_y = self._get_bbox_center(bbox)[1]
            
            if last_y is None or abs(center_y - last_y) < threshold:
                current_block_lines.append((orig_idx, line_data))
            else:
                # Start new block
                if current_block_lines:
                    blocks.append(self._create_block(
                        block_id=f"block_{len(blocks)}",
                        block_type=self._infer_block_type(current_block_lines),
                        lines=[line[1] for line in current_block_lines],
                        line_indices=[line[0] for line in current_block_lines]
                    ))
                current_block_lines = [(orig_idx, line_data)]
            
            last_y = center_y
        
        # Add last block
        if current_block_lines:
            blocks.append(self._create_block(
                block_id=f"block_{len(blocks)}",
                block_type=self._infer_block_type(current_block_lines),
                lines=[line[1] for line in current_block_lines],
                line_indices=[line[0] for line in current_block_lines]
            ))
        
        return blocks

    def _infer_block_type(self, lines: List) -> str:
        """
        Infer block type based on content heuristics
        Uses simple rules: single short lines without periods are likely headers
        """
        if not lines:
            return "paragraph"
        
        # Get first line text
        first_line = lines[0][1]
        text = first_line[1][0] if len(first_line) > 1 else ""
        
        # Simple heuristics: single short lines without periods are likely headers
        if len(lines) == 1:
            if len(text) < self.DEFAULT_HEADER_MAX_LENGTH and not text.endswith('.'):
                return "header"
        
        # Default to paragraph
        return "paragraph"

    def _create_block(
        self,
        block_id: str,
        block_type: str,
        lines: List,
        line_indices: Optional[List[int]] = None
    ) -> Dict:
        """Create a block structure from OCR lines"""
        if line_indices is None:
            line_indices = list(range(len(lines)))
        
        block_lines = []
        all_points = []
        
        for idx, line_data in zip(line_indices, lines):
            bbox = line_data[0]
            text_tuple = line_data[1]
            text = text_tuple[0] if isinstance(text_tuple, tuple) else text_tuple
            confidence = text_tuple[1] if isinstance(text_tuple, tuple) and len(text_tuple) > 1 else 0.95
            
            # Convert bbox to proper format
            line_bbox = self._normalize_bbox(bbox)
            all_points.extend(line_bbox)
            
            # Estimate font size from bbox height
            font_size = self._estimate_font_size(line_bbox)
            
            # Process words
            words = self._extract_words_from_line(text, line_bbox, confidence)
            
            block_lines.append({
                "line_id": f"line_{idx}",
                "text": text,
                "bounding_box": line_bbox,
                "font_size_estimate": font_size,
                "words": words
            })
        
        # Calculate block bounding box from all lines
        block_bbox = self._calculate_enclosing_bbox(all_points)
        
        return {
            "block_id": block_id,
            "block_type": block_type,
            "bounding_box": block_bbox,
            "lines": block_lines
        }

    def _extract_words_from_line(
        self,
        text: str,
        line_bbox: List[List[int]],
        line_confidence: float
    ) -> List[Dict]:
        """
        Extract words from line and approximate their bounding boxes
        """
        words = text.split()
        if not words:
            return []
        
        # Calculate line dimensions
        x_coords = [p[0] for p in line_bbox]
        y_coords = [p[1] for p in line_bbox]
        line_width = max(x_coords) - min(x_coords)
        line_height = max(y_coords) - min(y_coords)
        line_x_start = min(x_coords)
        line_y_min = min(y_coords)
        
        # Calculate total character count (including spaces)
        total_chars = len(text)
        
        word_list = []
        char_position = 0
        
        for word in words:
            # Calculate word position proportionally
            word_start_ratio = char_position / total_chars if total_chars > 0 else 0
            word_end_ratio = (char_position + len(word)) / total_chars if total_chars > 0 else 0
            
            word_x_start = line_x_start + int(line_width * word_start_ratio)
            word_x_end = line_x_start + int(line_width * word_end_ratio)
            
            # Create word bounding box (simplified rectangle)
            word_bbox = [
                [word_x_start, line_y_min],
                [word_x_end, line_y_min],
                [word_x_end, line_y_min + line_height],
                [word_x_start, line_y_min + line_height]
            ]
            
            # Extract characters
            characters = self._extract_characters_from_word(
                word,
                word_bbox,
                line_confidence
            )
            
            word_list.append({
                "word": word,
                "bounding_box": word_bbox,
                "confidence": line_confidence,
                "characters": characters
            })
            
            # Move position forward (word + space)
            char_position += len(word) + 1
        
        return word_list

    def _extract_characters_from_word(
        self,
        word: str,
        word_bbox: List[List[int]],
        confidence: float
    ) -> List[Dict]:
        """
        Extract individual characters and approximate their bounding boxes
        """
        if not word:
            return []
        
        x_coords = [p[0] for p in word_bbox]
        y_coords = [p[1] for p in word_bbox]
        word_width = max(x_coords) - min(x_coords)
        word_height = max(y_coords) - min(y_coords)
        word_x_start = min(x_coords)
        word_y_min = min(y_coords)
        
        char_list = []
        num_chars = len(word)
        
        for i, char in enumerate(word):
            # Calculate character position proportionally
            char_start_ratio = i / num_chars
            char_end_ratio = (i + 1) / num_chars
            
            char_x_start = word_x_start + int(word_width * char_start_ratio)
            char_x_end = word_x_start + int(word_width * char_end_ratio)
            
            # Create character bounding box
            char_bbox = [
                [char_x_start, word_y_min],
                [char_x_end, word_y_min],
                [char_x_end, word_y_min + word_height],
                [char_x_start, word_y_min + word_height]
            ]
            
            char_list.append({
                "char": char,
                "bounding_box": char_bbox,
                "confidence": confidence
            })
        
        return char_list

    def _normalize_bbox(self, bbox: List) -> List[List[int]]:
        """Normalize bounding box to list of [x, y] coordinates"""
        if isinstance(bbox[0], (list, tuple)) and len(bbox[0]) == 2:
            # Already in correct format
            return [[int(p[0]), int(p[1])] for p in bbox]
        else:
            # Convert from other formats
            return [[int(bbox[0]), int(bbox[1])],
                    [int(bbox[2]), int(bbox[1])],
                    [int(bbox[2]), int(bbox[3])],
                    [int(bbox[0]), int(bbox[3])]]

    def _estimate_font_size(self, bbox: List[List[int]]) -> int:
        """
        Estimate font size based on bounding box height
        Simple heuristic: height in pixels approximates font size in points
        Typical ratio: 1 point ≈ 1.333 pixels at 96 DPI
        """
        y_coords = [p[1] for p in bbox]
        height = max(y_coords) - min(y_coords)
        # Convert pixel height to approximate font size
        font_size = int(height * 0.75)
        # Clamp between reasonable font size bounds
        return max(self.MIN_FONT_SIZE, min(self.MAX_FONT_SIZE, font_size))

    def _calculate_enclosing_bbox(self, points: List[List[int]]) -> List[List[int]]:
        """Calculate the minimum enclosing bounding box for a set of points"""
        if not points:
            return [[0, 0], [0, 0], [0, 0], [0, 0]]
        
        x_coords = [p[0] for p in points]
        y_coords = [p[1] for p in points]
        
        min_x, max_x = min(x_coords), max(x_coords)
        min_y, max_y = min(y_coords), max(y_coords)
        
        return [
            [min_x, min_y],
            [max_x, min_y],
            [max_x, max_y],
            [min_x, max_y]
        ]

    def _get_bbox_center(self, bbox: List) -> Tuple[float, float]:
        """Get center point of bounding box"""
        if isinstance(bbox[0], (list, tuple)):
            x_coords = [p[0] for p in bbox]
            y_coords = [p[1] for p in bbox]
        else:
            x_coords = [bbox[0], bbox[2]]
            y_coords = [bbox[1], bbox[3]]
        
        return (sum(x_coords) / len(x_coords), sum(y_coords) / len(y_coords))

    def _point_in_bbox(self, point: Tuple[float, float], bbox: List) -> bool:
        """Check if a point is inside a bounding box"""
        x, y = point
        if len(bbox) == 4 and not isinstance(bbox[0], (list, tuple)):
            # [x1, y1, x2, y2] format
            return bbox[0] <= x <= bbox[2] and bbox[1] <= y <= bbox[3]
        return False