""" OCR Service Module - ENHANCED VERSION with OpenCV Text Block Analysis and Bold Detection Handles PDF to text conversion with OpenCV-based spacing analysis, bold text detection, and improved formatting """ import re import os import logging from typing import Optional, Dict, Any, Tuple, List import tempfile from pathlib import Path import cv2 import numpy as np # Load environment variables from dotenv import load_dotenv load_dotenv() # Azure Document Intelligence from azure.core.credentials import AzureKeyCredential from azure.ai.documentintelligence import DocumentIntelligenceClient from azure.core.exceptions import AzureError # Fallback OCR libraries try: import pytesseract from PIL import Image TESSERACT_AVAILABLE = True except ImportError: TESSERACT_AVAILABLE = False import fitz # PyMuPDF # Enhanced indentation detection with OpenCV from enhanced_indentation import EnhancedIndentationDetector, OpenCVTextAnalyzer # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class EnhancedHTMLProcessor: """Process OCR results through HTML with OpenCV-enhanced text block analysis and bold detection""" def __init__(self): self.indent_detector = EnhancedIndentationDetector() self.opencv_analyzer = OpenCVTextAnalyzer() @staticmethod def create_html_from_azure_result(analysis_result, page_images=None) -> str: """Create structured HTML from Azure Document Intelligence result with OpenCV enhancement""" processor = EnhancedHTMLProcessor() html_parts = [''] html_parts.append('') if not analysis_result.pages: html_parts.append('

No content found

