Spaces:
Sleeping
Sleeping
| """ | |
| OCR Service Module - ENHANCED VERSION with OpenCV Text Block Analysis and Bold Detection | |
| Handles PDF to text conversion with OpenCV-based spacing analysis, bold text detection, and improved formatting | |
| """ | |
| import re | |
| import os | |
| import logging | |
| from typing import Optional, Dict, Any, Tuple, List | |
| import tempfile | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| # Load environment variables | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Azure Document Intelligence | |
| from azure.core.credentials import AzureKeyCredential | |
| from azure.ai.documentintelligence import DocumentIntelligenceClient | |
| from azure.core.exceptions import AzureError | |
| # Fallback OCR libraries | |
| try: | |
| import pytesseract | |
| from PIL import Image | |
| TESSERACT_AVAILABLE = True | |
| except ImportError: | |
| TESSERACT_AVAILABLE = False | |
| import fitz # PyMuPDF | |
| # Enhanced indentation detection with OpenCV | |
| from enhanced_indentation import EnhancedIndentationDetector, OpenCVTextAnalyzer | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class EnhancedHTMLProcessor:
    """Process OCR results through HTML with OpenCV-enhanced text block analysis and bold detection"""

    def __init__(self):
        # Both helpers come from the project-local `enhanced_indentation`
        # module (imported at the top of this file):
        #   - indent_detector: pattern/indentation classifier for OCR lines
        #   - opencv_analyzer: correlates rendered page images with text lines
        self.indent_detector = EnhancedIndentationDetector()
        self.opencv_analyzer = OpenCVTextAnalyzer()
    def create_html_from_azure_result(analysis_result, page_images=None) -> str:
        """Create structured HTML from Azure Document Intelligence result with OpenCV enhancement.

        Args:
            analysis_result: Azure Document Intelligence analyze result
                (``pages``, optional ``paragraphs`` / ``tables``).
            page_images: optional mapping of 1-based page number -> page image;
                when present, OpenCV block/bold analysis is run per page.

        Returns:
            A complete standalone HTML document as a string.

        NOTE(review): defined without ``self`` yet sits among instance
        methods — it builds its own ``EnhancedHTMLProcessor`` instance, so
        it behaves as a static factory; confirm whether it should carry
        ``@staticmethod``.
        """
        processor = EnhancedHTMLProcessor()
        html_parts = ['<!DOCTYPE html><html><head><meta charset="UTF-8">']
        html_parts.append('<style>')
        # Inline stylesheet: defines every class emitted later by
        # _text_to_html_opencv_enhanced / _table_to_html (indent levels,
        # pattern hints, OpenCV bold-header styling, table layout).
        html_parts.append('''
        body {
            font-family: 'Consolas', 'Courier New', monospace;
            line-height: 1.6;
            margin: 20px;
            white-space: pre-wrap;
            font-size: 11pt;
            background-color: #fafafa;
        }
        .page {
            margin-bottom: 30px;
            border: 1px solid #ddd;
            padding: 20px;
            background-color: white;
            border-radius: 5px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.1);
        }
        .page-header {
            font-weight: bold;
            color: #2c3e50;
            margin-bottom: 15px;
            text-align: center;
            border-bottom: 2px solid #3498db;
            padding-bottom: 8px;
            font-size: 14pt;
            text-transform: uppercase;
            letter-spacing: 1px;
        }
        /* OpenCV-enhanced bold headers */
        .opencv-bold-header {
            font-weight: bold;
            color: #2c3e50;
            font-size: 1.3em;
            margin: 20px 0 15px 0;
            border-left: 4px solid #e74c3c;
            padding-left: 12px;
            background-color: #fdf2f2;
            line-height: 1.4;
        }
        /* Enhanced indentation levels - 4 spaces per level system */
        .indent-level-0 { margin-left: 0em; }
        .indent-level-1 { margin-left: 1.0em; }   /* 4 spaces */
        .indent-level-2 { margin-left: 2.0em; }   /* 8 spaces */
        .indent-level-3 { margin-left: 3.0em; }   /* 12 spaces */
        .indent-level-4 { margin-left: 4.0em; }   /* 16 spaces */
        .indent-level-5 { margin-left: 5.0em; }   /* 20 spaces */
        .indent-level-6 { margin-left: 6.0em; }   /* 24 spaces */
        .indent-level-7 { margin-left: 7.0em; }   /* 28 spaces */
        .indent-level-8 { margin-left: 8.0em; }   /* 32 spaces */
        .indent-level-9 { margin-left: 9.0em; }   /* 36 spaces */
        .indent-level-10 { margin-left: 10.0em; } /* 40 spaces */
        /* OpenCV-detected headers have no indentation */
        .opencv-bold-header.indent-level-1,
        .opencv-bold-header.indent-level-2,
        .opencv-bold-header.indent-level-3,
        .opencv-bold-header.indent-level-4,
        .opencv-bold-header.indent-level-5,
        .opencv-bold-header.indent-level-6,
        .opencv-bold-header.indent-level-7,
        .opencv-bold-header.indent-level-8,
        .opencv-bold-header.indent-level-9,
        .opencv-bold-header.indent-level-10 {
            margin-left: 0em !important;
        }
        /* Text classification styles */
        .content-header {
            font-weight: bold;
            color: #2c3e50;
            font-size: 1.1em;
            margin: 15px 0 8px 0;
            border-left: 4px solid #3498db;
            padding-left: 10px;
            background-color: #f8f9fa;
        }
        .content-paragraph {
            color: #333;
            margin-bottom: 1em;
            line-height: 1.5;
        }
        .content-list-item {
            margin-bottom: 0.5em;
            line-height: 1.4;
        }
        /* Pattern-specific styles */
        .numbered-primary {
            font-weight: bold;
            color: #2c3e50;
            border-left: 4px solid #3498db;
            padding-left: 8px;
            margin-bottom: 0.5em;
            background-color: #f8f9fa;
        }
        .numbered-secondary {
            font-weight: 600;
            color: #34495e;
            border-left: 3px solid #95a5a6;
            padding-left: 6px;
            margin-bottom: 0.4em;
            background-color: #f9f9f9;
        }
        .numbered-tertiary {
            color: #555;
            border-left: 2px solid #bdc3c7;
            padding-left: 4px;
            margin-bottom: 0.3em;
        }
        .numbered-quaternary {
            color: #666;
            border-left: 1px solid #dee2e6;
            padding-left: 3px;
            margin-bottom: 0.2em;
        }
        .numbered-quinary {
            color: #777;
            padding-left: 2px;
            margin-bottom: 0.2em;
        }
        /* Parenthetical styles */
        .parenthetical-primary {
            font-weight: 600;
            color: #8e44ad;
            border-left: 3px solid #9b59b6;
            padding-left: 6px;
            margin-bottom: 0.4em;
        }
        .parenthetical-secondary {
            color: #9b59b6;
            border-left: 2px solid #af7ac5;
            padding-left: 4px;
            margin-bottom: 0.3em;
        }
        .parenthetical-tertiary {
            color: #af7ac5;
            padding-left: 3px;
            margin-bottom: 0.2em;
        }
        .parenthetical-quaternary {
            color: #c39bd3;
            padding-left: 2px;
            margin-bottom: 0.2em;
        }
        .bullet-primary {
            position: relative;
            padding-left: 1.2em;
        }
        .bullet-primary::before {
            content: "•";
            position: absolute;
            left: 0;
            color: #3498db;
            font-weight: bold;
        }
        .bullet-secondary {
            position: relative;
            padding-left: 1.2em;
        }
        .bullet-secondary::before {
            content: "◦";
            position: absolute;
            left: 0;
            color: #95a5a6;
        }
        .bullet-tertiary {
            position: relative;
            padding-left: 1.2em;
        }
        .bullet-tertiary::before {
            content: "▪";
            position: absolute;
            left: 0;
            color: #bdc3c7;
        }
        .bullet-quaternary {
            position: relative;
            padding-left: 1.2em;
        }
        .bullet-quaternary::before {
            content: "‣";
            position: absolute;
            left: 0;
            color: #dee2e6;
        }
        .lettered-primary {
            font-style: italic;
            color: #8e44ad;
            font-weight: 600;
        }
        .lettered-secondary {
            color: #9b59b6;
            font-style: italic;
        }
        .roman-primary {
            font-variant: small-caps;
            color: #d35400;
            font-weight: bold;
        }
        .roman-secondary {
            color: #e67e22;
            font-variant: small-caps;
        }
        .thai-primary {
            color: #16a085;
            font-weight: bold;
        }
        .thai-secondary {
            color: #1abc9c;
        }
        .paragraph {
            margin-bottom: 0.8em;
            white-space: pre-wrap;
            font-family: 'Consolas', 'Courier New', monospace;
            line-height: 1.4;
        }
        .title {
            font-size: 1.4em;
            font-weight: bold;
            margin: 15px 0 12px 0;
            color: #2c3e50;
            border-left: 4px solid #3498db;
            padding-left: 10px;
        }
        .section-heading {
            font-size: 1.2em;
            font-weight: bold;
            margin: 12px 0 8px 0;
            color: #34495e;
            border-left: 3px solid #95a5a6;
            padding-left: 8px;
        }
        .table-container {
            margin: 15px 0;
            font-family: 'Consolas', 'Courier New', monospace;
            background-color: #f8f9fa;
            padding: 10px;
            border-radius: 5px;
            border: 1px solid #dee2e6;
        }
        .table {
            border-collapse: collapse;
            width: 100%;
            margin: 8px 0;
            font-family: 'Consolas', 'Courier New', monospace;
            font-size: 10pt;
            background-color: white;
        }
        .table th, .table td {
            border: 1px solid #bdc3c7;
            padding: 6px 10px;
            text-align: left;
            white-space: pre-wrap;
            vertical-align: top;
        }
        .table th {
            background-color: #ecf0f1;
            font-weight: bold;
            color: #2c3e50;
        }
        .table tr:nth-child(even) {
            background-color: #f8f9fa;
        }
        .indented_text {
            color: #555;
            font-style: italic;
        }
        .space-indent {
            border-left: 1px dotted #ccc;
            padding-left: 5px;
        }
        .page-number {
            position: relative;
            float: right;
            background-color: #3498db;
            color: white;
            padding: 2px 8px;
            border-radius: 3px;
            font-size: 9pt;
            margin-top: -5px;
        }
        /* OpenCV block analysis indicators */
        .opencv-paragraph-block {
            border-left: 2px solid #27ae60;
            padding-left: 8px;
            margin: 10px 0;
        }
        .opencv-text-block {
            background-color: #f8f9fa;
            border-radius: 3px;
            padding: 5px;
            margin: 5px 0;
        }
        ''')
        html_parts.append('</style></head><body>')
        # Degenerate case: no pages -> emit a minimal, still-valid document.
        if not analysis_result.pages:
            html_parts.append('<p>No content found</p></body></html>')
            return '\n'.join(html_parts)
        for page_num, page in enumerate(analysis_result.pages, 1):
            html_parts.append(f'<div class="page">')
            html_parts.append(f'<div class="page-header">Page {page_num} <span class="page-number">{page_num}</span></div>')
            # Run OpenCV analysis only when a rendered image exists for this page.
            opencv_analysis = None
            if page_images and page_num in page_images:
                page_text_lines = processor._extract_page_text_lines(page, analysis_result, page_num)
                opencv_analysis = processor.opencv_analyzer.analyze_text_blocks(
                    page_images[page_num], page_text_lines
                )
            # Collect tables + text items with indentation/classification metadata.
            content_items = processor._extract_page_content_enhanced(
                page, analysis_result, page_num, opencv_analysis
            )
            # Restore reading order: top-to-bottom, then left-to-right.
            content_items.sort(key=lambda x: (x['y_pos'], x['x_pos']))
            # Render each item; tables and text use dedicated converters.
            for item in content_items:
                if item['type'] == 'table':
                    html_parts.append(processor._table_to_html(item['content'], item['table_idx']))
                else:
                    html_parts.append(processor._text_to_html_opencv_enhanced(item))
            html_parts.append('</div>')
        html_parts.append('</body></html>')
        return '\n'.join(html_parts)
