ocr_service / ocr_processors.py
Sebastian Gonzalez
Deploy OCR Service via Script
0a6b0fb
# ocr_processors.py
# Procesadores OCR independientes y su gestor
import cv2
import numpy as np
import easyocr
from typing import Dict, List
from dollar_correction import DollarSignCorrectionProcessor
from unified_extractors import Vendor, VendorSchemaManager
try:
import pytesseract
from pytesseract import Output
PYTESSERACT_AVAILABLE = True
except ImportError:
PYTESSERACT_AVAILABLE = False
print("ADVERTENCIA: pytesseract no est谩 disponible. Usando EasyOCR por defecto.")
from azure_ocr_processor import AzureOCRProcessor, AZURE_AVAILABLE
class OCRProcessor:
    """Common interface implemented by every OCR engine wrapper."""

    def __init__(self):
        """Set up nothing; concrete processors create their own engine."""

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Run OCR on ``image`` and return the detected text blocks.

        Args:
            image: Image to analyse, as a NumPy array.
            ocr_config: Engine options for this call.

        Returns:
            A list of dictionaries, one per detected text block.

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                must override the method.
        """
        raise NotImplementedError()
class EasyOCRProcessor(OCRProcessor):
    """OCR processor backed by EasyOCR."""

    def __init__(self):
        """Create the EasyOCR reader (English and French, GPU disabled)."""
        super().__init__()
        self.reader = easyocr.Reader(['en', 'fr'], gpu=False)

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract text blocks from ``image`` with EasyOCR.

        Args:
            image: Image to analyse, as a NumPy array.
            ocr_config: Accepted for interface compatibility; this
                implementation does not read it.

        Returns:
            One dictionary per detection whose confidence is above 0.3,
            with keys ``text``, ``x``, ``y``, ``width``, ``height``,
            ``confidence`` (scaled to 0-100) and ``engine``. The list is
            ordered top-to-bottom, then left-to-right.
        """
        detections = self.reader.readtext(
            image,
            detail=1,
            low_text=0.3,
            adjust_contrast=0.7,
            contrast_ths=0.05,
        )

        blocks = []
        for corners, raw_text, score in detections:
            # Drop low-confidence detections.
            if not score > 0.3:
                continue

            # Collapse the corner points into an axis-aligned box.
            xs = [corner[0] for corner in corners]
            ys = [corner[1] for corner in corners]
            blocks.append({
                'text': raw_text.strip(),
                'x': min(xs),
                'y': min(ys),
                'width': max(xs) - min(xs),
                'height': max(ys) - min(ys),
                'confidence': score * 100,
                'engine': 'easyocr',
            })

        blocks.sort(key=lambda item: (item['y'], item['x']))
        return blocks