') return '\n'.join(html_parts) for page_num, page in enumerate(analysis_result.pages, 1): html_parts.append(f'
') html_parts.append(f'') # Get OpenCV analysis for this page if available opencv_analysis = None if page_images and page_num in page_images: page_text_lines = processor._extract_page_text_lines(page, analysis_result, page_num) opencv_analysis = processor.opencv_analyzer.analyze_text_blocks( page_images[page_num], page_text_lines ) # Process content with OpenCV-enhanced indentation detection and text classification content_items = processor._extract_page_content_enhanced( page, analysis_result, page_num, opencv_analysis ) content_items.sort(key=lambda x: (x['y_pos'], x['x_pos'])) # Generate HTML for each content item with OpenCV enhancement for item in content_items: if item['type'] == 'table': html_parts.append(processor._table_to_html(item['content'], item['table_idx'])) else: html_parts.append(processor._text_to_html_opencv_enhanced(item)) html_parts.append('
') html_parts.append('') return '\n'.join(html_parts) def _extract_page_text_lines(self, page, analysis_result, page_num): """Extract text lines for OpenCV correlation""" text_lines = [] if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs: page_paragraphs = [p for p in analysis_result.paragraphs if p.bounding_regions and p.bounding_regions[0].page_number == page_num] for para in page_paragraphs: if para.content.strip(): text_lines.append(para.content.strip()) elif page.lines: for line in page.lines: if line.content.strip(): text_lines.append(line.content.strip()) return text_lines def _extract_page_content_enhanced(self, page, analysis_result, page_num, opencv_analysis=None): """Extract page content with OpenCV-enhanced text block analysis and bold detection""" content_items = [] # Handle tables (existing logic) page_tables = [] table_regions = [] if analysis_result.tables: for table_idx, table in enumerate(analysis_result.tables): if self._is_table_on_page(table, page_num): page_tables.append((table_idx, table)) if table.bounding_regions: table_regions.append({ 'polygon': table.bounding_regions[0].polygon, 'table_idx': table_idx }) # Add tables to content for table_idx, table in page_tables: if table.bounding_regions and table.bounding_regions[0].polygon: polygon = table.bounding_regions[0].polygon y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7]) x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6]) content_items.append({ 'type': 'table', 'content': table, 'table_idx': table_idx, 'y_pos': y_pos, 'x_pos': x_pos }) # Process text content with OpenCV-enhanced analysis if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs: page_paragraphs = [p for p in analysis_result.paragraphs if p.bounding_regions and p.bounding_regions[0].page_number == page_num] for para in page_paragraphs: if para.content.strip(): # Check table overlap overlap_ratio = self._calculate_table_overlap(para, table_regions) if overlap_ratio < 0.7: # Not heavily overlapping with table polygon = para.bounding_regions[0].polygon y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7]) if polygon else 0 x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6]) if polygon else 0 # Find corresponding OpenCV analysis opencv_line_mapping = None if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis: for mapping in opencv_analysis['line_mappings']: if mapping.get('text', '').strip() == para.content.strip(): opencv_line_mapping = mapping break # Enhanced indentation detection with OpenCV if opencv_line_mapping: indent_info = self.indent_detector.detect_indentation_with_opencv( para.content, opencv_analysis, opencv_line_mapping ) else: indent_info = self.indent_detector.detect_indentation(para.content) # Intelligent text classification with OpenCV context context = { 'y_position': y_pos, 'x_position': x_pos, 'font_size': getattr(para, 'font_size', None), 'is_bold': getattr(para, 'is_bold', False), 'page_number': page_num } text_classification = self.indent_detector.classify_text_type( para.content, context, opencv_analysis ) content_items.append({ 'type': 'paragraph', 'content': indent_info['content'], 'role': getattr(para, 'role', 'paragraph'), 'y_pos': y_pos, 'x_pos': x_pos, 'indent_info': indent_info, 'text_classification': text_classification, 'opencv_analysis': opencv_line_mapping, 'preserve_spacing': True }) elif page.lines: # Process lines with OpenCV-enhanced analysis processed_lines = self._process_lines_opencv_enhanced(page.lines, table_regions, opencv_analysis) content_items.extend(processed_lines) return content_items def _process_lines_opencv_enhanced(self, lines, table_regions, opencv_analysis=None): """Process lines with OpenCV-enhanced text block analysis and bold detection""" content_items = [] processed_content = set() for line in lines: if not line.content.strip(): continue content_key = line.content.strip().lower() if content_key in processed_content: continue processed_content.add(content_key) # Check table overlap overlap_ratio = self._calculate_line_table_overlap(line, table_regions) if overlap_ratio < 0.7: polygon = line.polygon y_pos = min(polygon[1], polygon[3], polygon[5], polygon[7]) if polygon else 0 x_pos = min(polygon[0], polygon[2], polygon[4], polygon[6]) if polygon else 0 # Find corresponding OpenCV analysis opencv_line_mapping = None if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis: for mapping in opencv_analysis['line_mappings']: if mapping.get('text', '').strip() == line.content.strip(): opencv_line_mapping = mapping break # Enhanced indentation detection with OpenCV if opencv_line_mapping: indent_info = self.indent_detector.detect_indentation_with_opencv( line.content, opencv_analysis, opencv_line_mapping ) else: indent_info = self.indent_detector.detect_indentation(line.content) # Text classification with OpenCV context context = { 'y_position': y_pos, 'x_position': x_pos } text_classification = self.indent_detector.classify_text_type( line.content, context, opencv_analysis ) content_items.append({ 'type': 'line', 'content': indent_info['content'], 'role': 'text', 'y_pos': y_pos, 'x_pos': x_pos, 'indent_info': indent_info, 'text_classification': text_classification, 'opencv_analysis': opencv_line_mapping, 'preserve_spacing': True }) return content_items def _text_to_html_opencv_enhanced(self, item): """Convert text item to HTML with OpenCV-enhanced formatting and bold detection""" content = item['content'] role = item.get('role', 'paragraph') indent_info = item.get('indent_info', {}) text_classification = item.get('text_classification', {}) opencv_analysis = item.get('opencv_analysis', {}) preserve_spacing = item.get('preserve_spacing', False) # Build CSS classes based on indentation info, text classification, and OpenCV css_classes = ['paragraph'] # Check if OpenCV detected this as a bold header is_opencv_bold_header = False if opencv_analysis and opencv_analysis.get('is_bold') and opencv_analysis.get('is_likely_header'): is_opencv_bold_header = True css_classes.append('opencv-bold-header') # Add text classification class if text_classification.get('type'): css_classes.append(f"content-{text_classification['type']}") # Add indentation level class ONLY if not a bold header if not is_opencv_bold_header and not indent_info.get('suppress_indentation', False): level = indent_info.get('level', 0) css_classes.append(f'indent-level-{min(level, 10)}') # Add pattern-specific formatting ONLY if not a bold header if not is_opencv_bold_header: formatting_hint = indent_info.get('formatting_hint', 'normal_text') if formatting_hint != 'normal_text': css_classes.append(formatting_hint) # Add space indent class if needed and not a bold header if not is_opencv_bold_header and indent_info.get('pattern_type') == 'space_indent': css_classes.append('space-indent') # Add OpenCV analysis indicators if opencv_analysis: if opencv_analysis.get('is_bold'): css_classes.append('opencv-text-block') # Preserve internal spacing if preserve_spacing: content = re.sub(r' +', lambda m: ' ' * len(m.group()), content) content = content.replace('\n', '
') # Add pattern marker if needed (but not for bullets or bold headers) pattern_marker = indent_info.get('pattern_marker', '') if (pattern_marker and not indent_info.get('is_bullet', False) and not is_opencv_bold_header): # For numbered/lettered items, include the marker content = f"{pattern_marker} {content}" # Build final HTML with OpenCV enhancement class_str = f' class="{" ".join(css_classes)}"' # Use OpenCV and text classification to determine HTML structure if is_opencv_bold_header: return f'
{content}
' elif (text_classification.get('is_header') and text_classification.get('confidence', 0) > 0.6 and not is_opencv_bold_header): return f'
{content}
' elif role == 'title': return f'
{content}
' elif role == 'sectionHeading': return f'
{content}
' else: return f'{content}' def _table_to_html(self, table, table_idx): """Convert table to HTML with improved cell alignment and artifact removal""" if not table.cells: return f'

Table {table_idx + 1} (Empty)