| def _extract_page_text_lines(self, page, analysis_result, page_num): | |
| """Extract text lines for OpenCV correlation""" | |
| text_lines = [] | |
| if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs: | |
| page_paragraphs = [p for p in analysis_result.paragraphs if | |
| p.bounding_regions and | |
| p.bounding_regions[0].page_number == page_num] | |
| for para in page_paragraphs: | |
| if para.content.strip(): | |
| text_lines.append(para.content.strip()) | |
| elif page.lines: | |
| for line in page.lines: | |
| if line.content.strip(): | |
| text_lines.append(line.content.strip()) | |
| return text_lines | |
    def _extract_page_content_enhanced(self, page, analysis_result, page_num, opencv_analysis=None):
        """Extract page content with OpenCV-enhanced text block analysis and bold detection.

        Builds a flat list of content-item dicts (tables + paragraphs/lines)
        for one page. Each dict carries ``y_pos``/``x_pos`` so the caller can
        sort items back into reading order, plus indentation and text-type
        metadata for the HTML renderer.

        Args:
            page: Azure page object (provides ``lines`` as fallback).
            analysis_result: full analyze result (``tables``, ``paragraphs``).
            page_num: 1-based page number to filter by.
            opencv_analysis: optional dict from OpenCVTextAnalyzer; when it
                has ``success`` and ``line_mappings``, text is matched to it
                by exact stripped-content equality.

        Returns:
            list[dict]: unsorted content items for this page.
        """
        content_items = []
        # Handle tables (existing logic)
        page_tables = []
        table_regions = []
        if analysis_result.tables:
            for table_idx, table in enumerate(analysis_result.tables):
                if self._is_table_on_page(table, page_num):
                    page_tables.append((table_idx, table))
                    # Remember the table footprint so text overlapping it
                    # can be suppressed below (it is rendered as a table).
                    if table.bounding_regions:
                        table_regions.append({
                            'polygon': table.bounding_regions[0].polygon,
                            'table_idx': table_idx
                        })
        # Add tables to content
        for table_idx, table in page_tables:
            if table.bounding_regions and table.bounding_regions[0].polygon:
                polygon = table.bounding_regions[0].polygon
                # Polygon is a flat [x0,y0,...,x3,y3] list; min of the four
                # coords gives the top/left edge of the bounding box.
                y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7])
                x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6])
                content_items.append({
                    'type': 'table',
                    'content': table,
                    'table_idx': table_idx,
                    'y_pos': y_pos,
                    'x_pos': x_pos
                })
        # Process text content with OpenCV-enhanced analysis
        if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs:
            page_paragraphs = [p for p in analysis_result.paragraphs if
                               p.bounding_regions and
                               p.bounding_regions[0].page_number == page_num]
            for para in page_paragraphs:
                if para.content.strip():
                    # Skip paragraphs that mostly lie inside a table region
                    # (>= 70% overlap) — their text is emitted via the table.
                    overlap_ratio = self._calculate_table_overlap(para, table_regions)
                    if overlap_ratio < 0.7:  # Not heavily overlapping with table
                        polygon = para.bounding_regions[0].polygon
                        y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7]) if polygon else 0
                        x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6]) if polygon else 0
                        # Match this paragraph to an OpenCV line mapping by
                        # exact stripped-text equality (first match wins).
                        opencv_line_mapping = None
                        if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis:
                            for mapping in opencv_analysis['line_mappings']:
                                if mapping.get('text', '').strip() == para.content.strip():
                                    opencv_line_mapping = mapping
                                    break
                        # Prefer pixel-informed indentation when a mapping exists.
                        if opencv_line_mapping:
                            indent_info = self.indent_detector.detect_indentation_with_opencv(
                                para.content, opencv_analysis, opencv_line_mapping
                            )
                        else:
                            indent_info = self.indent_detector.detect_indentation(para.content)
                        # Positional/layout context for the classifier.
                        # NOTE(review): Azure paragraph objects don't normally
                        # expose `font_size`/`is_bold`; the getattr defaults
                        # make these best-effort hints only.
                        context = {
                            'y_position': y_pos,
                            'x_position': x_pos,
                            'font_size': getattr(para, 'font_size', None),
                            'is_bold': getattr(para, 'is_bold', False),
                            'page_number': page_num
                        }
                        text_classification = self.indent_detector.classify_text_type(
                            para.content, context, opencv_analysis
                        )
                        content_items.append({
                            'type': 'paragraph',
                            'content': indent_info['content'],
                            'role': getattr(para, 'role', 'paragraph'),
                            'y_pos': y_pos,
                            'x_pos': x_pos,
                            'indent_info': indent_info,
                            'text_classification': text_classification,
                            'opencv_analysis': opencv_line_mapping,
                            'preserve_spacing': True
                        })
        elif page.lines:
            # No paragraph structure available: fall back to raw OCR lines.
            processed_lines = self._process_lines_opencv_enhanced(page.lines, table_regions, opencv_analysis)
            content_items.extend(processed_lines)
        return content_items
