# ocr_processors.py
# Independent OCR processors and the manager that selects between them.

import cv2
import numpy as np
import easyocr
from typing import Dict, List
from dollar_correction import DollarSignCorrectionProcessor
from unified_extractors import Vendor, VendorSchemaManager

try:
    import pytesseract
    from pytesseract import Output
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False
    print("ADVERTENCIA: pytesseract no está disponible. Usando EasyOCR por defecto.")

from azure_ocr_processor import AzureOCRProcessor, AZURE_AVAILABLE


class OCRProcessor:
    """Base class for OCR processors."""

    def __init__(self):
        pass

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Process the image and return a list of text-block dicts.

        Subclasses must override this method.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError


class EasyOCRProcessor(OCRProcessor):
    """OCR processor backed by EasyOCR."""

    def __init__(self, languages: tuple = ("en", "fr"), gpu: bool = False):
        """Create the EasyOCR reader.

        Args:
            languages: Language codes handed to ``easyocr.Reader``.
            gpu: Whether EasyOCR should use the GPU.
        """
        super().__init__()
        self.reader = easyocr.Reader(list(languages), gpu=gpu)

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract text with EasyOCR.

        ``ocr_config`` is accepted for interface compatibility but is not
        read by this processor.

        Returns:
            Blocks with confidence above 0.3, sorted by (y, x). Each block
            carries ``text``, ``x``, ``y``, ``width``, ``height``,
            ``confidence`` (scaled to 0-100) and ``engine``.
        """
        results = self.reader.readtext(
            image,
            contrast_ths=0.05,
            adjust_contrast=0.7,
            low_text=0.3,
            detail=1
        )

        text_blocks = []
        for (bbox, text, confidence) in results:
            if confidence > 0.3:
                # bbox is a sequence of corner points; reduce it to an
                # axis-aligned rectangle.
                x_coords = [point[0] for point in bbox]
                y_coords = [point[1] for point in bbox]
                text_blocks.append({
                    'text': text.strip(),
                    'x': min(x_coords),
                    'y': min(y_coords),
                    'width': max(x_coords) - min(x_coords),
                    'height': max(y_coords) - min(y_coords),
                    'confidence': confidence * 100,
                    'engine': 'easyocr'
                })

        return sorted(text_blocks, key=lambda b: (b['y'], b['x']))


class PytesseractOCRProcessor(OCRProcessor):
    """OCR processor backed by Pytesseract, with table support."""

    # Layout columns copied into each block when extracting in table mode.
    _TABLE_LAYOUT_KEYS = ('block_num', 'par_num', 'line_num', 'word_num')

    def __init__(self):
        """Raise ``RuntimeError`` if pytesseract could not be imported."""
        super().__init__()
        if not PYTESSERACT_AVAILABLE:
            raise RuntimeError("Pytesseract no está disponible")

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract text with Pytesseract.

        Args:
            image: Input image.
            ocr_config: Reads ``mode`` ("table" or anything else, treated as
                block mode; default "block"), ``preprocessing``,
                ``requires_reconstruction`` and ``line_threshold``.

        Returns:
            List of text-block dicts. In table mode with
            ``requires_reconstruction`` set, one extra synthetic block holding
            the reconstructed multi-line text is appended.
        """
        mode = ocr_config.get("mode", "block")

        # Preprocess the image
        processed_image = self._preprocess_image(image, ocr_config)

        if mode == "table":
            text_blocks = self._extract_table_structure(processed_image, ocr_config)

            # Optional multi-line reconstruction
            if ocr_config.get("requires_reconstruction", False):
                reconstructed_text = self._reconstruct_multiline_text(text_blocks, ocr_config)
                if reconstructed_text:
                    text_blocks.append({
                        'text': f"TEXTO_RECONSTRUIDO:\n{reconstructed_text}",
                        'x': 0,
                        'y': 0,
                        'width': 100,
                        'height': 100,
                        'confidence': 100,
                        'engine': 'reconstructed',
                        'is_reconstructed': True
                    })
        else:
            text_blocks = self._extract_block_structure(processed_image)

        return text_blocks

    def _preprocess_image(self, image: np.ndarray, ocr_config: Dict) -> np.ndarray:
        """Preprocess the image according to ``ocr_config["preprocessing"]``.

        Recognised boolean flags: ``denoise`` (3x3 median blur), ``enhance``
        (CLAHE) and ``binarize`` (adaptive Gaussian threshold followed by a
        morphological close and open). All default to off.

        Returns:
            The grayscale (possibly binarized) image.
        """
        preprocessing = ocr_config.get("preprocessing", {})

        # Convert to grayscale
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image

        # Denoise if configured
        if preprocessing.get("denoise", False):
            gray = cv2.medianBlur(gray, 3)

        # Contrast enhancement if configured
        if preprocessing.get("enhance", False):
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)

        # Binarize if configured
        if preprocessing.get("binarize", False):
            gray = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 15, 8
            )
            # Morphological clean-up
            kernel = np.ones((2, 2), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
            gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel)

        return gray

    @staticmethod
    def _parse_confidence(raw) -> int:
        """Convert a Tesseract confidence cell to an int.

        The value may arrive as an int, a float, or a decimal string such as
        ``'96.53'``; a plain ``int()`` raises on the latter. Unparseable
        values map to -1 so every confidence threshold rejects them.
        """
        try:
            return int(float(raw))
        except (TypeError, ValueError):
            return -1

    def _blocks_from_data(self, data: Dict, min_confidence: int, engine: str,
                          layout_keys: tuple = ()) -> List[Dict]:
        """Turn ``image_to_data`` dict output into a list of text blocks.

        Keeps entries whose stripped text is non-empty and whose confidence
        is strictly greater than ``min_confidence``. ``layout_keys`` names
        extra columns of ``data`` to copy into each block.
        """
        blocks = []
        for i, raw_text in enumerate(data['text']):
            # str() is defensive: guards against a non-string text cell.
            text = str(raw_text).strip()
            confidence = self._parse_confidence(data['conf'][i])
            if not text or confidence <= min_confidence:
                continue
            block = {
                'text': text,
                'x': data['left'][i],
                'y': data['top'][i],
                'width': data['width'][i],
                'height': data['height'][i],
                'confidence': confidence,
            }
            for key in layout_keys:
                block[key] = data[key][i]
            block['engine'] = engine
            blocks.append(block)
        return blocks

    def _extract_table_structure(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract word blocks using a table-oriented Tesseract configuration.

        Keeps words with confidence above 20 and includes Tesseract's
        block/paragraph/line/word numbers. If fewer than 10 blocks are found,
        the result of ``_extract_with_alternative_methods`` is returned
        instead. ``ocr_config`` is currently unused here.
        """
        custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1'

        table_data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)

        text_blocks = self._blocks_from_data(
            table_data, 20, 'pytesseract', self._TABLE_LAYOUT_KEYS
        )

        # Too few blocks: try the alternative configurations instead
        if len(text_blocks) < 10:
            return self._extract_with_alternative_methods(image)

        return text_blocks

    def _extract_with_alternative_methods(self, image: np.ndarray) -> List[Dict]:
        """Run several page-segmentation modes and merge the results.

        Blocks with confidence above 10 are collected from every
        configuration, de-duplicated on (x, y, text) keeping the first
        occurrence, and returned sorted by (y, x). A configuration that
        fails is reported and skipped (best effort).
        """
        configs = [
            r'--oem 3 --psm 4',
            r'--oem 3 --psm 6',
            r'--oem 3 --psm 8',
            r'--oem 3 --psm 11',
        ]

        all_blocks = []
        for config in configs:
            try:
                data = pytesseract.image_to_data(image, output_type=Output.DICT, config=config)
                all_blocks.extend(self._blocks_from_data(data, 10, 'pytesseract_alt'))
            except Exception as e:
                print(f"ADVERTENCIA: Falló configuración {config}: {e}")

        # Remove duplicates
        unique_blocks = []
        seen_positions = set()
        for block in all_blocks:
            position_key = (block['x'], block['y'], block['text'])
            if position_key not in seen_positions:
                seen_positions.add(position_key)
                unique_blocks.append(block)

        return sorted(unique_blocks, key=lambda b: (b['y'], b['x']))

    def _extract_block_structure(self, image: np.ndarray) -> List[Dict]:
        """Extract word blocks with ``--psm 1``.

        Keeps words with confidence above 30, sorted by (y, x).
        """
        custom_config = r'--oem 3 --psm 1'

        data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)

        text_blocks = self._blocks_from_data(data, 30, 'pytesseract')
        return sorted(text_blocks, key=lambda b: (b['y'], b['x']))

    def _reconstruct_multiline_text(self, text_blocks: List[Dict], ocr_config: Dict) -> str:
        """Rebuild multi-line text for vendors that require it.

        Blocks flagged ``is_reconstructed`` are ignored. The rest are grouped
        into lines using ``ocr_config["line_threshold"]`` (default 20); each
        non-blank line is emitted as its words joined by single spaces and
        terminated by a newline.

        Returns:
            The reconstructed text, or "" when there is nothing to rebuild.
        """
        # Drop previously reconstructed blocks
        original_blocks = [block for block in text_blocks if not block.get('is_reconstructed')]

        if not original_blocks:
            return ""

        # Group into lines; _group_into_lines already orders each line by x.
        line_threshold = ocr_config.get("line_threshold", 20)
        lines = self._group_into_lines(original_blocks, line_threshold)

        # Rebuild the text
        rendered_lines = []
        for line_blocks in lines:
            line_text = ' '.join(block['text'].strip() for block in line_blocks)
            if line_text.strip():
                rendered_lines.append(line_text + "\n")

        return "".join(rendered_lines)

    def _group_into_lines(self, sorted_blocks: List[Dict], line_threshold: int = 20) -> List[List[Dict]]:
        """Group blocks into lines by vertical proximity.

        Blocks are visited in ascending ``y``. A block joins the current line
        when its ``y`` is within ``line_threshold`` of the line's running
        mean ``y``; otherwise it starts a new line. Each returned line is
        sorted by ``x``. The input list is not mutated.
        """
        if not sorted_blocks:
            return []

        sorted_blocks = sorted(sorted_blocks, key=lambda b: b['y'])

        lines = []
        current_line = [sorted_blocks[0]]
        current_y = sorted_blocks[0]['y']

        for block in sorted_blocks[1:]:
            y_diff = abs(block['y'] - current_y)

            if y_diff <= line_threshold:
                current_line.append(block)
                # Track the mean y so a slightly slanted line stays together.
                current_y = sum(b['y'] for b in current_line) / len(current_line)
            else:
                current_line.sort(key=lambda b: b['x'])
                lines.append(current_line)
                current_line = [block]
                current_y = block['y']

        if current_line:
            current_line.sort(key=lambda b: b['x'])
            lines.append(current_line)

        return lines


class OCRManager:
    """Selects and runs the OCR processor configured for a vendor."""

    def __init__(self):
        """Create the EasyOCR and (if available) Pytesseract processors."""
        self.processors = {
            'easyocr': EasyOCRProcessor(),
            'pytesseract': PytesseractOCRProcessor() if PYTESSERACT_AVAILABLE else None,
            'azure': None  # Initialized on demand
        }

    def _get_azure_processor(self):
        """Return the Azure processor, creating it on first use.

        Returns:
            The cached ``AzureOCRProcessor``, or ``None`` when Azure is not
            available or its initialization fails.
        """
        if self.processors['azure'] is None and AZURE_AVAILABLE:
            try:
                self.processors['azure'] = AzureOCRProcessor()
                print("INFO: Procesador Azure Document Intelligence inicializado")
            except Exception as e:
                print(f"ERROR al inicializar Azure: {e}")
                return None
        return self.processors['azure']

    def extract_text_with_positions(self, image: np.ndarray, vendor: Vendor,
                                    schema_manager: VendorSchemaManager) -> List[Dict]:
        """Extract text using the processor configured for ``vendor``.

        The engine comes from ``schema_manager.get_ocr_config(vendor)``
        (default "easyocr"). If that engine is unavailable, EasyOCR in block
        mode is used instead. If processing fails on a non-EasyOCR engine,
        EasyOCR is tried once as a fallback; a failure of EasyOCR itself is
        re-raised.

        Returns:
            List of text-block dicts, after the optional "$ vs 8" correction
            (never applied to Azure output).
        """
        # Fetch the vendor's OCR configuration
        ocr_config = schema_manager.get_ocr_config(vendor)
        engine = ocr_config.get("engine", "easyocr")

        print(f"INFO: Usando engine '{engine}' para proveedor {vendor.value}")
        print(f"INFO: Configuración OCR: {ocr_config}")

        # Select the processor
        if engine == 'azure':
            processor = self._get_azure_processor()
            if processor is None:
                print("ADVERTENCIA: Azure no disponible, usando EasyOCR como fallback")
        else:
            processor = self.processors.get(engine)
            if processor is None:
                print(f"ADVERTENCIA: Engine '{engine}' no disponible, usando EasyOCR")

        if processor is None:
            processor = self.processors['easyocr']
            ocr_config = {"engine": "easyocr", "mode": "block"}
            # Keep `engine` in sync with the processor actually used so the
            # logs are accurate and a failed EasyOCR run is not retried
            # against EasyOCR again below.
            engine = 'easyocr'

        # Process the image
        try:
            text_blocks = processor.process(image, ocr_config)
            print(f"INFO: Extraídos {len(text_blocks)} bloques de texto con {engine}")

            # Do NOT apply the $ vs 8 correction to Azure output
            if engine != 'azure':
                dollar_correction_config = ocr_config.get("dollar_sign_correction", {})
                if dollar_correction_config.get("enabled", False):
                    print(f"INFO: Aplicando corrección $ vs 8 para {vendor.value}")
                    corrector = DollarSignCorrectionProcessor(dollar_correction_config)
                    text_blocks = corrector.process(text_blocks)

            return text_blocks

        except Exception as e:
            print(f"ERROR en procesamiento OCR con {engine}: {e}")
            # Fall back to EasyOCR
            if engine != 'easyocr':
                print("INFO: Intentando con EasyOCR como fallback...")
                return self.processors['easyocr'].process(image, {"engine": "easyocr"})
            raise