' # Get table dimensions max_row = max(cell.row_index for cell in table.cells) + 1 max_col = max(cell.column_index for cell in table.cells) + 1 # Create table matrix with cell span information table_matrix = [[{"content": "", "rowspan": 1, "colspan": 1, "occupied": False} for _ in range(max_col)] for _ in range(max_row)] # Fill matrix with proper handling of spans for cell in table.cells: row_idx = cell.row_index col_idx = cell.column_index # Clean the content content = self.clean_ocr_artifacts(cell.content or "").strip() # Get span information rowspan = getattr(cell, 'row_span', 1) or 1 colspan = getattr(cell, 'column_span', 1) or 1 # Mark this cell and any cells it spans over if row_idx < max_row and col_idx < max_col: # Find the first non-occupied cell in this position while col_idx < max_col and table_matrix[row_idx][col_idx]["occupied"]: col_idx += 1 if col_idx < max_col: table_matrix[row_idx][col_idx]["content"] = content table_matrix[row_idx][col_idx]["rowspan"] = rowspan table_matrix[row_idx][col_idx]["colspan"] = colspan # Mark spanned cells as occupied for r in range(row_idx, min(row_idx + rowspan, max_row)): for c in range(col_idx, min(col_idx + colspan, max_col)): if r != row_idx or c != col_idx: table_matrix[r][c]["occupied"] = True # Generate HTML html_parts = [f'
'] html_parts.append(f'

Table {table_idx + 1}