| def _process_lines_opencv_enhanced(self, lines, table_regions, opencv_analysis=None): | |
| """Process lines with OpenCV-enhanced text block analysis and bold detection""" | |
| content_items = [] | |
| processed_content = set() | |
| for line in lines: | |
| if not line.content.strip(): | |
| continue | |
| content_key = line.content.strip().lower() | |
| if content_key in processed_content: | |
| continue | |
| processed_content.add(content_key) | |
| # Check table overlap | |
| overlap_ratio = self._calculate_line_table_overlap(line, table_regions) | |
| if overlap_ratio < 0.7: | |
| polygon = line.polygon | |
| y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7]) if polygon else 0 | |
| x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6]) if polygon else 0 | |
| # Find corresponding OpenCV analysis | |
| opencv_line_mapping = None | |
| if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis: | |
| for mapping in opencv_analysis['line_mappings']: | |
| if mapping.get('text', '').strip() == line.content.strip(): | |
| opencv_line_mapping = mapping | |
| break | |
| # Enhanced indentation detection with OpenCV | |
| if opencv_line_mapping: | |
| indent_info = self.indent_detector.detect_indentation_with_opencv( | |
| line.content, opencv_analysis, opencv_line_mapping | |
| ) | |
| else: | |
| indent_info = self.indent_detector.detect_indentation(line.content) | |
| # Text classification with OpenCV context | |
| context = { | |
| 'y_position': y_pos, | |
| 'x_position': x_pos | |
| } | |
| text_classification = self.indent_detector.classify_text_type( | |
| line.content, context, opencv_analysis | |
| ) | |
| content_items.append({ | |
| 'type': 'line', | |
| 'content': indent_info['content'], | |
| 'role': 'text', | |
| 'y_pos': y_pos, | |
| 'x_pos': x_pos, | |
| 'indent_info': indent_info, | |
| 'text_classification': text_classification, | |
| 'opencv_analysis': opencv_line_mapping, | |
| 'preserve_spacing': True | |
| }) | |
| return content_items | |
| def _text_to_html_opencv_enhanced(self, item): | |
| """Convert text item to HTML with OpenCV-enhanced formatting and bold detection""" | |
| content = item['content'] | |
| role = item.get('role', 'paragraph') | |
| indent_info = item.get('indent_info', {}) | |
| text_classification = item.get('text_classification', {}) | |
| opencv_analysis = item.get('opencv_analysis', {}) | |
| preserve_spacing = item.get('preserve_spacing', False) | |
| # Build CSS classes based on indentation info, text classification, and OpenCV | |
| css_classes = ['paragraph'] | |
| # Check if OpenCV detected this as a bold header | |
| is_opencv_bold_header = False | |
| if opencv_analysis and opencv_analysis.get('is_bold') and opencv_analysis.get('is_likely_header'): | |
| is_opencv_bold_header = True | |
| css_classes.append('opencv-bold-header') | |
| # Add text classification class | |
| if text_classification.get('type'): | |
| css_classes.append(f"content-{text_classification['type']}") | |
| # Add indentation level class ONLY if not a bold header | |
| if not is_opencv_bold_header and not indent_info.get('suppress_indentation', False): | |
| level = indent_info.get('level', 0) | |
| css_classes.append(f'indent-level-{min(level, 10)}') | |
| # Add pattern-specific formatting ONLY if not a bold header | |
| if not is_opencv_bold_header: | |
| formatting_hint = indent_info.get('formatting_hint', 'normal_text') | |
| if formatting_hint != 'normal_text': | |
| css_classes.append(formatting_hint) | |
| # Add space indent class if needed and not a bold header | |
| if not is_opencv_bold_header and indent_info.get('pattern_type') == 'space_indent': | |
| css_classes.append('space-indent') | |
| # Add OpenCV analysis indicators | |
| if opencv_analysis: | |
| if opencv_analysis.get('is_bold'): | |
| css_classes.append('opencv-text-block') | |
| # Preserve internal spacing | |
| if preserve_spacing: | |
| content = re.sub(r' +', lambda m: ' ' * len(m.group()), content) | |
| content = content.replace('\n', '<br>') | |
| # Add pattern marker if needed (but not for bullets or bold headers) | |
| pattern_marker = indent_info.get('pattern_marker', '') | |
| if (pattern_marker and | |
| not indent_info.get('is_bullet', False) and | |
| not is_opencv_bold_header): | |
| # For numbered/lettered items, include the marker | |
| content = f"{pattern_marker} {content}" | |
| # Build final HTML with OpenCV enhancement | |
| class_str = f' class="{" ".join(css_classes)}"' | |
| # Use OpenCV and text classification to determine HTML structure | |
| if is_opencv_bold_header: | |
| return f'<div class="opencv-bold-header"{class_str}>{content}</div>' | |
| elif (text_classification.get('is_header') and | |
| text_classification.get('confidence', 0) > 0.6 and | |
| not is_opencv_bold_header): | |
| return f'<div class="content-header"{class_str}>{content}</div>' | |
| elif role == 'title': | |
| return f'<div class="title"{class_str}>{content}</div>' | |
| elif role == 'sectionHeading': | |
| return f'<div class="section-heading"{class_str}>{content}</div>' | |
| else: | |
| return f'<div{class_str}>{content}</div>' | |
    def _table_to_html(self, table, table_idx):
        """Convert an Azure table to an HTML <table> with row/col span handling.

        Builds a max_row x max_col matrix, places each cell's cleaned
        content at its (row, column) index, marks cells covered by a
        rowspan/colspan as occupied so they are not emitted twice, and
        renders row 0 cells as <th> headers.

        Args:
            table: Azure table object with ``cells`` carrying
                ``row_index``/``column_index`` and optional span attrs.
            table_idx: zero-based index used for the visible caption.

        Returns:
            HTML string for the table wrapped in a .table-container div.

        NOTE(review): ``self.clean_ocr_artifacts`` is called here, but the
        only visible definition is the module-level ``clean_ocr_artifacts(text)``
        near the bottom of this file (no ``self``). As written this lookup
        would misbind unless the function is also exposed on the class as a
        staticmethod — confirm against the original file layout.
        """
        if not table.cells:
            return f'<div class="table-container"><h4>Table {table_idx + 1} (Empty)</h4></div>'
        # Get table dimensions
        max_row = max(cell.row_index for cell in table.cells) + 1
        max_col = max(cell.column_index for cell in table.cells) + 1
        # Create table matrix with cell span information
        table_matrix = [[{"content": "", "rowspan": 1, "colspan": 1, "occupied": False}
                         for _ in range(max_col)] for _ in range(max_row)]
        # Fill matrix with proper handling of spans
        for cell in table.cells:
            row_idx = cell.row_index
            col_idx = cell.column_index
            # Clean the content (strip checkbox markers, collapse whitespace)
            content = self.clean_ocr_artifacts(cell.content or "").strip()
            # Get span information (Azure may report None; coerce to 1)
            rowspan = getattr(cell, 'row_span', 1) or 1
            colspan = getattr(cell, 'column_span', 1) or 1
            # Mark this cell and any cells it spans over
            if row_idx < max_row and col_idx < max_col:
                # If the target slot is already covered by an earlier span,
                # shift right to the first free slot in this row.
                while col_idx < max_col and table_matrix[row_idx][col_idx]["occupied"]:
                    col_idx += 1
                if col_idx < max_col:
                    table_matrix[row_idx][col_idx]["content"] = content
                    table_matrix[row_idx][col_idx]["rowspan"] = rowspan
                    table_matrix[row_idx][col_idx]["colspan"] = colspan
                    # Mark spanned cells as occupied (skip the anchor itself)
                    for r in range(row_idx, min(row_idx + rowspan, max_row)):
                        for c in range(col_idx, min(col_idx + colspan, max_col)):
                            if r != row_idx or c != col_idx:
                                table_matrix[r][c]["occupied"] = True
        # Generate HTML
        html_parts = [f'<div class="table-container">']
        html_parts.append(f'<h4>Table {table_idx + 1}</h4>')
        html_parts.append('<table class="table">')
        for row_idx, row in enumerate(table_matrix):
            html_parts.append('<tr>')
            for col_idx, cell in enumerate(row):
                if not cell["occupied"]:
                    content = cell["content"]
                    rowspan_attr = f' rowspan="{cell["rowspan"]}"' if cell["rowspan"] > 1 else ''
                    colspan_attr = f' colspan="{cell["colspan"]}"' if cell["colspan"] > 1 else ''
                    # Heuristic: non-empty first-row cells become headers.
                    if row_idx == 0 and content.strip():  # Header row
                        html_parts.append(f'<th{rowspan_attr}{colspan_attr}>{content}</th>')
                    else:
                        html_parts.append(f'<td{rowspan_attr}{colspan_attr}>{content}</td>')
            html_parts.append('</tr>')
        html_parts.append('</table></div>')
        return '\n'.join(html_parts)