class PytesseractOCRProcessor(OCRProcessor):
    """OCR processor backed by Tesseract through ``pytesseract``.

    ``ocr_config["mode"]`` selects the extraction strategy: ``"table"``
    runs the table-oriented extraction (optionally followed by multi-line
    text reconstruction); any other value, including the default
    ``"block"``, runs the block-oriented extraction.
    """

    # Layout columns of the Tesseract data dict copied into table-mode blocks.
    _LAYOUT_KEYS = ('block_num', 'par_num', 'line_num', 'word_num')

    def __init__(self):
        """Initialise the processor.

        Raises:
            RuntimeError: If ``pytesseract`` could not be imported.
        """
        super().__init__()
        if not PYTESSERACT_AVAILABLE:
            raise RuntimeError("Pytesseract no est谩 disponible")

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract text blocks from ``image`` with Tesseract.

        Args:
            image: Image to analyse, as a NumPy array.
            ocr_config: Reads ``mode``, ``preprocessing``,
                ``requires_reconstruction`` and ``line_threshold``.

        Returns:
            A list of block dictionaries. In table mode with
            ``requires_reconstruction`` enabled, one extra synthetic block
            (``is_reconstructed=True``) carrying the rebuilt multi-line
            text is appended when that text is non-empty.
        """
        mode = ocr_config.get("mode", "block")
        processed_image = self._preprocess_image(image, ocr_config)

        if mode != "table":
            return self._extract_block_structure(processed_image)

        text_blocks = self._extract_table_structure(processed_image, ocr_config)

        if ocr_config.get("requires_reconstruction", False):
            reconstructed_text = self._reconstruct_multiline_text(text_blocks, ocr_config)
            if reconstructed_text:
                # Synthetic block: fixed placeholder geometry and confidence.
                text_blocks.append({
                    'text': f"TEXTO_RECONSTRUIDO:\n{reconstructed_text}",
                    'x': 0,
                    'y': 0,
                    'width': 100,
                    'height': 100,
                    'confidence': 100,
                    'engine': 'reconstructed',
                    'is_reconstructed': True
                })

        return text_blocks

    def _preprocess_image(self, image: np.ndarray, ocr_config: Dict) -> np.ndarray:
        """Prepare ``image`` for Tesseract according to the configuration.

        The optional steps are driven by boolean flags inside
        ``ocr_config["preprocessing"]``: ``denoise`` (3x3 median blur),
        ``enhance`` (CLAHE) and ``binarize`` (adaptive Gaussian threshold
        followed by a morphological close and open).

        Returns:
            The processed single-channel image.
        """
        preprocessing = ocr_config.get("preprocessing", {})

        # A 3-D array is treated as a BGR colour image; anything else is
        # used as-is.
        if len(image.shape) == 3:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image

        if preprocessing.get("denoise", False):
            gray = cv2.medianBlur(gray, 3)

        if preprocessing.get("enhance", False):
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)

        if preprocessing.get("binarize", False):
            gray = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 15, 8
            )
            # Morphological clean-up of the binarised image.
            kernel = np.ones((2, 2), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
            gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel)

        return gray

    @staticmethod
    def _parse_confidence(raw) -> int:
        """Convert one ``conf`` cell of the Tesseract data dict to an int.

        The value is parsed through ``float`` first because the cell may
        arrive as a decimal string such as ``'96.43'``, on which a direct
        ``int()`` call raises ``ValueError``.
        """
        return int(float(raw))

    def _blocks_from_data(self, data: Dict, min_confidence: int, engine: str,
                          with_layout: bool = False) -> List[Dict]:
        """Turn a Tesseract data dict into a list of text-block dicts.

        Args:
            data: Result of ``pytesseract.image_to_data`` in ``Output.DICT``
                form.
            min_confidence: Rows are kept only when their confidence is
                strictly greater than this value.
            engine: Value stored under the ``engine`` key of each block.
            with_layout: When true, also copy the ``_LAYOUT_KEYS`` columns.

        Returns:
            The blocks in the order the rows appear in ``data``.
        """
        blocks = []
        for i, raw_text in enumerate(data['text']):
            text = raw_text.strip()
            if not text:
                continue
            confidence = self._parse_confidence(data['conf'][i])
            if confidence <= min_confidence:
                continue

            block = {
                'text': text,
                'x': data['left'][i],
                'y': data['top'][i],
                'width': data['width'][i],
                'height': data['height'][i],
                'confidence': confidence,
            }
            if with_layout:
                for key in self._LAYOUT_KEYS:
                    block[key] = data[key][i]
            block['engine'] = engine
            blocks.append(block)
        return blocks

    def _extract_table_structure(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract blocks with a table-oriented Tesseract configuration.

        Rows with confidence above 20 are kept, including their layout
        columns. When fewer than 10 blocks are found, the result of
        ``_extract_with_alternative_methods`` is returned instead.
        ``ocr_config`` is accepted but not read here.
        """
        custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1'
        table_data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)

        text_blocks = self._blocks_from_data(
            table_data, min_confidence=20, engine='pytesseract', with_layout=True
        )

        # Too few blocks: retry with the alternative configurations.
        if len(text_blocks) < 10:
            return self._extract_with_alternative_methods(image)

        return text_blocks

    def _extract_with_alternative_methods(self, image: np.ndarray) -> List[Dict]:
        """Run several page-segmentation configurations and merge the results.

        A configuration that raises is reported and skipped. Blocks with
        confidence above 10 are kept; duplicates sharing the same
        ``(x, y, text)`` are dropped (first occurrence wins) and the result
        is ordered top-to-bottom, then left-to-right.
        """
        configs = [
            r'--oem 3 --psm 4',
            r'--oem 3 --psm 6',
            r'--oem 3 --psm 8',
            r'--oem 3 --psm 11',
        ]

        all_blocks = []
        for config in configs:
            try:
                data = pytesseract.image_to_data(image, output_type=Output.DICT, config=config)
                all_blocks.extend(
                    self._blocks_from_data(data, min_confidence=10, engine='pytesseract_alt')
                )
            except Exception as e:
                # Best effort: one failing configuration must not stop the others.
                print(f"ADVERTENCIA: Fall贸 configuraci贸n {config}: {e}")

        # Remove duplicates produced by overlapping configurations.
        unique_blocks = []
        seen_positions = set()
        for block in all_blocks:
            position_key = (block['x'], block['y'], block['text'])
            if position_key not in seen_positions:
                seen_positions.add(position_key)
                unique_blocks.append(block)

        return sorted(unique_blocks, key=lambda b: (b['y'], b['x']))

    def _extract_block_structure(self, image: np.ndarray) -> List[Dict]:
        """Extract blocks with the block-oriented Tesseract configuration.

        Rows with confidence above 30 are kept; the result is ordered
        top-to-bottom, then left-to-right.
        """
        custom_config = r'--oem 3 --psm 1'
        data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)

        text_blocks = self._blocks_from_data(data, min_confidence=30, engine='pytesseract')
        return sorted(text_blocks, key=lambda b: (b['y'], b['x']))

    def _reconstruct_multiline_text(self, text_blocks: List[Dict], ocr_config: Dict) -> str:
        """Rebuild newline-separated text from positioned blocks.

        Blocks flagged ``is_reconstructed`` are ignored. The remaining
        blocks are grouped into lines using ``ocr_config["line_threshold"]``
        (default 20); each line's words are joined with single spaces in
        left-to-right order.

        Returns:
            The reconstructed text, each non-blank line terminated by a
            newline, or ``""`` when there is nothing to reconstruct.
        """
        original_blocks = [block for block in text_blocks if not block.get('is_reconstructed')]
        if not original_blocks:
            return ""

        line_threshold = ocr_config.get("line_threshold", 20)
        lines = self._group_into_lines(original_blocks, line_threshold)

        rendered_lines = []
        for line_blocks in lines:
            ordered = sorted(line_blocks, key=lambda b: b['x'])
            line_text = ' '.join(block['text'].strip() for block in ordered)
            if line_text.strip():
                rendered_lines.append(line_text + "\n")

        return "".join(rendered_lines)

    def _group_into_lines(self, sorted_blocks: List[Dict], line_threshold: int = 20) -> List[List[Dict]]:
        """Group blocks into text lines by vertical proximity.

        Blocks are visited in ascending ``y`` order. A block joins the
        current line when its ``y`` is within ``line_threshold`` of the
        line's running mean ``y``; otherwise it starts a new line.

        Returns:
            A list of lines, each a list of blocks ordered by ``x``.
        """
        if not sorted_blocks:
            return []

        sorted_blocks = sorted(sorted_blocks, key=lambda b: b['y'])

        lines = []
        current_line = [sorted_blocks[0]]
        current_y = sorted_blocks[0]['y']

        for block in sorted_blocks[1:]:
            if abs(block['y'] - current_y) <= line_threshold:
                current_line.append(block)
                # Track the mean y of the line so slanted text stays together.
                current_y = sum(b['y'] for b in current_line) / len(current_line)
            else:
                current_line.sort(key=lambda b: b['x'])
                lines.append(current_line)
                current_line = [block]
                current_y = block['y']

        # The loop always leaves one pending, non-empty line.
        current_line.sort(key=lambda b: b['x'])
        lines.append(current_line)

        return lines