') html_parts.append('') for row_idx, row in enumerate(table_matrix): html_parts.append('') for col_idx, cell in enumerate(row): if not cell["occupied"]: content = cell["content"] rowspan_attr = f' rowspan="{cell["rowspan"]}"' if cell["rowspan"] > 1 else '' colspan_attr = f' colspan="{cell["colspan"]}"' if cell["colspan"] > 1 else '' if row_idx == 0 and content.strip(): # Header row html_parts.append(f'{content}') else: html_parts.append(f'{content}') html_parts.append('') html_parts.append('
') return '\n'.join(html_parts) def _is_table_on_page(self, table, page_num): """Check if table belongs to the specified page""" if not table.cells: return False for cell in table.cells: if (cell.bounding_regions and cell.bounding_regions[0].page_number == page_num): return True return False def _calculate_table_overlap(self, content_item, table_regions): """Calculate overlap ratio between content and tables""" if not table_regions or not content_item.bounding_regions: return 0.0 content_polygon = content_item.bounding_regions[0].polygon if not content_polygon or len(content_polygon) < 8: return 0.0 # Content bounding box content_x1 = min(content_polygon[0], content_polygon[2], content_polygon[4], content_polygon[6]) content_x2 = max(content_polygon[0], content_polygon[2], content_polygon[4], content_polygon[6]) content_y1 = min(content_polygon[1], content_polygon[3], content_polygon[5], content_polygon[7]) content_y2 = max(content_polygon[1], content_polygon[3], content_polygon[5], content_polygon[7]) content_area = (content_x2 - content_x1) * (content_y2 - content_y1) if content_area <= 0: return 0.0 max_overlap_ratio = 0.0 for table_region in table_regions: table_polygon = table_region['polygon'] if not table_polygon or len(table_polygon) < 8: continue # Table bounding box table_x1 = min(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) table_x2 = max(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) table_y1 = min(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) table_y2 = max(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) # Calculate intersection intersect_x1 = max(content_x1, table_x1) intersect_x2 = min(content_x2, table_x2) intersect_y1 = max(content_y1, table_y1) intersect_y2 = min(content_y2, table_y2) if intersect_x2 > intersect_x1 and intersect_y2 > intersect_y1: intersect_area = (intersect_x2 - intersect_x1) * (intersect_y2 - intersect_y1) overlap_ratio = intersect_area / content_area max_overlap_ratio = max(max_overlap_ratio, overlap_ratio) return max_overlap_ratio def _calculate_line_table_overlap(self, line, table_regions): """Calculate overlap between line and tables""" if not table_regions or not line.polygon: return 0.0 line_polygon = line.polygon if len(line_polygon) < 8: return 0.0 # Line bounding box line_x1 = min(line_polygon[0], line_polygon[2], line_polygon[4], line_polygon[6]) line_x2 = max(line_polygon[0], line_polygon[2], line_polygon[4], line_polygon[6]) line_y1 = min(line_polygon[1], line_polygon[3], line_polygon[5], line_polygon[7]) line_y2 = max(line_polygon[1], line_polygon[3], line_polygon[5], line_polygon[7]) line_area = (line_x2 - line_x1) * (line_y2 - line_y1) if line_area <= 0: return 0.0 max_overlap = 0.0 for table_region in table_regions: table_polygon = table_region['polygon'] if not table_polygon or len(table_polygon) < 8: continue table_x1 = min(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) table_x2 = max(table_polygon[0], table_polygon[2], table_polygon[4], table_polygon[6]) table_y1 = min(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) table_y2 = max(table_polygon[1], table_polygon[3], table_polygon[5], table_polygon[7]) # Calculate intersection intersect_x1 = max(line_x1, table_x1) intersect_x2 = min(line_x2, table_x2) intersect_y1 = max(line_y1, table_y1) intersect_y2 = min(line_y2, table_y2) if intersect_x2 > intersect_x1 and intersect_y2 > intersect_y1: intersect_area = (intersect_x2 - intersect_x1) * (intersect_y2 - intersect_y1) overlap_ratio = intersect_area / line_area max_overlap = max(max_overlap, overlap_ratio) return max_overlap @staticmethod def clean_ocr_artifacts(text: str) -> str: """Remove OCR artifacts like checkbox markers and clean up text""" if not text: return text # Remove checkbox markers text = re.sub(r':unselected:', '', text) text = re.sub(r':selected:', '', text) # Replace with checkmark # Clean up multiple spaces text = re.sub(r'\s+', ' ', text) return text.strip() @staticmethod def html_to_formatted_text_enhanced(html_content): """Convert HTML back to formatted text with OpenCV-enhanced preservation""" from html.parser import HTMLParser class OpenCVEnhancedTextExtractor(HTMLParser): def __init__(self): super().__init__() self.text_parts = [] self.indent_detector = EnhancedIndentationDetector() self.in_title = False self.in_section_heading = False self.in_table = False self.current_table_row = [] self.table_data = [] self.current_indent_level = 0 self.current_formatting_hint = 'normal_text' self.in_page_header = False self.current_classes = [] self.in_content_header = False self.in_opencv_bold_header = False def handle_starttag(self, tag, attrs): attr_dict = dict(attrs) class_attr = attr_dict.get('class', '') self.current_classes = class_attr.split() if 'opencv-bold-header' in class_attr: self.in_opencv_bold_header = True # Bold headers get special treatment - no indentation elif 'page-header' in class_attr: self.in_page_header = True if len(self.text_parts) > 0: self.text_parts.append('\n\n' + '=' * 80 + '\n') elif 'content-header' in class_attr: self.in_content_header = True elif 'title' in class_attr: self.in_title = True elif 'section-heading' in class_attr: self.in_section_heading = True elif tag == 'table': self.in_table = True self.table_data = [] elif tag == 'tr': self.current_table_row = [] elif tag == 'br': self.text_parts.append('\n') # Extract indent level from class ONLY if not OpenCV bold header if not self.in_opencv_bold_header: for cls in self.current_classes: if cls.startswith('indent-level-'): try: self.current_indent_level = int(cls.split('-')[-1]) except ValueError: self.current_indent_level = 0 break else: self.current_indent_level = 0 else: self.current_indent_level = 0 # Force no indentation for bold headers # Extract formatting hint formatting_hints = [ 'numbered-primary', 'numbered-secondary', 'numbered-tertiary', 'numbered-quaternary', 'numbered-quinary', 'parenthetical-primary', 'parenthetical-secondary', 'parenthetical-tertiary', 'parenthetical-quaternary', 'bullet-primary', 'bullet-secondary', 'bullet-tertiary', 'bullet-quaternary', 'lettered-primary', 'lettered-secondary', 'roman-primary', 'roman-secondary', 'thai-primary', 'thai-secondary', 'indented_text', 'space-indent' ] for hint in formatting_hints: if hint in self.current_classes: self.current_formatting_hint = hint break else: self.current_formatting_hint = 'normal_text' def handle_endtag(self, tag): if tag == 'div' and self.in_opencv_bold_header: self.text_parts.append('\n\n') self.in_opencv_bold_header = False elif tag == 'div' and self.in_page_header: self.text_parts.append('\n' + '=' * 80 + '\n\n') self.in_page_header = False elif tag == 'div' and self.in_content_header: self.text_parts.append('\n\n') self.in_content_header = False elif tag == 'div' and self.in_title: self.text_parts.append('\n\n') self.in_title = False elif tag == 'div' and self.in_section_heading: self.text_parts.append('\n\n') self.in_section_heading = False elif tag == 'table': self.in_table = False self._format_table() elif tag == 'tr' and self.current_table_row: self.table_data.append(self.current_table_row[:]) elif tag == 'div' and not self.in_table: if not self.in_title and not self.in_section_heading and not self.in_page_header and not self.in_content_header and not self.in_opencv_bold_header: self.text_parts.append('\n') # Reset state if tag == 'div': self.current_indent_level = 0 self.current_formatting_hint = 'normal_text' self.current_classes = [] def handle_data(self, data): if data.strip(): # Clean OCR artifacts first data = data.replace(':unselected:', '') data = data.replace(':selected:', '') data = data.replace(' ', ' ') if self.in_page_header: page_match = re.search(r'Page (\d+)', data) if page_match: page_num = int(page_match.group(1)) page_header = f"PAGE {page_num}" self.text_parts.append(page_header.center(80)) elif self.in_opencv_bold_header: # OpenCV detected bold headers - no indentation, special formatting self.text_parts.append(f'\n## {data.strip().upper()}') elif self.in_content_header: indent_str = " " * self.current_indent_level # 4 spaces per level self.text_parts.append(f'\n{indent_str}# {data.strip()}') elif self.in_title: indent_str = " " * self.current_indent_level # 4 spaces per level self.text_parts.append(f'\n{indent_str}## {data.strip()}') elif self.in_section_heading: indent_str = " " * self.current_indent_level # 4 spaces per level self.text_parts.append(f'\n{indent_str}### {data.strip()}') elif self.in_table: self.current_table_row.append(data.strip()) else: # Apply OpenCV-enhanced indentation formatting using 4 spaces per level indent_str = " " * self.current_indent_level # 4 spaces per level # Handle different formatting hints including parenthetical using 4 spaces if 'bullet' in self.current_formatting_hint: # Use appropriate bullet symbol based on level if 'primary' in self.current_formatting_hint: bullet = '•' elif 'secondary' in self.current_formatting_hint: bullet = '◦' elif 'tertiary' in self.current_formatting_hint: bullet = '▪' elif 'quaternary' in self.current_formatting_hint: bullet = '‣' else: bullet = '•' self.text_parts.append(f'{indent_str}{bullet} {data.strip()}') elif any(pattern in self.current_formatting_hint for pattern in ['numbered', 'lettered', 'roman', 'thai', 'parenthetical']): # For numbered/lettered/parenthetical items, the marker should already be in the text self.text_parts.append(f'{indent_str}{data.strip()}') elif 'space-indent' in self.current_formatting_hint: # Simple indented text using 4 spaces self.text_parts.append(f'{indent_str}{data.strip()}') else: # Regular text with indentation using 4 spaces self.text_parts.append(f'{indent_str}{data.strip()}') def _format_table(self): """Format table with proper alignment""" if not self.table_data: return self.text_parts.append('\n\n') if self.table_data: max_cols = max(len(row) for row in self.table_data) col_widths = [0] * max_cols # Calculate column widths for row in self.table_data: for i, cell in enumerate(row): if i < max_cols: col_widths[i] = max(col_widths[i], len(str(cell))) # Ensure minimum column width col_widths = [max(width, 8) for width in col_widths] # Format rows with proper alignment for row_idx, row in enumerate(self.table_data): formatted_cells = [] for i, cell in enumerate(row): if i < max_cols: width = col_widths[i] formatted_cells.append(str(cell).ljust(width)) row_text = ' | '.join(formatted_cells) self.text_parts.append(row_text) # Add separator after header if row_idx == 0 and len(self.table_data) > 1: separator_cells = ['-' * col_widths[i] for i in range(max_cols)] separator_text = ' | '.join(separator_cells) self.text_parts.append(separator_text) self.text_parts.append('\n') self.text_parts.append('\n') extractor = OpenCVEnhancedTextExtractor() extractor.feed(html_content) result = ''.join(extractor.text_parts) # Clean up excessive newlines while preserving intentional spacing result = re.sub(r'\n{4,}', '\n\n\n', result) # Max 3 consecutive newlines # Ensure proper spacing around page headers result = re.sub(r'(={80})\n*([A-Z ]+)\n*(={80})', r'\1\n\2\n\3', result) return result.strip() def _validate_and_fix_table_structure(self, table_matrix): """Validate and fix common table structure issues""" if not table_matrix: return table_matrix max_row = len(table_matrix) max_col = len(table_matrix[0]) if table_matrix else 0 # Ensure all rows have same number of columns for row in table_matrix: while len(row) < max_col: row.append({"content": "", "rowspan": 1, "colspan": 1, "occupied": False}) # Remove completely empty rows table_matrix = [row for row in table_matrix if any(cell["content"].strip() for cell in row)] # Merge cells with identical content in adjacent columns (likely split cells) for row_idx, row in enumerate(table_matrix): col_idx = 0 while col_idx < len(row) - 1: current = row[col_idx] next_cell = row[col_idx + 1] if (current["content"] == next_cell["content"] and current["content"].strip() and not current["occupied"] and not next_cell["occupied"]): # Merge cells current["colspan"] += next_cell["colspan"] next_cell["occupied"] = True col_idx += 1 return table_matrix class OCRService: """Main OCR service with OpenCV-enhanced text analysis, spacing detection, and bold text recognition""" def __init__(self): self.azure_endpoint = os.getenv('AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT') self.azure_key = os.getenv('AZURE_DOCUMENT_INTELLIGENCE_KEY') # Initialize Azure client if credentials are available self.azure_client = None if self.azure_endpoint and self.azure_key: try: self.azure_client = DocumentIntelligenceClient( endpoint=self.azure_endpoint, credential=AzureKeyCredential(self.azure_key) ) logger.info("Azure Document Intelligence client initialized successfully") except Exception as e: logger.error(f"Failed to initialize Azure client: {e}") else: logger.warning("Azure credentials not found. Azure OCR will be unavailable.") def convert_pdf_to_text(self, pdf_path: str, method: str = "auto") -> Dict[str, Any]: """ Convert PDF to text using specified method with OpenCV-enhanced processing Args: pdf_path: Path to the PDF file method: OCR method ('azure', 'tesseract', 'pymupdf', 'auto') Returns: Dict containing text content, HTML, metadata, and OpenCV analysis """ result = { 'success': False, 'text': '', 'html': '', 'method_used': '', 'metadata': {}, 'error': None } if not os.path.exists(pdf_path): result['error'] = f"PDF file not found: {pdf_path}" return result # Auto method selection if method == "auto": if self.azure_client: method = "azure" elif self._check_tesseract_available(): method = "tesseract" else: method = "pymupdf" # Try primary method try: if method == "azure" and self.azure_client: result = self._azure_ocr_with_opencv_enhancement(pdf_path) elif method == "tesseract": result = self._tesseract_ocr_with_opencv(pdf_path) elif method == "pymupdf": result = self._pymupdf_extract_with_opencv(pdf_path) else: result['error'] = f"Method '{method}' not available or not configured" except Exception as e: logger.error(f"Primary method '{method}' failed: {e}") result['error'] = str(e) # Fallback mechanism if not result['success']: logger.info("Primary method failed, trying fallback methods...") result = self._try_fallback_methods(pdf_path, exclude_method=method) return result def _extract_page_images_from_pdf(self, pdf_path: str) -> Dict[int, np.ndarray]: """Extract page images for OpenCV analysis""" page_images = {} pdf_document = None try: pdf_document = fitz.open(pdf_path) for page_num in range(len(pdf_document)): page = pdf_document.load_page(page_num) # Render page to image for OpenCV analysis mat = fitz.Matrix(2.0, 2.0) # High resolution for better analysis pix = page.get_pixmap(matrix=mat) # Convert to numpy array img_data = pix.tobytes("png") import io from PIL import Image pil_image = Image.open(io.BytesIO(img_data)) img_array = np.array(pil_image) # Convert RGB to BGR for OpenCV if len(img_array.shape) == 3: img_array = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) page_images[page_num + 1] = img_array except Exception as e: logger.error(f"Error extracting page images: {e}") finally: if pdf_document: pdf_document.close() return page_images def _azure_ocr_with_opencv_enhancement(self, pdf_path: str) -> Dict[str, Any]: """Azure Document Intelligence OCR with OpenCV-enhanced text analysis and bold detection""" result = { 'success': False, 'text': '', 'html': '', 'method_used': 'azure_document_intelligence_opencv_enhanced', 'metadata': {}, 'error': None } try: # Extract page images for OpenCV analysis page_images = self._extract_page_images_from_pdf(pdf_path) with open(pdf_path, 'rb') as pdf_file: file_content = pdf_file.read() # Use enhanced analysis features from azure.ai.documentintelligence.models import AnalyzeDocumentRequest try: # Try with features parameter for better table extraction poller = self.azure_client.begin_analyze_document( "prebuilt-layout", body=file_content, content_type="application/pdf", features=["keyValuePairs"], # Enable key-value pair detection output_content_format="markdown" # Better structure preservation ) except (TypeError, AttributeError): # Fallback to basic call try: poller = self.azure_client.begin_analyze_document( "prebuilt-layout", body=file_content, content_type="application/pdf" ) except TypeError: try: poller = self.azure_client.begin_analyze_document( model_id="prebuilt-layout", body=file_content ) except TypeError: pdf_file.seek(0) poller = self.azure_client.begin_analyze_document( "prebuilt-layout", document=pdf_file ) analysis_result = poller.result() # Generate HTML with OpenCV-enhanced processing html_content = EnhancedHTMLProcessor.create_html_from_azure_result( analysis_result, page_images ) # Convert HTML to formatted text with OpenCV enhancement formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content) # Analyze document structure with OpenCV enhancement detector = EnhancedIndentationDetector() text_lines = formatted_text.split('\n') # Perform OpenCV analysis on first page for overall document analysis opencv_document_analysis = None if page_images: first_page_image = list(page_images.values())[0] opencv_document_analysis = detector.opencv_analyzer.analyze_text_blocks( first_page_image, text_lines ) document_analysis = detector.analyze_document_structure_with_opencv( text_lines, None # We already have the OpenCV analysis ) if opencv_document_analysis: document_analysis['opencv_global_analysis'] = opencv_document_analysis result.update({ 'success': True, 'text': formatted_text, 'html': html_content, 'metadata': { 'pages': len(analysis_result.pages) if analysis_result.pages else 0, 'tables': len(analysis_result.tables) if analysis_result.tables else 0, 'paragraphs': len(analysis_result.paragraphs) if hasattr(analysis_result, 'paragraphs') and analysis_result.paragraphs else 0, 'has_handwritten': any(style.is_handwritten for style in analysis_result.styles) if analysis_result.styles else False, 'html_generated': True, 'opencv_enhanced': True, 'opencv_bold_detection': True, 'opencv_spacing_analysis': True, 'enhanced_indentation': True, 'intelligent_text_classification': True, 'parenthetical_patterns_supported': True, 'page_numbers_added': True, 'comprehensive_formatting': True, 'azure_analysis': analysis_result, 'document_structure_analysis': document_analysis, 'page_images_processed': len(page_images) } }) logger.info("Azure OCR with OpenCV enhancement completed successfully") logger.info(f"OpenCV analysis: {len(page_images)} pages processed with text block and bold detection") except Exception as e: logger.error(f"Azure OCR with OpenCV error: {e}") result['error'] = f"Azure OCR with OpenCV error: {e}" return result def _tesseract_ocr_with_opencv(self, pdf_path: str) -> Dict[str, Any]: """Tesseract OCR with OpenCV-enhanced text analysis and bold detection""" result = { 'success': False, 'text': '', 'html': '', 'method_used': 'tesseract_opencv_enhanced', 'metadata': {}, 'error': None } if not TESSERACT_AVAILABLE: result['error'] = "Tesseract not available" return result pdf_document = None try: pdf_document = fitz.open(pdf_path) page_count = len(pdf_document) all_text = [] html_parts = ['') indent_detector = EnhancedIndentationDetector() opencv_analyzer = OpenCVTextAnalyzer() for page_num in range(page_count): # Add page header to text page_header = f"\n{'=' * 80}\n{'PAGE ' + str(page_num + 1).center(74)}\n{'=' * 80}\n\n" all_text.append(page_header) page = pdf_document.load_page(page_num) # Render page to image mat = fitz.Matrix(2.0, 2.0) pix = page.get_pixmap(matrix=mat) img_data = pix.tobytes("png") temp_img_path = None opencv_analysis = None try: with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_img: temp_img.write(img_data) temp_img_path = temp_img.name # Convert to OpenCV format for analysis img_cv = cv2.imread(temp_img_path) processed_img = self._preprocess_image(temp_img_path) custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1' text = pytesseract.image_to_string(processed_img, config=custom_config, lang='eng') all_text.append(text) # Perform OpenCV analysis text_lines = text.split('\n') opencv_analysis = opencv_analyzer.analyze_text_blocks(img_cv, text_lines) # Add to HTML with OpenCV-enhanced processing html_parts.append(f'
') html_parts.append(f'') # Process each line with OpenCV enhancement lines = text.split('\n') for line in lines: if line.strip(): # Find OpenCV mapping for this line opencv_line_mapping = None if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis: for mapping in opencv_analysis['line_mappings']: if mapping.get('text', '').strip() == line.strip(): opencv_line_mapping = mapping break # Enhanced indentation detection with OpenCV if opencv_line_mapping: indent_info = indent_detector.detect_indentation_with_opencv( line, opencv_analysis, opencv_line_mapping ) else: indent_info = indent_detector.detect_indentation(line) text_classification = indent_detector.classify_text_type( line, opencv_analysis=opencv_analysis ) # Build CSS classes css_classes = [] # Check if OpenCV detected bold header is_opencv_bold_header = (opencv_line_mapping and opencv_line_mapping.get('is_bold') and opencv_line_mapping.get('is_likely_header')) if is_opencv_bold_header: css_classes.append('opencv-bold-header') else: level = indent_info.get('level', 0) css_classes.append(f'indent-level-{min(level, 10)}') formatting_hint = indent_info.get('formatting_hint', 'normal_text') if formatting_hint != 'normal_text': css_classes.append(formatting_hint) # Add text classification class if text_classification.get('type'): css_classes.append(f"content-{text_classification['type']}") class_str = f' class="paragraph {" ".join(css_classes)}"' content = indent_info.get('content', line.strip()) # Add marker for non-bullet items (unless bold header) if not is_opencv_bold_header: marker = indent_info.get('pattern_marker', '') if marker and not indent_info.get('is_bullet', False): content = f"{marker} {content}" html_parts.append(f'{content}
') else: html_parts.append('

