Spaces:
Running
Running
File size: 13,846 Bytes
0a6b0fb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# ocr_processors.py
# Procesadores OCR independientes y su gestor
import cv2
import numpy as np
import easyocr
from typing import Dict, List
from dollar_correction import DollarSignCorrectionProcessor
from unified_extractors import Vendor, VendorSchemaManager
# pytesseract is an optional dependency: record whether it could be imported
# so the rest of the module can decide which OCR engines to offer.
try:
    import pytesseract
    from pytesseract import Output
    PYTESSERACT_AVAILABLE = True
except ImportError:
    PYTESSERACT_AVAILABLE = False
    print("ADVERTENCIA: pytesseract no está disponible. Usando EasyOCR por defecto.")
from azure_ocr_processor import AzureOCRProcessor, AZURE_AVAILABLE
class OCRProcessor:
    """Common interface for the OCR engine wrappers.

    Concrete engines subclass this and override :meth:`process`.
    """

    def __init__(self):
        """Create the processor; the base class keeps no state."""

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Run OCR on ``image`` and return the recognised text blocks.

        Args:
            image: Image to read, as a NumPy array.
            ocr_config: Engine configuration dictionary.

        Returns:
            A list of dictionaries, one per text block.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()
class EasyOCRProcessor(OCRProcessor):
    """OCR processor backed by EasyOCR."""

    def __init__(self):
        """Build the EasyOCR reader for English and French with the GPU disabled."""
        super().__init__()
        self.reader = easyocr.Reader(['en', 'fr'], gpu=False)

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Read ``image`` with EasyOCR and return positioned text blocks.

        ``ocr_config`` is accepted for interface compatibility; this method
        does not read it.

        Returns:
            Blocks with ``text``, ``x``, ``y``, ``width``, ``height``,
            ``confidence`` (EasyOCR score multiplied by 100) and ``engine``,
            ordered by ``y`` and then ``x``. Detections whose score is not
            above 0.3 are dropped.
        """
        detections = self.reader.readtext(
            image,
            contrast_ths=0.05,
            adjust_contrast=0.7,
            low_text=0.3,
            detail=1,
        )

        blocks = []
        for corners, raw_text, score in detections:
            # Discard low-confidence detections.
            if not score > 0.3:
                continue

            # Collapse the corner points into an axis-aligned bounding box.
            xs = [corner[0] for corner in corners]
            ys = [corner[1] for corner in corners]
            left, top = min(xs), min(ys)

            blocks.append({
                'text': raw_text.strip(),
                'x': left,
                'y': top,
                'width': max(xs) - left,
                'height': max(ys) - top,
                'confidence': score * 100,
                'engine': 'easyocr',
            })

        # Order by top edge, then left edge.
        blocks.sort(key=lambda blk: (blk['y'], blk['x']))
        return blocks
class PytesseractOCRProcessor(OCRProcessor):
    """OCR processor backed by Tesseract through ``pytesseract``.

    ``ocr_config["mode"]`` selects the extraction strategy: ``"table"`` keeps
    Tesseract's layout indices on every word and can append a synthetic block
    holding the text rebuilt line by line; any other value (default
    ``"block"``) runs a plain page segmentation.
    """

    # Layout columns of ``image_to_data`` that table mode copies onto each block.
    _LAYOUT_KEYS = ('block_num', 'par_num', 'line_num', 'word_num')

    def __init__(self):
        """Fail fast when pytesseract could not be imported.

        Raises:
            RuntimeError: If ``PYTESSERACT_AVAILABLE`` is false.
        """
        super().__init__()
        if not PYTESSERACT_AVAILABLE:
            raise RuntimeError("Pytesseract no está disponible")

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Preprocess ``image`` and extract text blocks with Tesseract.

        Args:
            image: Image to read.
            ocr_config: Reads ``mode``, ``preprocessing``,
                ``requires_reconstruction`` and ``line_threshold``.

        Returns:
            The extracted blocks. In table mode with
            ``requires_reconstruction`` enabled, a final block flagged
            ``is_reconstructed`` carries the text rebuilt line by line.
        """
        mode = ocr_config.get("mode", "block")
        processed_image = self._preprocess_image(image, ocr_config)

        if mode != "table":
            return self._extract_block_structure(processed_image)

        text_blocks = self._extract_table_structure(processed_image, ocr_config)

        # Some configurations need the words re-joined into full lines.
        if ocr_config.get("requires_reconstruction", False):
            reconstructed_text = self._reconstruct_multiline_text(text_blocks, ocr_config)
            if reconstructed_text:
                text_blocks.append({
                    'text': f"TEXTO_RECONSTRUIDO:\n{reconstructed_text}",
                    'x': 0,
                    'y': 0,
                    'width': 100,
                    'height': 100,
                    'confidence': 100,
                    'engine': 'reconstructed',
                    'is_reconstructed': True
                })
        return text_blocks

    def _preprocess_image(self, image: np.ndarray, ocr_config: Dict) -> np.ndarray:
        """Convert ``image`` to grayscale and apply the configured clean-up steps.

        The optional steps are toggled by boolean keys under
        ``ocr_config["preprocessing"]``: ``denoise`` (median blur),
        ``enhance`` (CLAHE) and ``binarize`` (adaptive threshold followed by
        a morphological close/open).
        """
        preprocessing = ocr_config.get("preprocessing", {})

        # Grayscale conversion; also accept BGRA and (h, w, 1) inputs, which
        # a plain BGR2GRAY conversion rejects.
        if image.ndim == 3:
            channels = image.shape[2]
            if channels == 4:
                gray = cv2.cvtColor(image, cv2.COLOR_BGRA2GRAY)
            elif channels == 1:
                gray = np.ascontiguousarray(image[:, :, 0])
            else:
                gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        else:
            gray = image

        if preprocessing.get("denoise", False):
            gray = cv2.medianBlur(gray, 3)

        if preprocessing.get("enhance", False):
            clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
            gray = clahe.apply(gray)

        if preprocessing.get("binarize", False):
            gray = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 15, 8
            )
            # Morphological clean-up of the binarized image.
            # NOTE(review): the reviewed copy of this file had lost its
            # indentation; this clean-up is assumed to belong to the binarize
            # branch rather than to run unconditionally - confirm.
            kernel = np.ones((2, 2), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
            gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel)

        return gray

    @staticmethod
    def _parse_confidence(raw_conf) -> int:
        """Return a Tesseract confidence as an int, or -1 when unparseable.

        The value may arrive as an int, a float or a string such as
        ``'96.5'``; a bare ``int()`` raises ``ValueError`` on the latter.
        """
        try:
            return int(float(raw_conf))
        except (TypeError, ValueError, OverflowError):
            return -1

    def _collect_blocks(self, data: Dict, min_confidence: int, engine: str,
                        layout_keys=()) -> List[Dict]:
        """Turn an ``image_to_data`` dictionary into a list of block dicts.

        Entries with empty text, or whose confidence is not strictly above
        ``min_confidence``, are skipped. ``layout_keys`` names extra columns
        of ``data`` to copy onto each block.
        """
        blocks = []
        for i, raw_text in enumerate(data['text']):
            text = raw_text.strip()
            if not text:
                continue
            confidence = self._parse_confidence(data['conf'][i])
            if confidence <= min_confidence:
                continue
            block = {
                'text': text,
                'x': data['left'][i],
                'y': data['top'][i],
                'width': data['width'][i],
                'height': data['height'][i],
                'confidence': confidence,
            }
            for key in layout_keys:
                block[key] = data[key][i]
            block['engine'] = engine
            blocks.append(block)
        return blocks

    def _extract_table_structure(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Extract words for table-like documents, keeping layout indices.

        Uses page segmentation mode 6 with inter-word spaces preserved and
        keeps words with confidence above 20. When fewer than 10 words
        survive, the result of :meth:`_extract_with_alternative_methods` is
        returned instead.
        """
        custom_config = r'--oem 3 --psm 6 -c preserve_interword_spaces=1'
        table_data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)

        text_blocks = self._collect_blocks(
            table_data,
            min_confidence=20,
            engine='pytesseract',
            layout_keys=self._LAYOUT_KEYS,
        )

        # Too few words usually means the segmentation mode did not fit.
        if len(text_blocks) < 10:
            return self._extract_with_alternative_methods(image)
        return text_blocks

    def _extract_with_alternative_methods(self, image: np.ndarray) -> List[Dict]:
        """Run several page segmentation modes and merge their results.

        Words with confidence above 10 are collected from every mode,
        de-duplicated on ``(x, y, text)`` (first occurrence wins) and
        returned ordered by ``y`` and then ``x``. A mode that fails is
        reported and skipped.
        """
        configs = [
            r'--oem 3 --psm 4',
            r'--oem 3 --psm 6',
            r'--oem 3 --psm 8',
            r'--oem 3 --psm 11',
        ]

        all_blocks = []
        for config in configs:
            # Best effort: one failing mode must not discard the others.
            try:
                data = pytesseract.image_to_data(image, output_type=Output.DICT, config=config)
                all_blocks.extend(
                    self._collect_blocks(data, min_confidence=10, engine='pytesseract_alt')
                )
            except Exception as e:
                print(f"ADVERTENCIA: Falló configuración {config}: {e}")

        # The same word is usually found by more than one mode.
        unique_blocks = []
        seen_positions = set()
        for block in all_blocks:
            position_key = (block['x'], block['y'], block['text'])
            if position_key not in seen_positions:
                seen_positions.add(position_key)
                unique_blocks.append(block)

        return sorted(unique_blocks, key=lambda b: (b['y'], b['x']))

    def _extract_block_structure(self, image: np.ndarray) -> List[Dict]:
        """Extract words using page segmentation mode 1.

        Keeps words with confidence above 30, ordered by ``y`` and then ``x``.
        """
        custom_config = r'--oem 3 --psm 1'
        data = pytesseract.image_to_data(image, output_type=Output.DICT, config=custom_config)
        text_blocks = self._collect_blocks(data, min_confidence=30, engine='pytesseract')
        return sorted(text_blocks, key=lambda b: (b['y'], b['x']))

    def _reconstruct_multiline_text(self, text_blocks: List[Dict], ocr_config: Dict) -> str:
        """Rebuild the text line by line from positioned blocks.

        Blocks flagged ``is_reconstructed`` are ignored. The remaining blocks
        are grouped into lines using ``ocr_config["line_threshold"]``
        (default 20) and each non-empty line is emitted followed by a
        newline.

        Returns:
            The rebuilt text, or ``""`` when there is nothing to rebuild.
        """
        original_blocks = [block for block in text_blocks if not block.get('is_reconstructed')]
        if not original_blocks:
            return ""

        line_threshold = ocr_config.get("line_threshold", 20)
        lines = self._group_into_lines(original_blocks, line_threshold)

        rendered_lines = []
        for line_blocks in lines:
            # _group_into_lines already orders each line left to right.
            line_text = ' '.join(block['text'].strip() for block in line_blocks)
            if line_text.strip():
                rendered_lines.append(line_text + "\n")
        return ''.join(rendered_lines)

    def _group_into_lines(self, sorted_blocks: List[Dict], line_threshold: int = 20) -> List[List[Dict]]:
        """Group blocks into text lines by vertical proximity.

        Blocks are visited in ascending ``y``. A block joins the current line
        when its ``y`` is within ``line_threshold`` of the line's running
        mean ``y``; otherwise it starts a new line.

        Returns:
            Lines from top to bottom, each ordered by ``x``.
        """
        if not sorted_blocks:
            return []

        # Re-sort locally so the caller's ordering does not matter.
        sorted_blocks = sorted(sorted_blocks, key=lambda b: b['y'])

        lines = []
        current_line = [sorted_blocks[0]]
        current_y = sorted_blocks[0]['y']

        for block in sorted_blocks[1:]:
            if abs(block['y'] - current_y) <= line_threshold:
                current_line.append(block)
                # Track the mean so a slightly slanted line stays together.
                current_y = sum(b['y'] for b in current_line) / len(current_line)
            else:
                current_line.sort(key=lambda b: b['x'])
                lines.append(current_line)
                current_line = [block]
                current_y = block['y']

        current_line.sort(key=lambda b: b['x'])
        lines.append(current_line)
        return lines
# Modificar la clase OCRManager:
class OCRManager:
    """Pick and run the OCR processor configured for each vendor."""

    def __init__(self):
        """Create the EasyOCR processor and, when available, the Pytesseract one.

        The Azure processor is left as ``None`` and built on first use.
        """
        self.processors = {
            'easyocr': EasyOCRProcessor(),
            'pytesseract': PytesseractOCRProcessor() if PYTESSERACT_AVAILABLE else None,
            'azure': None  # created on demand by _get_azure_processor()
        }

    def _get_azure_processor(self):
        """Return the Azure processor, creating it on first use.

        Returns:
            The cached ``AzureOCRProcessor``, or ``None`` when Azure is not
            available or its construction failed.
        """
        if self.processors['azure'] is None and AZURE_AVAILABLE:
            try:
                self.processors['azure'] = AzureOCRProcessor()
                print("INFO: Procesador Azure Document Intelligence inicializado")
            except Exception as e:
                print(f"ERROR al inicializar Azure: {e}")
                return None
        return self.processors['azure']

    def extract_text_with_positions(self, image: np.ndarray, vendor: Vendor, schema_manager: VendorSchemaManager) -> List[Dict]:
        """Extract positioned text blocks with the vendor's configured engine.

        The engine comes from ``schema_manager.get_ocr_config(vendor)``
        (default ``"easyocr"``). When it is unavailable, EasyOCR is used with
        a minimal block-mode configuration. Unless the engine actually used
        is Azure, the dollar-sign correction is applied when the OCR
        configuration enables it.

        Returns:
            The text blocks produced by the processor, after any correction.

        Raises:
            Exception: Whatever the processing raised, when the engine that
                failed was already EasyOCR and no further fallback exists.
        """
        ocr_config = schema_manager.get_ocr_config(vendor)
        engine = ocr_config.get("engine", "easyocr")

        print(f"INFO: Usando engine '{engine}' para proveedor {vendor.value}")
        print(f"INFO: Configuración OCR: {ocr_config}")

        if engine == 'azure':
            processor = self._get_azure_processor()
        else:
            processor = self.processors.get(engine)

        if processor is None:
            if engine == 'azure':
                print("ADVERTENCIA: Azure no disponible, usando EasyOCR como fallback")
            else:
                print(f"ADVERTENCIA: Engine '{engine}' no disponible, usando EasyOCR")
            processor = self.processors['easyocr']
            ocr_config = {"engine": "easyocr", "mode": "block"}
            # Keep `engine` in step with the processor actually used, so the
            # log below is accurate and a failure is not retried on the very
            # same EasyOCR processor.
            engine = 'easyocr'

        try:
            text_blocks = processor.process(image, ocr_config)
            print(f"INFO: Extraídos {len(text_blocks)} bloques de texto con {engine}")

            # Azure output is not passed through the $ vs 8 correction.
            if engine != 'azure':
                dollar_correction_config = ocr_config.get("dollar_sign_correction", {})
                if dollar_correction_config.get("enabled", False):
                    print(f"INFO: Aplicando corrección $ vs 8 para {vendor.value}")
                    corrector = DollarSignCorrectionProcessor(dollar_correction_config)
                    text_blocks = corrector.process(text_blocks)

            return text_blocks

        except Exception as e:
            print(f"ERROR en procesamiento OCR con {engine}: {e}")
            # Last resort: retry once with EasyOCR, unless EasyOCR itself failed.
            if engine != 'easyocr':
                print("INFO: Intentando con EasyOCR como fallback...")
                return self.processors['easyocr'].process(image, {"engine": "easyocr"})
            raise