| def _is_table_on_page(self, table, page_num): | |
| """Check if table belongs to the specified page""" | |
| if not table.cells: | |
| return False | |
| for cell in table.cells: | |
| if (cell.bounding_regions and | |
| cell.bounding_regions[0].page_number == page_num): | |
| return True | |
| return False | |
| def _calculate_table_overlap(self, content_item, table_regions): | |
| """Calculate overlap ratio between content and tables""" | |
| if not table_regions or not content_item.bounding_regions: | |
| return 0.0 | |
| content_polygon = content_item.bounding_regions[0].polygon | |
| if not content_polygon or len(content_polygon) < 8: | |
| return 0.0 | |
| # Content bounding box | |
| content_x1 = min(content_polygon[0], content_polygon[2], content_polygon[4], content_polygon[6]) | |
| content_x2 = max(content_polygon[0], content_polygon[2], content_polygon[4], content_polygon[6]) | |
| content_y1 = min(content_polygon[1], content_polygon[3], content_polygon[5], content_polygon[7]) | |
| content_y2 = max(content_polygon[1], content_polygon[3], content_polygon[5], content_polygon[7]) | |
| content_area = (content_x2 - content_x1) * (content_y2 - content_y1) | |
| if content_area <= 0: | |
| return 0.0 | |
| max_overlap_ratio = 0.0 | |
| for table_region in table_regions: | |
| table_polygon = table_region['polygon'] | |
| if not table_polygon or len(table_polygon) < 8: | |
| continue | |
| # Table bounding box | |
| table_x1 = min(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) | |
| table_x2 = max(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) | |
| table_y1 = min(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) | |
| table_y2 = max(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) | |
| # Calculate intersection | |
| intersect_x1 = max(content_x1, table_x1) | |
| intersect_x2 = min(content_x2, table_x2) | |
| intersect_y1 = max(content_y1, table_y1) | |
| intersect_y2 = min(content_y2, table_y2) | |
| if intersect_x2 > intersect_x1 and intersect_y2 > intersect_y1: | |
| intersect_area = (intersect_x2 - intersect_x1) * (intersect_y2 - intersect_y1) | |
| overlap_ratio = intersect_area / content_area | |
| max_overlap_ratio = max(max_overlap_ratio, overlap_ratio) | |
| return max_overlap_ratio | |
| def _calculate_line_table_overlap(self, line, table_regions): | |
| """Calculate overlap between line and tables""" | |
| if not table_regions or not line.polygon: | |
| return 0.0 | |
| line_polygon = line.polygon | |
| if len(line_polygon) < 8: | |
| return 0.0 | |
| # Line bounding box | |
| line_x1 = min(line_polygon[0], line_polygon[2], line_polygon[4], line_polygon[6]) | |
| line_x2 = max(line_polygon[0], line_polygon[2], line_polygon[4], line_polygon[6]) | |
| line_y1 = min(line_polygon[1], line_polygon[3], line_polygon[5], line_polygon[7]) | |
| line_y2 = max(line_polygon[1], line_polygon[3], line_polygon[5], line_polygon[7]) | |
| line_area = (line_x2 - line_x1) * (line_y2 - line_y1) | |
| if line_area <= 0: | |
| return 0.0 | |
| max_overlap = 0.0 | |
| for table_region in table_regions: | |
| table_polygon = table_region['polygon'] | |
| if not table_polygon or len(table_polygon) < 8: | |
| continue | |
| table_x1 = min(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) | |
| table_x2 = max(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) | |
| table_y1 = min(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) | |
| table_y2 = max(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) | |
| # Calculate intersection | |
| intersect_x1 = max(line_x1, table_x1) | |
| intersect_x2 = min(line_x2, table_x2) | |
| intersect_y1 = max(line_y1, table_y1) | |
| intersect_y2 = min(line_y2, table_y2) | |
| if intersect_x2 > intersect_x1 and intersect_y2 > intersect_y1: | |
| intersect_area = (intersect_x2 - intersect_x1) * (intersect_y2 - intersect_y1) | |
| overlap_ratio = intersect_area / line_area | |
| max_overlap = max(max_overlap, overlap_ratio) | |
| return max_overlap | |
def clean_ocr_artifacts(text: str) -> str:
    """Strip Azure OCR checkbox markers and normalize whitespace.

    Removes the ``:selected:`` / ``:unselected:`` checkbox tokens that
    Document Intelligence injects into recognized text, then collapses
    every whitespace run (tabs and newlines included) to a single space
    and strips the ends.

    Fixes over the previous version: the stale comment claiming
    ``:selected:`` is replaced with a checkmark (it is simply removed),
    and ``re.sub`` on fixed strings replaced with plain ``str.replace``.
    Behavior is unchanged.

    Args:
        text: Raw OCR text; falsy values (None, "") are returned as-is.

    Returns:
        Cleaned single-line text.
    """
    if not text:
        return text
    # Fixed tokens need no regex; both markers are dropped entirely.
    text = text.replace(':unselected:', '').replace(':selected:', '')
    # Collapse all whitespace runs (including newlines) to one space.
    return re.sub(r'\s+', ' ', text).strip()