') html_parts.append('') finally: if temp_img_path and os.path.exists(temp_img_path): try: os.unlink(temp_img_path) except: pass html_parts.append('') # Convert HTML back to formatted text html_content = '\n'.join(html_parts) formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content) result.update({ 'success': True, 'text': formatted_text, 'html': html_content, 'metadata': { 'pages': page_count, 'html_generated': True, 'opencv_enhanced': True, 'opencv_bold_detection': True, 'opencv_spacing_analysis': True, 'enhanced_indentation': True, 'intelligent_text_classification': True, 'parenthetical_patterns_supported': True, 'page_numbers_added': True, 'comprehensive_formatting': True } }) logger.info("Tesseract OCR with OpenCV enhancement completed successfully") except Exception as e: logger.error(f"Tesseract OCR with OpenCV error: {e}") result['error'] = f"Tesseract OCR with OpenCV error: {e}" finally: if pdf_document is not None: try: pdf_document.close() except: pass return result def _pymupdf_extract_with_opencv(self, pdf_path: str) -> Dict[str, Any]: """PyMuPDF text extraction with OpenCV-enhanced analysis and bold detection""" result = { 'success': False, 'text': '', 'html': '', 'method_used': 'pymupdf_opencv_enhanced', 'metadata': {}, 'error': None } pdf_document = None try: pdf_document = fitz.open(pdf_path) page_count = len(pdf_document) all_text = [] html_parts = ['') indent_detector = EnhancedIndentationDetector() opencv_analyzer = OpenCVTextAnalyzer() for page_num in range(page_count): # Add page header to text page_header = f"\n{'=' * 80}\n{'PAGE ' + str(page_num + 1).center(74)}\n{'=' * 80}\n\n" all_text.append(page_header) page = pdf_document.load_page(page_num) text = page.get_text() all_text.append(text) # Get page image for OpenCV analysis mat = fitz.Matrix(2.0, 2.0) pix = page.get_pixmap(matrix=mat) img_data = pix.tobytes("png") # Convert to OpenCV format import io from PIL import Image pil_image = Image.open(io.BytesIO(img_data)) img_array = np.array(pil_image) img_cv = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR) # Perform OpenCV analysis text_lines = text.split('\n') opencv_analysis = opencv_analyzer.analyze_text_blocks(img_cv, text_lines) # Add to HTML with OpenCV-enhanced processing html_parts.append(f'
') html_parts.append(f'') # Process each line with OpenCV enhancement lines = text.split('\n') for line in lines: if line.strip(): # Find OpenCV mapping for this line opencv_line_mapping = None if opencv_analysis and opencv_analysis.get('success') and 'line_mappings' in opencv_analysis: for mapping in opencv_analysis['line_mappings']: if mapping.get('text', '').strip() == line.strip(): opencv_line_mapping = mapping break # Enhanced indentation detection with OpenCV if opencv_line_mapping: indent_info = indent_detector.detect_indentation_with_opencv( line, opencv_analysis, opencv_line_mapping ) else: indent_info = indent_detector.detect_indentation(line) text_classification = indent_detector.classify_text_type( line, opencv_analysis=opencv_analysis ) # Build CSS classes css_classes = [] # Check if OpenCV detected bold header is_opencv_bold_header = (opencv_line_mapping and opencv_line_mapping.get('is_bold') and opencv_line_mapping.get('is_likely_header')) if is_opencv_bold_header: css_classes.append('opencv-bold-header') else: level = indent_info.get('level', 0) css_classes.append(f'indent-level-{min(level, 10)}') formatting_hint = indent_info.get('formatting_hint', 'normal_text') if formatting_hint != 'normal_text': css_classes.append(formatting_hint) # Add text classification class if text_classification.get('type'): css_classes.append(f"content-{text_classification['type']}") class_str = f' class="paragraph {" ".join(css_classes)}"' content = indent_info.get('content', line.strip()) # Add marker for non-bullet items (unless bold header) if not is_opencv_bold_header: marker = indent_info.get('pattern_marker', '') if marker and not indent_info.get('is_bullet', False): content = f"{marker} {content}" html_parts.append(f'{content}
') else: html_parts.append('