# OCR manager: picks the processor configured for each vendor and handles fallbacks.
class OCRManager:
    """Select and run the OCR processor configured for each vendor.

    EasyOCR is always created and acts as the fallback engine. Pytesseract
    is created only when it could be imported, and the Azure processor is
    created on first use.
    """

    def __init__(self):
        """Build the processor registry keyed by engine name."""
        self.processors = {
            'easyocr': EasyOCRProcessor(),
            'pytesseract': PytesseractOCRProcessor() if PYTESSERACT_AVAILABLE else None,
            'azure': None,  # created on demand by _get_azure_processor
        }

    def _get_azure_processor(self):
        """Return the Azure processor, creating it on first use.

        Returns:
            The cached processor, or ``None`` when Azure is unavailable or
            its initialisation raised (the error is reported, not raised).
        """
        if self.processors['azure'] is None and AZURE_AVAILABLE:
            try:
                self.processors['azure'] = AzureOCRProcessor()
                print("INFO: Procesador Azure Document Intelligence inicializado")
            except Exception as e:
                print(f"ERROR al inicializar Azure: {e}")
                return None
        return self.processors['azure']

    def extract_text_with_positions(self, image: np.ndarray, vendor: "Vendor",
                                    schema_manager: "VendorSchemaManager") -> List[Dict]:
        """Extract positioned text blocks using the vendor's OCR engine.

        Args:
            image: Image to analyse, as a NumPy array.
            vendor: Vendor whose OCR configuration is looked up.
            schema_manager: Provides ``get_ocr_config(vendor)``.

        Returns:
            The text blocks produced by the selected processor, after the
            optional dollar-sign correction (never applied to Azure output).

        Raises:
            Exception: Whatever the processor raised, when no further
                fallback is possible (EasyOCR itself failed).
        """
        ocr_config = schema_manager.get_ocr_config(vendor)
        engine = ocr_config.get("engine", "easyocr")

        print(f"INFO: Usando engine '{engine}' para proveedor {vendor.value}")
        print(f"INFO: Configuraci贸n OCR: {ocr_config}")

        # Resolve the processor for the requested engine.
        if engine == 'azure':
            processor = self._get_azure_processor()
            if processor is None:
                print("ADVERTENCIA: Azure no disponible, usando EasyOCR como fallback")
        else:
            processor = self.processors.get(engine)
            if processor is None:
                print(f"ADVERTENCIA: Engine '{engine}' no disponible, usando EasyOCR")

        if processor is None:
            processor = self.processors['easyocr']
            ocr_config = {"engine": "easyocr", "mode": "block"}
            # Record the engine actually in use so the logs are accurate and
            # a failure below does not retry EasyOCR a second time.
            engine = 'easyocr'

        try:
            text_blocks = processor.process(image, ocr_config)
            print(f"INFO: Extra铆dos {len(text_blocks)} bloques de texto con {engine}")

            # Azure output is already processed; skip the $ vs 8 correction.
            if engine != 'azure':
                dollar_correction_config = ocr_config.get("dollar_sign_correction", {})
                if dollar_correction_config.get("enabled", False):
                    print(f"INFO: Aplicando correcci贸n $ vs 8 para {vendor.value}")
                    corrector = DollarSignCorrectionProcessor(dollar_correction_config)
                    text_blocks = corrector.process(text_blocks)

            return text_blocks

        except Exception as e:
            print(f"ERROR en procesamiento OCR con {engine}: {e}")
            # Fall back to EasyOCR unless EasyOCR is the engine that failed.
            if engine != 'easyocr':
                print("INFO: Intentando con EasyOCR como fallback...")
                return self.processors['easyocr'].process(image, {"engine": "easyocr"})
            raise