| def html_to_formatted_text_enhanced(html_content): | |
| """Convert HTML back to formatted text with OpenCV-enhanced preservation""" | |
| from html.parser import HTMLParser | |
| class OpenCVEnhancedTextExtractor(HTMLParser): | |
| def __init__(self): | |
| super().__init__() | |
| self.text_parts = [] | |
| self.indent_detector = EnhancedIndentationDetector() | |
| self.in_title = False | |
| self.in_section_heading = False | |
| self.in_table = False | |
| self.current_table_row = [] | |
| self.table_data = [] | |
| self.current_indent_level = 0 | |
| self.current_formatting_hint = 'normal_text' | |
| self.in_page_header = False | |
| self.current_classes = [] | |
| self.in_content_header = False | |
| self.in_opencv_bold_header = False | |
| def handle_starttag(self, tag, attrs): | |
| attr_dict = dict(attrs) | |
| class_attr = attr_dict.get('class', '') | |
| self.current_classes = class_attr.split() | |
| if 'opencv-bold-header' in class_attr: | |
| self.in_opencv_bold_header = True | |
| # Bold headers get special treatment - no indentation | |
| elif 'page-header' in class_attr: | |
| self.in_page_header = True | |
| if len(self.text_parts) > 0: | |
| self.text_parts.append('\n\n' + '=' * 80 + '\n') | |
| elif 'content-header' in class_attr: | |
| self.in_content_header = True | |
| elif 'title' in class_attr: | |
| self.in_title = True | |
| elif 'section-heading' in class_attr: | |
| self.in_section_heading = True | |
| elif tag == 'table': | |
| self.in_table = True | |
| self.table_data = [] | |
| elif tag == 'tr': | |
| self.current_table_row = [] | |
| elif tag == 'br': | |
| self.text_parts.append('\n') | |
| # Extract indent level from class ONLY if not OpenCV bold header | |
| if not self.in_opencv_bold_header: | |
| for cls in self.current_classes: | |
| if cls.startswith('indent-level-'): | |
| try: | |
| self.current_indent_level = int(cls.split('-')[-1]) | |
| except ValueError: | |
| self.current_indent_level = 0 | |
| break | |
| else: | |
| self.current_indent_level = 0 | |
| else: | |
| self.current_indent_level = 0 # Force no indentation for bold headers | |
| # Extract formatting hint | |
| formatting_hints = [ | |
| 'numbered-primary', 'numbered-secondary', 'numbered-tertiary', 'numbered-quaternary', 'numbered-quinary', | |
| 'parenthetical-primary', 'parenthetical-secondary', 'parenthetical-tertiary', 'parenthetical-quaternary', | |
| 'bullet-primary', 'bullet-secondary', 'bullet-tertiary', 'bullet-quaternary', | |
| 'lettered-primary', 'lettered-secondary', | |
| 'roman-primary', 'roman-secondary', | |
| 'thai-primary', 'thai-secondary', | |
| 'indented_text', 'space-indent' | |
| ] | |
| for hint in formatting_hints: | |
| if hint in self.current_classes: | |
| self.current_formatting_hint = hint | |
| break | |
| else: | |
| self.current_formatting_hint = 'normal_text' | |
| def handle_endtag(self, tag): | |
| if tag == 'div' and self.in_opencv_bold_header: | |
| self.text_parts.append('\n\n') | |
| self.in_opencv_bold_header = False | |
| elif tag == 'div' and self.in_page_header: | |
| self.text_parts.append('\n' + '=' * 80 + '\n\n') | |
| self.in_page_header = False | |
| elif tag == 'div' and self.in_content_header: | |
| self.text_parts.append('\n\n') | |
| self.in_content_header = False | |
| elif tag == 'div' and self.in_title: | |
| self.text_parts.append('\n\n') | |
| self.in_title = False | |
| elif tag == 'div' and self.in_section_heading: | |
| self.text_parts.append('\n\n') | |
| self.in_section_heading = False | |
| elif tag == 'table': | |
| self.in_table = False | |
| self._format_table() | |
| elif tag == 'tr' and self.current_table_row: | |
| self.table_data.append(self.current_table_row[:]) | |
| elif tag == 'div' and not self.in_table: | |
| if not self.in_title and not self.in_section_heading and not self.in_page_header and not self.in_content_header and not self.in_opencv_bold_header: | |
| self.text_parts.append('\n') | |
| # Reset state | |
| if tag == 'div': | |
| self.current_indent_level = 0 | |
| self.current_formatting_hint = 'normal_text' | |
| self.current_classes = [] | |
def handle_data(self, data):
    """Dispatch a text node to the appropriate output formatter.

    Priority order of the parser-state flags: page header > OpenCV bold
    header > content header > title > section heading > table cell >
    regular indented text. Whitespace-only data is dropped entirely.
    """
    if data.strip():
        # Clean OCR artifacts first
        data = data.replace(':unselected:', '')  # Azure selection-mark noise
        data = data.replace(':selected:', '')
        # NOTE(review): the first argument renders as a plain space, which
        # would make this a no-op — it is presumably meant to be a
        # non-breaking space (U+00A0); verify against the original bytes.
        data = data.replace(' ', ' ')
        if self.in_page_header:
            # Center an 80-column "PAGE N" banner; header text that does
            # not match the pattern is intentionally dropped.
            page_match = re.search(r'Page (\d+)', data)
            if page_match:
                page_num = int(page_match.group(1))
                page_header = f"PAGE {page_num}"
                self.text_parts.append(page_header.center(80))
        elif self.in_opencv_bold_header:
            # OpenCV detected bold headers - no indentation, special formatting
            self.text_parts.append(f'\n## {data.strip().upper()}')
        elif self.in_content_header:
            indent_str = "    " * self.current_indent_level  # 4 spaces per level
            self.text_parts.append(f'\n{indent_str}# {data.strip()}')
        elif self.in_title:
            indent_str = "    " * self.current_indent_level  # 4 spaces per level
            self.text_parts.append(f'\n{indent_str}## {data.strip()}')
        elif self.in_section_heading:
            indent_str = "    " * self.current_indent_level  # 4 spaces per level
            self.text_parts.append(f'\n{indent_str}### {data.strip()}')
        elif self.in_table:
            # Accumulate cell text; the row is flushed on </tr>.
            self.current_table_row.append(data.strip())
        else:
            # Apply OpenCV-enhanced indentation formatting using 4 spaces per level
            indent_str = "    " * self.current_indent_level  # 4 spaces per level
            # Handle different formatting hints including parenthetical using 4 spaces
            if 'bullet' in self.current_formatting_hint:
                # Use appropriate bullet symbol based on level
                if 'primary' in self.current_formatting_hint:
                    bullet = '•'
                elif 'secondary' in self.current_formatting_hint:
                    bullet = '◦'
                elif 'tertiary' in self.current_formatting_hint:
                    bullet = '▪'
                elif 'quaternary' in self.current_formatting_hint:
                    bullet = '‣'
                else:
                    bullet = '•'
                self.text_parts.append(f'{indent_str}{bullet} {data.strip()}')
            elif any(pattern in self.current_formatting_hint for pattern in ['numbered', 'lettered', 'roman', 'thai', 'parenthetical']):
                # For numbered/lettered/parenthetical items, the marker should already be in the text
                self.text_parts.append(f'{indent_str}{data.strip()}')
            elif 'space-indent' in self.current_formatting_hint:
                # Simple indented text using 4 spaces
                self.text_parts.append(f'{indent_str}{data.strip()}')
            else:
                # Regular text with indentation using 4 spaces
                self.text_parts.append(f'{indent_str}{data.strip()}')
| def _format_table(self): | |
| """Format table with proper alignment""" | |
| if not self.table_data: | |
| return | |
| self.text_parts.append('\n\n') | |
| if self.table_data: | |
| max_cols = max(len(row) for row in self.table_data) | |
| col_widths = [0] * max_cols | |
| # Calculate column widths | |
| for row in self.table_data: | |
| for i, cell in enumerate(row): | |
| if i < max_cols: | |
| col_widths[i] = max(col_widths[i], len(str(cell))) | |
| # Ensure minimum column width | |
| col_widths = [max(width, 8) for width in col_widths] | |
| # Format rows with proper alignment | |
| for row_idx, row in enumerate(self.table_data): | |
| formatted_cells = [] | |
| for i, cell in enumerate(row): | |
| if i < max_cols: | |
| width = col_widths[i] | |
| formatted_cells.append(str(cell).ljust(width)) | |
| row_text = ' | '.join(formatted_cells) | |
| self.text_parts.append(row_text) | |
| # Add separator after header | |
| if row_idx == 0 and len(self.table_data) > 1: | |
| separator_cells = ['-' * col_widths[i] for i in range(max_cols)] | |
| separator_text = ' | '.join(separator_cells) | |
| self.text_parts.append(separator_text) | |
| self.text_parts.append('\n') | |
| self.text_parts.append('\n') | |
| extractor = OpenCVEnhancedTextExtractor() | |
| extractor.feed(html_content) | |
| result = ''.join(extractor.text_parts) | |
| # Clean up excessive newlines while preserving intentional spacing | |
| result = re.sub(r'\n{4,}', '\n\n\n', result) # Max 3 consecutive newlines | |
| # Ensure proper spacing around page headers | |
| result = re.sub(r'(={80})\n*([A-Z ]+)\n*(={80})', r'\1\n\2\n\3', result) | |
| return result.strip() | |
| def _validate_and_fix_table_structure(self, table_matrix): | |
| """Validate and fix common table structure issues""" | |
| if not table_matrix: | |
| return table_matrix | |
| max_row = len(table_matrix) | |
| max_col = len(table_matrix[0]) if table_matrix else 0 | |
| # Ensure all rows have same number of columns | |
| for row in table_matrix: | |
| while len(row) < max_col: | |
| row.append({"content": "", "rowspan": 1, "colspan": 1, "occupied": False}) | |
| # Remove completely empty rows | |
| table_matrix = [row for row in table_matrix if any(cell["content"].strip() for cell in row)] | |
| # Merge cells with identical content in adjacent columns (likely split cells) | |
| for row_idx, row in enumerate(table_matrix): | |
| col_idx = 0 | |
| while col_idx < len(row) - 1: | |
| current = row[col_idx] | |
| next_cell = row[col_idx + 1] | |
| if (current["content"] == next_cell["content"] and | |
| current["content"].strip() and | |
| not current["occupied"] and not next_cell["occupied"]): | |
| # Merge cells | |
| current["colspan"] += next_cell["colspan"] | |
| next_cell["occupied"] = True | |
| col_idx += 1 | |
| return table_matrix | |
| class OCRService: | |
| """Main OCR service with OpenCV-enhanced text analysis, spacing detection, and bold text recognition""" | |
def __init__(self):
    """Read Azure credentials from the environment and build the client if possible.

    Leaves self.azure_client as None when credentials are missing or client
    construction fails; callers check it before selecting the azure method.
    """
    self.azure_endpoint = os.getenv('AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT')
    self.azure_key = os.getenv('AZURE_DOCUMENT_INTELLIGENCE_KEY')
    self.azure_client = None
    # Guard clause: without both credentials there is nothing to build.
    if not (self.azure_endpoint and self.azure_key):
        logger.warning("Azure credentials not found. Azure OCR will be unavailable.")
        return
    try:
        self.azure_client = DocumentIntelligenceClient(
            endpoint=self.azure_endpoint,
            credential=AzureKeyCredential(self.azure_key),
        )
        logger.info("Azure Document Intelligence client initialized successfully")
    except Exception as e:
        # Construction failed: log and leave the client unset.
        logger.error(f"Failed to initialize Azure client: {e}")