') html_parts.append('') html_parts.append('') # Convert HTML back to formatted text html_content = '\n'.join(html_parts) formatted_text = EnhancedHTMLProcessor.html_to_formatted_text_enhanced(html_content) result.update({ 'success': True, 'text': formatted_text, 'html': html_content, 'metadata': { 'pages': page_count, 'html_generated': True, 'opencv_enhanced': True, 'opencv_bold_detection': True, 'opencv_spacing_analysis': True, 'enhanced_indentation': True, 'intelligent_text_classification': True, 'parenthetical_patterns_supported': True, 'page_numbers_added': True, 'comprehensive_formatting': True } }) logger.info("PyMuPDF extraction with OpenCV enhancement completed successfully") except Exception as e: logger.error(f"PyMuPDF with OpenCV error: {e}") result['error'] = f"PyMuPDF with OpenCV error: {e}" finally: if pdf_document is not None: try: pdf_document.close() except: pass return result def _preprocess_image(self, image_path: str) -> np.ndarray: """Preprocess image for better OCR accuracy""" img = cv2.imread(image_path) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) denoised = cv2.medianBlur(gray, 3) _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) return binary def _try_fallback_methods(self, pdf_path: str, exclude_method: str = None) -> Dict[str, Any]: """Try fallback OCR methods""" fallback_methods = [] if exclude_method != "azure" and self.azure_client: fallback_methods.append("azure") if exclude_method != "tesseract" and self._check_tesseract_available(): fallback_methods.append("tesseract") if exclude_method != "pymupdf": fallback_methods.append("pymupdf") for method in fallback_methods: logger.info(f"Trying fallback method: {method}") try: if method == "azure": result = self._azure_ocr_with_opencv_enhancement(pdf_path) elif method == "tesseract": result = self._tesseract_ocr_with_opencv(pdf_path) elif method == "pymupdf": result = self._pymupdf_extract_with_opencv(pdf_path) if result['success']: result['method_used'] += '_fallback' return result except Exception as e: logger.error(f"Fallback method {method} failed: {e}") continue return { 'success': False, 'text': '', 'html': '', 'method_used': 'all_methods_failed', 'metadata': {}, 'error': 'All OCR methods failed' } def _check_tesseract_available(self) -> bool: """Check if Tesseract is available""" if not TESSERACT_AVAILABLE: return False try: pytesseract.get_tesseract_version() return True except: return False def get_available_methods(self) -> list: """Get list of available OCR methods""" methods = [] if self.azure_client: methods.append("azure") if self._check_tesseract_available(): methods.append("tesseract") methods.append("pymupdf") return methods