Spaces:
Running
Running
| # ocr_processors.py | |
| # Procesadores OCR independientes y su gestor | |
| import cv2 | |
| import numpy as np | |
| import easyocr | |
| from typing import Dict, List | |
| from dollar_correction import DollarSignCorrectionProcessor | |
| from unified_extractors import Vendor, VendorSchemaManager | |
try:
    import pytesseract
    from pytesseract import Output
    PYTESSERACT_AVAILABLE = True
except ImportError:
    # Pytesseract is optional: record that it is missing instead of failing
    # at import time, and tell the operator which engine will be used.
    PYTESSERACT_AVAILABLE = False
    print("ADVERTENCIA: pytesseract no está disponible. Usando EasyOCR por defecto.")
| from azure_ocr_processor import AzureOCRProcessor, AZURE_AVAILABLE | |
class OCRProcessor:
    """Base class that defines the interface of every OCR processor."""

    def __init__(self):
        """Create the processor; the base class holds no state."""

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Process the image and return its text blocks.

        Subclasses must override this method.

        Args:
            image: Image to run OCR on.
            ocr_config: Options for the OCR engine.

        Returns:
            A list with one dict per detected text block.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError
class EasyOCRProcessor(OCRProcessor):
    """OCR processor backed by EasyOCR."""

    def __init__(self):
        """Create the EasyOCR reader for English and French with GPU disabled."""
        super().__init__()
        self.reader = easyocr.Reader(['en', 'fr'], gpu=False)

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract text blocks from the image using EasyOCR.

        Args:
            image: Image handed to ``reader.readtext``.
            ocr_config: Accepted for interface compatibility; this
                implementation does not read it.

        Returns:
            One dict per detection whose confidence is above 0.3, holding the
            stripped text, the axis-aligned bounding box (``x``, ``y``,
            ``width``, ``height``), the confidence scaled to 0-100 and the
            engine name. The list is ordered by ``y`` and then ``x``.
        """
        detections = self.reader.readtext(
            image,
            contrast_ths=0.05,
            adjust_contrast=0.7,
            low_text=0.3,
            detail=1,
        )

        blocks = []
        for corners, raw_text, score in detections:
            # Skip low-confidence detections.
            if not (score > 0.3):
                continue

            # Collapse the corner points into an axis-aligned box.
            left = min(pt[0] for pt in corners)
            right = max(pt[0] for pt in corners)
            top = min(pt[1] for pt in corners)
            bottom = max(pt[1] for pt in corners)

            blocks.append({
                'text': raw_text.strip(),
                'x': left,
                'y': top,
                'width': right - left,
                'height': bottom - top,
                'confidence': score * 100,
                'engine': 'easyocr',
            })

        # Reading order: top to bottom, then left to right.
        blocks.sort(key=lambda blk: (blk['y'], blk['x']))
        return blocks
| class PytesseractOCRProcessor(OCRProcessor): | |
| """Procesador usando Pytesseract con soporte para tablas""" | |
| def __init__(self): | |
| super().__init__() | |
| if not PYTESSERACT_AVAILABLE: | |
| raise RuntimeError("Pytesseract no est谩 disponible") | |
| def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]: | |
| """Extrae texto usando Pytesseract""" | |
| mode = ocr_config.get("mode", "block") | |
| # Preprocesar imagen | |
| processed_image = self._preprocess_image(image, ocr_config) | |
| if mode == "table": | |
| text_blocks = self._extract_table_structure(processed_image, ocr_config) | |
| # Si se requiere reconstrucci贸n multilinea | |
| if ocr_config.get("requires_reconstruction", False): | |
| reconstructed_text = self._reconstruct_multiline_text(text_blocks, ocr_config) | |
| if reconstructed_text: | |
| text_blocks.append({ | |
| 'text': f"TEXTO_RECONSTRUIDO:\n{reconstructed_text}", | |
| 'x': 0, | |
| 'y': 0, | |
| 'width': 100, | |
| 'height': 100, | |
| 'confidence': 100, | |
| 'engine': 'reconstructed', | |
| 'is_reconstructed': True | |
| }) | |
| else: | |
| text_blocks = self._extract_block_structure(processed_image) | |
| return text_blocks | |
| def _preprocess_image(self, image: np.ndarray, ocr_config: Dict) -> np.ndarray: | |
| """Return the image in grayscale after the steps enabled in ocr_config["preprocessing"].""" | |
| # NOTE(review): indentation was lost in this copy of the file, so the nesting of the | |
| # statements below (in particular the morphological cleanup) must be confirmed. | |
| # A missing "preprocessing" entry disables every optional step. | |
| preprocessing = ocr_config.get("preprocessing", {}) | |
| # Convert to grayscale; a 3-dimensional array is converted as BGR, anything else is used as is | |
| if len(image.shape) == 3: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| else: | |
| gray = image | |
| # Apply denoising (median blur, aperture 3) when "denoise" is set | |
| if preprocessing.get("denoise", False): | |
| gray = cv2.medianBlur(gray, 3) | |
| # Apply contrast enhancement (CLAHE, clip limit 3.0, 8x8 tiles) when "enhance" is set | |
| if preprocessing.get("enhance", False): | |
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)) | |
| gray = clahe.apply(gray) | |
| # Apply binarization (Gaussian adaptive threshold, block size 15, constant 8) when "binarize" is set | |
| if preprocessing.get("binarize", False): | |
| gray = cv2.adaptiveThreshold( | |
| gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY, 15, 8 | |
| ) | |
| # Morphological cleanup: close then open with a 2x2 kernel | |
| # NOTE(review): presumably this runs only after binarization - TODO confirm against the original indentation | |
| kernel = np.ones((2,2), np.uint8) | |
| gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel) | |
| gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel) | |
| return gray | |
| def _extract_table_structure(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]: | |
| """Extrae estructura de tabla""" | |
| custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1' | |
| table_data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config) | |
| text_blocks = [] | |
| n_boxes = len(table_data['text']) | |
| for i in range(n_boxes): | |
| text = table_data['text'][i].strip() | |
| confidence = int(table_data['conf'][i]) | |
| if text and confidence > 20: | |
| text_blocks.append({ | |
| 'text': text, | |
| 'x': table_data['left'][i], | |
| 'y': table_data['top'][i], | |
| 'width': table_data['width'][i], | |
| 'height': table_data['height'][i], | |
| 'confidence': confidence, | |
| 'block_num': table_data['block_num'][i], | |
| 'par_num': table_data['par_num'][i], | |
| 'line_num': table_data['line_num'][i], | |
| 'word_num': table_data['word_num'][i], | |
| 'engine': 'pytesseract' | |
| }) | |
| # Si hay muy pocos bloques, intentar con m茅todos alternativos | |
| if len(text_blocks) < 10: | |
| return self._extract_with_alternative_methods(image) | |
| return text_blocks | |
| def _extract_with_alternative_methods(self, image: np.ndarray) -> List[Dict]: | |
| """Intenta extraer con m煤ltiples configuraciones""" | |
| configs = [ | |
| r'--oem 3 --psm 4', | |
| r'--oem 3 --psm 6', | |
| r'--oem 3 --psm 8', | |
| r'--oem 3 --psm 11', | |
| ] | |
| all_blocks = [] | |
| for config in configs: | |
| try: | |
| data = pytesseract.image_to_data(image, output_type=Output.DICT, config=config) | |
| for i in range(len(data['text'])): | |
| text = data['text'][i].strip() | |
| if text and int(data['conf'][i]) > 10: | |
| all_blocks.append({ | |
| 'text': text, | |
| 'x': data['left'][i], | |
| 'y': data['top'][i], | |
| 'width': data['width'][i], | |
| 'height': data['height'][i], | |
| 'confidence': int(data['conf'][i]), | |
| 'engine': 'pytesseract_alt' | |
| }) | |
| except Exception as e: | |
| print(f"ADVERTENCIA: Fall贸 configuraci贸n {config}: {e}") | |
| # Eliminar duplicados | |
| unique_blocks = [] | |
| seen_positions = set() | |
| for block in all_blocks: | |
| position_key = (block['x'], block['y'], block['text']) | |
| if position_key not in seen_positions: | |
| seen_positions.add(position_key) | |
| unique_blocks.append(block) | |
| return sorted(unique_blocks, key=lambda b: (b['y'], b['x'])) | |
| def _extract_block_structure(self, image: np.ndarray) -> List[Dict]: | |
| """Extrae estructura de bloques""" | |
| custom_config = r'--oem 3 --psm 1' | |
| data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config) | |
| text_blocks = [] | |
| n_boxes = len(data['text']) | |
| for i in range(n_boxes): | |
| text = data['text'][i].strip() | |
| confidence = int(data['conf'][i]) | |
| if text and confidence > 30: | |
| text_blocks.append({ | |
| 'text': text, | |
| 'x': data['left'][i], | |
| 'y': data['top'][i], | |
| 'width': data['width'][i], | |
| 'height': data['height'][i], | |
| 'confidence': confidence, | |
| 'engine': 'pytesseract' | |
| }) | |
| return sorted(text_blocks, key=lambda b: (b['y'], b['x'])) | |
| def _reconstruct_multiline_text(self, text_blocks: List[Dict], ocr_config: Dict) -> str: | |
| """Reconstruye texto multilinea para proveedores que lo requieren""" | |
| # Filtrar bloques reconstruidos previos | |
| original_blocks = [block for block in text_blocks if not block.get('is_reconstructed')] | |
| if not original_blocks: | |
| return "" | |
| # Agrupar en l铆neas | |
| line_threshold = ocr_config.get("line_threshold", 20) | |
| lines = self._group_into_lines(original_blocks, line_threshold) | |
| # Reconstruir texto | |
| reconstructed_text = "" | |
| for line_blocks in lines: | |
| line_blocks.sort(key=lambda b: b['x']) | |
| line_text = ' '.join(block['text'].strip() for block in line_blocks) | |
| if line_text.strip(): | |
| reconstructed_text += line_text + "\n" | |
| return reconstructed_text | |
| def _group_into_lines(self, sorted_blocks: List[Dict], line_threshold: int = 20) -> List[List[Dict]]: | |
| """Agrupa bloques en l铆neas""" | |
| if not sorted_blocks: | |
| return [] | |
| sorted_blocks = sorted(sorted_blocks, key=lambda b: b['y']) | |
| lines = [] | |
| current_line = [sorted_blocks[0]] | |
| current_y = sorted_blocks[0]['y'] | |
| for block in sorted_blocks[1:]: | |
| y_diff = abs(block['y'] - current_y) | |
| if y_diff <= line_threshold: | |
| current_line.append(block) | |
| current_y = sum(b['y'] for b in current_line) / len(current_line) | |
| else: | |
| current_line.sort(key=lambda b: b['x']) | |
| lines.append(current_line) | |
| current_line = [block] | |
| current_y = block['y'] | |
| if current_line: | |
| current_line.sort(key=lambda b: b['x']) | |
| lines.append(current_line) | |
| return lines | |
| # Modificar la clase OCRManager: | |
class OCRManager:
    """Manage the OCR processors and pick one according to the vendor."""

    def __init__(self):
        """Create the EasyOCR processor, and Pytesseract when it is available."""
        self.processors = {
            'easyocr': EasyOCRProcessor(),
            'pytesseract': PytesseractOCRProcessor() if PYTESSERACT_AVAILABLE else None,
            'azure': None,  # created on demand by _get_azure_processor
        }

    def _get_azure_processor(self):
        """Return the Azure processor, creating it on first use.

        Returns:
            The cached ``AzureOCRProcessor``, or ``None`` when Azure is not
            available or its construction fails (the error is printed).
        """
        if self.processors['azure'] is None and AZURE_AVAILABLE:
            try:
                self.processors['azure'] = AzureOCRProcessor()
                print("INFO: Procesador Azure Document Intelligence inicializado")
            except Exception as e:
                print(f"ERROR al inicializar Azure: {e}")
                return None
        return self.processors['azure']

    def extract_text_with_positions(self, image: np.ndarray, vendor: Vendor, schema_manager: VendorSchemaManager) -> List[Dict]:
        """Extract text blocks with the processor configured for the vendor.

        The engine comes from ``schema_manager.get_ocr_config(vendor)``
        (default ``"easyocr"``). If that engine is unavailable, EasyOCR in
        block mode is used instead. If the chosen engine raises, EasyOCR is
        tried once as a fallback.

        Args:
            image: Image to run OCR on.
            vendor: Vendor whose OCR configuration is looked up; its
                ``value`` is used in log messages.
            schema_manager: Source of the per-vendor OCR configuration.

        Returns:
            The text blocks produced by the processor, after the optional
            dollar-sign correction (never applied to Azure results).

        Raises:
            Exception: Whatever EasyOCR raised when it was the engine that
                failed, or when the fallback call itself fails.
        """
        ocr_config = schema_manager.get_ocr_config(vendor)
        engine = ocr_config.get("engine", "easyocr")

        print(f"INFO: Usando engine '{engine}' para proveedor {vendor.value}")
        print(f"INFO: Configuración OCR: {ocr_config}")

        # Select the processor; Azure is the only one created lazily.
        if engine == 'azure':
            processor = self._get_azure_processor()
            unavailable_message = "ADVERTENCIA: Azure no disponible, usando EasyOCR como fallback"
        else:
            processor = self.processors.get(engine)
            unavailable_message = f"ADVERTENCIA: Engine '{engine}' no disponible, usando EasyOCR"

        if processor is None:
            print(unavailable_message)
            processor = self.processors['easyocr']
            ocr_config = {"engine": "easyocr", "mode": "block"}
            # Keep 'engine' in step with the processor actually used, so the
            # logs below name the right engine and a failure here is not
            # followed by a second, identical EasyOCR attempt.
            engine = 'easyocr'

        try:
            text_blocks = processor.process(image, ocr_config)
            print(f"INFO: Extraídos {len(text_blocks)} bloques de texto con {engine}")

            # The $-vs-8 correction is not applied to Azure output.
            if engine != 'azure':
                dollar_correction_config = ocr_config.get("dollar_sign_correction", {})
                if dollar_correction_config.get("enabled", False):
                    print(f"INFO: Aplicando corrección $ vs 8 para {vendor.value}")
                    corrector = DollarSignCorrectionProcessor(dollar_correction_config)
                    text_blocks = corrector.process(text_blocks)

            return text_blocks

        except Exception as e:
            print(f"ERROR en procesamiento OCR con {engine}: {e}")
            # Fall back to EasyOCR unless it is the engine that just failed.
            if engine != 'easyocr':
                print("INFO: Intentando con EasyOCR como fallback...")
                return self.processors['easyocr'].process(image, {"engine": "easyocr"})
            raise