| def convert_pdf_to_text(self, pdf_path: str, method: str = "auto") -> Dict[str, Any]: | |
| """ | |
| Convert PDF to text using specified method with OpenCV-enhanced processing | |
| Args: | |
| pdf_path: Path to the PDF file | |
| method: OCR method ('azure', 'tesseract', 'pymupdf', 'auto') | |
| Returns: | |
| Dict containing text content, HTML, metadata, and OpenCV analysis | |
| """ | |
| result = { | |
| 'success': False, | |
| 'text': '', | |
| 'html': '', | |
| 'method_used': '', | |
| 'metadata': {}, | |
| 'error': None | |
| } | |
| if not os.path.exists(pdf_path): | |
| result['error'] = f"PDF file not found: {pdf_path}" | |
| return result | |
| # Auto method selection | |
| if method == "auto": | |
| if self.azure_client: | |
| method = "azure" | |
| elif self._check_tesseract_available(): | |
| method = "tesseract" | |
| else: | |
| method = "pymupdf" | |
| # Try primary method | |
| try: | |
| if method == "azure" and self.azure_client: | |
| result = self._azure_ocr_with_opencv_enhancement(pdf_path) | |
| elif method == "tesseract": | |
| result = self._tesseract_ocr_with_opencv(pdf_path) | |
| elif method == "pymupdf": | |
| result = self._pymupdf_extract_with_opencv(pdf_path) | |
| else: | |
| result['error'] = f"Method '{method}' not available or not configured" | |
| except Exception as e: | |
| logger.error(f"Primary method '{method}' failed: {e}") | |
| result['error'] = str(e) | |
| # Fallback mechanism | |
| if not result['success']: | |
| logger.info("Primary method failed, trying fallback methods...") | |
| result = self._try_fallback_methods(pdf_path, exclude_method=method) | |
| return result | |
def _extract_page_images_from_pdf(self, pdf_path: str) -> Dict[int, np.ndarray]:
    """Render each PDF page to a BGR numpy image for OpenCV analysis.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        Mapping of 1-based page number -> BGR image array. On error the
        exception is logged and whatever pages were rendered so far are
        returned (possibly an empty dict); nothing is raised.
    """
    # Hoisted out of the per-page loop — they were re-executed on every
    # iteration for no benefit.
    import io
    from PIL import Image

    page_images: Dict[int, np.ndarray] = {}
    pdf_document = None
    try:
        pdf_document = fitz.open(pdf_path)
        for page_num in range(len(pdf_document)):
            page = pdf_document.load_page(page_num)
            # Render page to image; 2x scale gives higher resolution for
            # better downstream analysis.
            mat = fitz.Matrix(2.0, 2.0)
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            pil_image = Image.open(io.BytesIO(img_data))
            img_array = np.array(pil_image)
            # PIL delivers RGB; OpenCV expects BGR channel order.
            if len(img_array.shape) == 3:
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            page_images[page_num + 1] = img_array
    except Exception as e:
        logger.error(f"Error extracting page images: {e}")
    finally:
        if pdf_document:
            pdf_document.close()
    return page_images
def _azure_ocr_with_opencv_enhancement(self, pdf_path: str) -> Dict[str, Any]:
    """Azure Document Intelligence OCR with OpenCV-enhanced text analysis and bold detection.

    Runs the 'prebuilt-layout' model over the PDF, renders the result to
    structured HTML, converts that HTML to formatted plain text, and
    augments the metadata with an OpenCV-based document-structure analysis
    of the rendered page images.

    Args:
        pdf_path: Path to the PDF file to analyze.

    Returns:
        Result dict with keys success/text/html/method_used/metadata/error.
        On failure `success` is False and `error` carries the message; no
        exception propagates to the caller.
    """
    result = {
        'success': False,
        'text': '',
        'html': '',
        'method_used': 'azure_document_intelligence_opencv_enhanced',
        'metadata': {},
        'error': None
    }
    try:
        # Extract page images for OpenCV analysis
        page_images = self._extract_page_images_from_pdf(pdf_path)
        with open(pdf_path, 'rb') as pdf_file:
            file_content = pdf_file.read()
            # Use enhanced analysis features
            # NOTE(review): AnalyzeDocumentRequest is imported but never
            # referenced below — confirm before removing.
            from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
            try:
                # Try with features parameter for better table extraction
                poller = self.azure_client.begin_analyze_document(
                    "prebuilt-layout",
                    body=file_content,
                    content_type="application/pdf",
                    features=["keyValuePairs"],  # Enable key-value pair detection
                    output_content_format="markdown"  # Better structure preservation
                )
            except (TypeError, AttributeError):
                # Fallback chain: older azure-ai-documentintelligence SDK
                # versions expose different begin_analyze_document signatures.
                try:
                    poller = self.azure_client.begin_analyze_document(
                        "prebuilt-layout",
                        body=file_content,
                        content_type="application/pdf"
                    )
                except TypeError:
                    try:
                        poller = self.azure_client.begin_analyze_document(
                            model_id="prebuilt-layout",
                            body=file_content
                        )
                    except TypeError:
                        # Oldest signature takes the open file object itself;
                        # rewind first because read() above consumed the stream.
                        pdf_file.seek(0)
                        poller = self.azure_client.begin_analyze_document(
                            "prebuilt-layout",
                            document=pdf_file
                        )
        # Block until the long-running analysis completes.
        analysis_result = poller.result()
        # Generate HTML with OpenCV-enhanced processing
        html_content = EnhancedHTMLProcessor.create_html_from_azure_result(
            analysis_result, page_images
        )
        # Convert HTML to formatted text with OpenCV enhancement
        formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content)
        # Analyze document structure with OpenCV enhancement
        detector = EnhancedIndentationDetector()
        text_lines = formatted_text.split('\n')
        # Perform OpenCV analysis on first page for overall document analysis
        opencv_document_analysis = None
        if page_images:
            first_page_image = list(page_images.values())[0]
            opencv_document_analysis = detector.opencv_analyzer.analyze_text_blocks(
                first_page_image, text_lines
            )
        document_analysis = detector.analyze_document_structure_with_opencv(
            text_lines, None  # We already have the OpenCV analysis
        )
        if opencv_document_analysis:
            document_analysis['opencv_global_analysis'] = opencv_document_analysis
        result.update({
            'success': True,
            'text': formatted_text,
            'html': html_content,
            'metadata': {
                'pages': len(analysis_result.pages) if analysis_result.pages else 0,
                'tables': len(analysis_result.tables) if analysis_result.tables else 0,
                'paragraphs': len(analysis_result.paragraphs) if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs else 0,
                'has_handwritten': any(style.is_handwritten for style in analysis_result.styles) if analysis_result.styles else False,
                'html_generated': True,
                'opencv_enhanced': True,
                'opencv_bold_detection': True,
                'opencv_spacing_analysis': True,
                'enhanced_indentation': True,
                'intelligent_text_classification': True,
                'parenthetical_patterns_supported': True,
                'page_numbers_added': True,
                'comprehensive_formatting': True,
                # NOTE(review): embedding the raw SDK result object makes the
                # dict non-JSON-serializable — confirm downstream consumers
                # expect this.
                'azure_analysis': analysis_result,
                'document_structure_analysis': document_analysis,
                'page_images_processed': len(page_images)
            }
        })
        logger.info("Azure OCR with OpenCV enhancement completed successfully")
        logger.info(f"OpenCV analysis: {len(page_images)} pages processed with text block and bold detection")
    except Exception as e:
        logger.error(f"Azure OCR with OpenCV error: {e}")
        result['error'] = f"Azure OCR with OpenCV error: {e}"
    return result
def _tesseract_ocr_with_opencv(self, pdf_path: str) -> Dict[str, Any]:
    """Tesseract OCR with OpenCV-enhanced text analysis and bold detection.

    Renders each PDF page to a temp PNG, OCRs it with Tesseract, then uses
    OpenCV line mappings to classify indentation/bold headers while
    building an HTML representation that is finally flattened back to text.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        Result dict (success/text/html/method_used/metadata/error); errors
        are captured in the dict rather than raised.
    """
    result = {
        'success': False,
        'text': '',
        'html': '',
        'method_used': 'tesseract_opencv_enhanced',
        'metadata': {},
        'error': None
    }
    # Guard clause: pytesseract/PIL failed to import at module load.
    if not TESSERACT_AVAILABLE:
        result['error'] = "Tesseract not available"
        return result
    pdf_document = None
    try:
        pdf_document = fitz.open(pdf_path)
        page_count = len(pdf_document)
        # NOTE(review): all_text is accumulated but the final text comes
        # from the HTML pass below — confirm before removing.
        all_text = []
        html_parts = ['<!DOCTYPE html><html><head><meta charset="UTF-8"><style>']
        html_parts.append('''
        body { font-family: "Consolas", monospace; line-height: 1.6; margin: 20px; }
        .page { margin-bottom: 30px; border: 1px solid #ddd; padding: 20px; }
        .page-header { font-weight: bold; text-align: center; border-bottom: 2px solid #3498db; padding-bottom: 8px; margin-bottom: 15px; }
        .paragraph { margin-bottom: 0.8em; white-space: pre-wrap; }
        .opencv-bold-header { font-weight: bold; color: #2c3e50; font-size: 1.3em; margin: 20px 0 15px 0; border-left: 4px solid #e74c3c; padding-left: 12px; background-color: #fdf2f2; }
        .content-header { font-weight: bold; color: #2c3e50; margin: 10px 0; }
        .content-paragraph { margin-bottom: 1em; }
        .content-list-item { margin-bottom: 0.5em; }
        ''')
        html_parts.append('</style></head><body>')
        indent_detector = EnhancedIndentationDetector()
        opencv_analyzer = OpenCVTextAnalyzer()
        for page_num in range(page_count):
            # Add page header to text
            # NOTE(review): .center(74) applies only to the page number, not
            # to 'PAGE n' as a whole — probably meant
            # ('PAGE ' + str(page_num + 1)).center(74); verify.
            page_header = f"\n{'=' * 80}\n{'PAGE ' + str(page_num + 1).center(74)}\n{'=' * 80}\n\n"
            all_text.append(page_header)
            page = pdf_document.load_page(page_num)
            # Render page to image; 2x scale for better OCR accuracy.
            mat = fitz.Matrix(2.0, 2.0)
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            temp_img_path = None
            opencv_analysis = None
            try:
                # Write the rendered page to a temp file; close it before
                # reading so the PNG bytes are fully flushed to disk.
                with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_img:
                    temp_img.write(img_data)
                    temp_img_path = temp_img.name
                # Convert to OpenCV format for analysis
                img_cv = cv2.imread(temp_img_path)
                processed_img = self._preprocess_image(temp_img_path)
                # psm 6: assume a uniform block of text; preserve spacing so
                # indentation survives OCR.
                custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1'
                text = pytesseract.image_to_string(processed_img, config=custom_config, lang='eng')
                all_text.append(text)
                # Perform OpenCV analysis
                text_lines = text.split('\n')
                opencv_analysis = opencv_analyzer.analyze_text_blocks(img_cv, text_lines)
                # Add to HTML with OpenCV-enhanced processing
                html_parts.append(f'<div class="page">')
                html_parts.append(f'<div class="page-header">Page {page_num + 1}</div>')
                # Process each line with OpenCV enhancement
                lines = text.split('\n')
                for line in lines:
                    if line.strip():
                        # Find OpenCV mapping for this line (exact text match)
                        opencv_line_mapping = None
                        if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis:
                            for mapping in opencv_analysis['line_mappings']:
                                if mapping.get('text', '').strip() == line.strip():
                                    opencv_line_mapping = mapping
                                    break
                        # Enhanced indentation detection with OpenCV
                        if opencv_line_mapping:
                            indent_info = indent_detector.detect_indentation_with_opencv(
                                line, opencv_analysis, opencv_line_mapping
                            )
                        else:
                            indent_info = indent_detector.detect_indentation(line)
                        text_classification = indent_detector.classify_text_type(
                            line, opencv_analysis=opencv_analysis
                        )
                        # Build CSS classes
                        css_classes = []
                        # Check if OpenCV detected bold header
                        is_opencv_bold_header = (opencv_line_mapping and
                                                 opencv_line_mapping.get('is_bold') and
                                                 opencv_line_mapping.get('is_likely_header'))
                        if is_opencv_bold_header:
                            css_classes.append('opencv-bold-header')
                        else:
                            # Indentation classes only for non-bold-header lines.
                            level = indent_info.get('level', 0)
                            css_classes.append(f'indent-level-{min(level, 10)}')
                            formatting_hint = indent_info.get('formatting_hint', 'normal_text')
                            if formatting_hint != 'normal_text':
                                css_classes.append(formatting_hint)
                        # Add text classification class
                        if text_classification.get('type'):
                            css_classes.append(f"content-{text_classification['type']}")
                        class_str = f' class="paragraph {" ".join(css_classes)}"'
                        content = indent_info.get('content', line.strip())
                        # Add marker for non-bullet items (unless bold header)
                        if not is_opencv_bold_header:
                            marker = indent_info.get('pattern_marker', '')
                            if marker and not indent_info.get('is_bullet', False):
                                content = f"{marker} {content}"
                        html_parts.append(f'<div{class_str}>{content}</div>')
                    else:
                        # Preserve blank lines as empty paragraphs.
                        html_parts.append('<div class="paragraph"><br></div>')
                html_parts.append('</div>')
            finally:
                # Best-effort cleanup of the per-page temp image.
                if temp_img_path and os.path.exists(temp_img_path):
                    try:
                        os.unlink(temp_img_path)
                    except:
                        pass
        html_parts.append('</body></html>')
        # Convert HTML back to formatted text
        html_content = '\n'.join(html_parts)
        formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content)
        result.update({
            'success': True,
            'text': formatted_text,
            'html': html_content,
            'metadata': {
                'pages': page_count,
                'html_generated': True,
                'opencv_enhanced': True,
                'opencv_bold_detection': True,
                'opencv_spacing_analysis': True,
                'enhanced_indentation': True,
                'intelligent_text_classification': True,
                'parenthetical_patterns_supported': True,
                'page_numbers_added': True,
                'comprehensive_formatting': True
            }
        })
        logger.info("Tesseract OCR with OpenCV enhancement completed successfully")
    except Exception as e:
        logger.error(f"Tesseract OCR with OpenCV error: {e}")
        result['error'] = f"Tesseract OCR with OpenCV error: {e}"
    finally:
        if pdf_document is not None:
            try:
                pdf_document.close()
            except:
                # Best-effort close; ignore failures.
                pass
    return result
def _pymupdf_extract_with_opencv(self, pdf_path: str) -> Dict[str, Any]:
    """PyMuPDF text extraction with OpenCV-enhanced analysis and bold detection.

    Uses the PDF's embedded text layer (no OCR), then renders each page to
    an image so OpenCV can classify indentation, spacing, and bold headers
    while an HTML representation is built and flattened back to text.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        Result dict (success/text/html/method_used/metadata/error); errors
        are captured in the dict rather than raised.
    """
    result = {
        'success': False,
        'text': '',
        'html': '',
        'method_used': 'pymupdf_opencv_enhanced',
        'metadata': {},
        'error': None
    }
    pdf_document = None
    try:
        pdf_document = fitz.open(pdf_path)
        page_count = len(pdf_document)
        # NOTE(review): all_text is accumulated but the final text comes
        # from the HTML pass below — confirm before removing.
        all_text = []
        html_parts = ['<!DOCTYPE html><html><head><meta charset="UTF-8"><style>']
        html_parts.append('''
        body { font-family: "Consolas", monospace; line-height: 1.6; margin: 20px; }
        .page { margin-bottom: 30px; border: 1px solid #ddd; padding: 20px; }
        .page-header { font-weight: bold; text-align: center; border-bottom: 2px solid #3498db; padding-bottom: 8px; margin-bottom: 15px; }
        .paragraph { margin-bottom: 0.8em; white-space: pre-wrap; }
        .opencv-bold-header { font-weight: bold; color: #2c3e50; font-size: 1.3em; margin: 20px 0 15px 0; border-left: 4px solid #e74c3c; padding-left: 12px; background-color: #fdf2f2; }
        .content-header { font-weight: bold; color: #2c3e50; margin: 10px 0; }
        .content-paragraph { margin-bottom: 1em; }
        .content-list-item { margin-bottom: 0.5em; }
        ''')
        html_parts.append('</style></head><body>')
        indent_detector = EnhancedIndentationDetector()
        opencv_analyzer = OpenCVTextAnalyzer()
        for page_num in range(page_count):
            # Add page header to text
            # NOTE(review): .center(74) applies only to the page number, not
            # to 'PAGE n' as a whole — probably meant
            # ('PAGE ' + str(page_num + 1)).center(74); verify.
            page_header = f"\n{'=' * 80}\n{'PAGE ' + str(page_num + 1).center(74)}\n{'=' * 80}\n\n"
            all_text.append(page_header)
            page = pdf_document.load_page(page_num)
            # Embedded text layer — no OCR involved here.
            text = page.get_text()
            all_text.append(text)
            # Get page image for OpenCV analysis; 2x scale for sharper analysis.
            mat = fitz.Matrix(2.0, 2.0)
            pix = page.get_pixmap(matrix=mat)
            img_data = pix.tobytes("png")
            # Convert to OpenCV format
            # NOTE(review): imports inside the loop — harmless (module cache)
            # but could be hoisted out.
            import io
            from PIL import Image
            pil_image = Image.open(io.BytesIO(img_data))
            img_array = np.array(pil_image)
            # PIL delivers RGB; OpenCV expects BGR channel order.
            img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
            # Perform OpenCV analysis
            text_lines = text.split('\n')
            opencv_analysis = opencv_analyzer.analyze_text_blocks(img_cv, text_lines)
            # Add to HTML with OpenCV-enhanced processing
            html_parts.append(f'<div class="page">')
            html_parts.append(f'<div class="page-header">Page {page_num + 1}</div>')
            # Process each line with OpenCV enhancement
            lines = text.split('\n')
            for line in lines:
                if line.strip():
                    # Find OpenCV mapping for this line (exact text match)
                    opencv_line_mapping = None
                    if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis:
                        for mapping in opencv_analysis['line_mappings']:
                            if mapping.get('text', '').strip() == line.strip():
                                opencv_line_mapping = mapping
                                break
                    # Enhanced indentation detection with OpenCV
                    if opencv_line_mapping:
                        indent_info = indent_detector.detect_indentation_with_opencv(
                            line, opencv_analysis, opencv_line_mapping
                        )
                    else:
                        indent_info = indent_detector.detect_indentation(line)
                    text_classification = indent_detector.classify_text_type(
                        line, opencv_analysis=opencv_analysis
                    )
                    # Build CSS classes
                    css_classes = []
                    # Check if OpenCV detected bold header
                    is_opencv_bold_header = (opencv_line_mapping and
                                             opencv_line_mapping.get('is_bold') and
                                             opencv_line_mapping.get('is_likely_header'))
                    if is_opencv_bold_header:
                        css_classes.append('opencv-bold-header')
                    else:
                        # Indentation classes only for non-bold-header lines.
                        level = indent_info.get('level', 0)
                        css_classes.append(f'indent-level-{min(level, 10)}')
                        formatting_hint = indent_info.get('formatting_hint', 'normal_text')
                        if formatting_hint != 'normal_text':
                            css_classes.append(formatting_hint)
                    # Add text classification class
                    if text_classification.get('type'):
                        css_classes.append(f"content-{text_classification['type']}")
                    class_str = f' class="paragraph {" ".join(css_classes)}"'
                    content = indent_info.get('content', line.strip())
                    # Add marker for non-bullet items (unless bold header)
                    if not is_opencv_bold_header:
                        marker = indent_info.get('pattern_marker', '')
                        if marker and not indent_info.get('is_bullet', False):
                            content = f"{marker} {content}"
                    html_parts.append(f'<div{class_str}>{content}</div>')
                else:
                    # Preserve blank lines as empty paragraphs.
                    html_parts.append('<div class="paragraph"><br></div>')
            html_parts.append('</div>')
        html_parts.append('</body></html>')
        # Convert HTML back to formatted text
        html_content = '\n'.join(html_parts)
        formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content)
        result.update({
            'success': True,
            'text': formatted_text,
            'html': html_content,
            'metadata': {
                'pages': page_count,
                'html_generated': True,
                'opencv_enhanced': True,
                'opencv_bold_detection': True,
                'opencv_spacing_analysis': True,
                'enhanced_indentation': True,
                'intelligent_text_classification': True,
                'parenthetical_patterns_supported': True,
                'page_numbers_added': True,
                'comprehensive_formatting': True
            }
        })
        logger.info("PyMuPDF extraction with OpenCV enhancement completed successfully")
    except Exception as e:
        logger.error(f"PyMuPDF with OpenCV error: {e}")
        result['error'] = f"PyMuPDF with OpenCV error: {e}"
    finally:
        if pdf_document is not None:
            try:
                pdf_document.close()
            except:
                # Best-effort close; ignore failures.
                pass
    return result
def _preprocess_image(self, image_path: str) -> np.ndarray:
    """Preprocess an image for better OCR accuracy.

    Pipeline: grayscale -> 3x3 median blur (removes salt-and-pepper noise)
    -> Otsu binarization (automatic global threshold).

    Args:
        image_path: Path to the image file on disk.

    Returns:
        Binary (0/255) single-channel image array.

    Raises:
        ValueError: if the image cannot be read. Previously a failed read
        crashed inside cv2.cvtColor with an opaque assertion error, because
        cv2.imread signals failure by returning None rather than raising.
    """
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    denoised = cv2.medianBlur(gray, 3)
    _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return binary
def _try_fallback_methods(self, pdf_path: str, exclude_method: str = None) -> Dict[str, Any]:
    """Try the remaining OCR backends in preference order after a failure.

    Args:
        pdf_path: Path to the PDF file.
        exclude_method: Backend that already failed and should be skipped.

    Returns:
        The first successful backend result (with '_fallback' appended to
        method_used), or an all-methods-failed stub result.
    """
    # Backend name -> bound handler, in preference order.
    handlers = {
        "azure": self._azure_ocr_with_opencv_enhancement,
        "tesseract": self._tesseract_ocr_with_opencv,
        "pymupdf": self._pymupdf_extract_with_opencv,
    }
    candidates = []
    if exclude_method != "azure" and self.azure_client:
        candidates.append("azure")
    if exclude_method != "tesseract" and self._check_tesseract_available():
        candidates.append("tesseract")
    if exclude_method != "pymupdf":
        candidates.append("pymupdf")
    for name in candidates:
        logger.info(f"Trying fallback method: {name}")
        try:
            outcome = handlers[name](pdf_path)
            if outcome['success']:
                outcome['method_used'] += '_fallback'
                return outcome
        except Exception as e:
            logger.error(f"Fallback method {name} failed: {e}")
            continue
    # Nothing worked: return a failure stub.
    return {
        'success': False,
        'text': '',
        'html': '',
        'method_used': 'all_methods_failed',
        'metadata': {},
        'error': 'All OCR methods failed'
    }
def _check_tesseract_available(self) -> bool:
    """Return True if pytesseract imported AND the tesseract binary is runnable.

    Probes the binary via get_tesseract_version(), which raises when the
    executable is missing from PATH.
    """
    if not TESSERACT_AVAILABLE:
        return False
    try:
        pytesseract.get_tesseract_version()
        return True
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed during the availability probe.
        return False
def get_available_methods(self) -> list:
    """Return the OCR backends usable in this environment.

    Order reflects preference: azure (if the client initialized),
    tesseract (if the binary is runnable), then pymupdf, which is
    always available.
    """
    checks = [
        ("azure", bool(self.azure_client)),
        ("tesseract", self._check_tesseract_available()),
        ("pymupdf", True),
    ]
    return [name for name, available in checks if available]