Spaces:
Running
Running
# azure_ocr_processor.py
# OCR processor built on Azure Document Intelligence.
import json
import numpy as np
from io import BytesIO
from typing import Dict, List

# The Azure SDK is treated as an optional dependency: a failed import is
# recorded in AZURE_AVAILABLE (and announced once on stdout) instead of
# aborting the import of this module.
try:
    from azure.core.credentials import AzureKeyCredential
    from azure.ai.documentintelligence import DocumentIntelligenceClient
    AZURE_AVAILABLE = True
except ImportError:
    AZURE_AVAILABLE = False
    print("ADVERTENCIA: azure-ai-documentintelligence no está disponible.")
class AzureOCRProcessor:
    """OCR processor backed by Azure Document Intelligence.

    The processor sends an image to an Azure Document Intelligence model
    (``prebuilt-invoice`` unless configured otherwise) and renders the
    structured result as one human-readable text block.
    """

    # Environment variables consulted when no explicit credentials are given.
    ENDPOINT_ENV_VAR = "AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT"
    KEY_ENV_VAR = "AZURE_DOCUMENT_INTELLIGENCE_KEY"
    # Endpoint used when neither the parameter nor the environment sets one.
    DEFAULT_ENDPOINT = "https://invoicerecog.cognitiveservices.azure.com/"
    # Raw in-memory size (MiB) above which an image is shrunk before upload.
    MAX_IMAGE_SIZE_MB = 4.5
    # Longest allowed side, in pixels, after downscaling.
    MAX_IMAGE_DIMENSION = 2000

    def __init__(self, endpoint: str = None, key: str = None):
        """Create the Azure client.

        Args:
            endpoint: Service endpoint URL. Falls back to the
                ``AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT`` environment variable
                and then to ``DEFAULT_ENDPOINT``.
            key: API key. Falls back to the
                ``AZURE_DOCUMENT_INTELLIGENCE_KEY`` environment variable.

        Raises:
            RuntimeError: If the Azure SDK could not be imported.
            ValueError: If no endpoint or no key could be resolved.
        """
        if not AZURE_AVAILABLE:
            raise RuntimeError("Azure Document Intelligence no está disponible")

        import os

        # Priority: explicit parameters > environment variables > default.
        self.endpoint = endpoint or os.environ.get(
            self.ENDPOINT_ENV_VAR, self.DEFAULT_ENDPOINT
        )
        # SECURITY: the API key is never hard-coded. It must be supplied by
        # the caller or the environment; otherwise the check below fails fast.
        self.key = key or os.environ.get(self.KEY_ENV_VAR)

        if not self.endpoint or not self.key:
            raise ValueError(
                "Se requieren credenciales de Azure. "
                "Define las variables de entorno AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT "
                "y AZURE_DOCUMENT_INTELLIGENCE_KEY, o pásalas como parámetros."
            )

        print("INFO: Inicializando Azure Document Intelligence")
        print(f"INFO: Endpoint: {self.endpoint}")
        self.client = DocumentIntelligenceClient(
            endpoint=self.endpoint,
            credential=AzureKeyCredential(self.key),
        )

    def process(self, image: np.ndarray, ocr_config: Dict) -> List[Dict]:
        """Analyze ``image`` with Azure Document Intelligence.

        Args:
            image: Image as a numpy array accepted by ``cv2.imencode``.
            ocr_config: Options dict; ``"model"`` selects the Azure model id
                (default ``"prebuilt-invoice"``). ``None`` is treated as empty.

        Returns:
            A one-element list holding a text block dict. Its ``text`` is the
            formatted analysis; the geometry fields are zero and the
            ``confidence`` is a fixed 95.0 placeholder, not a value reported
            by the service.

        Raises:
            RuntimeError: If the image cannot be PNG-encoded.
            Exception: Any error raised by the Azure call or the formatting
                step is logged with its traceback and re-raised unchanged.
        """
        model = (ocr_config or {}).get("model", "prebuilt-invoice")
        print(f"INFO: Procesando con Azure Document Intelligence, modelo: {model}")

        # Shrink oversized images first; small ones are passed through as-is.
        image_to_process = self._compress_image_for_azure(image)

        import cv2

        success, encoded_image = cv2.imencode('.png', image_to_process)
        if not success:
            raise RuntimeError("No se pudo codificar la imagen")
        image_bytes = encoded_image.tobytes()
        print(f"INFO: Imagen codificada: {len(image_bytes)} bytes")

        try:
            print("INFO: Enviando imagen a Azure Document Intelligence...")
            poller = self.client.begin_analyze_document(
                model,
                body=BytesIO(image_bytes),
                content_type="image/png",
            )
            print("INFO: Esperando respuesta de Azure...")
            result = poller.result()
            document_count = len(result.documents) if result.documents else 0
            print(f"INFO: Análisis completado. Documentos encontrados: {document_count}")

            formatted_text = self._format_azure_result_as_text(result)

            # Single block flagged so downstream code can tell it apart from
            # per-word OCR output.
            return [{
                'text': formatted_text,
                'x': 0,
                'y': 0,
                'width': 0,
                'height': 0,
                'confidence': 95.0,
                'engine': 'azure',
                'is_azure_structured': True,
            }]
        except Exception as e:
            print(f"ERROR en Azure Document Intelligence: {e}")
            import traceback
            traceback.print_exc()
            raise

    def _compress_image_for_azure(self, image: np.ndarray) -> np.ndarray:
        """Return ``image`` or a reduced copy when it exceeds the size budget.

        The budget (``MAX_IMAGE_SIZE_MB``) is compared against the raw array
        size (``image.nbytes``). Oversized images are first downscaled so the
        longest side is at most ``MAX_IMAGE_DIMENSION`` (aspect ratio kept),
        then, if still too large, passed through a JPEG encode/decode round
        trip at decreasing quality.

        NOTE(review): the JPEG step returns a *decoded* array which
        ``process`` later re-encodes as PNG, so the JPEG size measured here
        does not bound the uploaded payload. Sending the JPEG bytes directly
        would fix that but changes the upload format -- confirm before doing so.

        Args:
            image: Source image array; never modified in place.

        Returns:
            The original array when it is within budget, otherwise a resized
            and/or JPEG-degraded array.
        """
        limit_mb = self.MAX_IMAGE_SIZE_MB
        bytes_per_mb = 1024 * 1024

        height, width = image.shape[:2]
        original_size_mb = image.nbytes / bytes_per_mb
        print(f"INFO: Compresión Azure - Imagen original: {width}x{height}, {original_size_mb:.2f}MB")

        if original_size_mb <= limit_mb:
            print("INFO: Compresión Azure - Imagen ya está dentro del límite, no se requiere compresión")
            return image

        # Imported only on the path that needs it, so in-budget images do not
        # require OpenCV in this method.
        import cv2

        print("INFO: Compresión Azure - Aplicando compresión...")

        compressed_image = image
        max_dimension = self.MAX_IMAGE_DIMENSION
        if width > max_dimension or height > max_dimension:
            # Pin the longer side to max_dimension and scale the other one;
            # clamp to 1 so extreme aspect ratios never yield a zero-size target.
            if width > height:
                new_width = max_dimension
                new_height = max(1, int((max_dimension / width) * height))
            else:
                new_height = max_dimension
                new_width = max(1, int((max_dimension / height) * width))
            print(f"INFO: Compresión Azure - Redimensionando a {new_width}x{new_height}")
            compressed_image = cv2.resize(
                image, (new_width, new_height), interpolation=cv2.INTER_AREA
            )
            compressed_size_mb = compressed_image.nbytes / bytes_per_mb
            print(f"INFO: Compresión Azure - Después de redimensionar: {compressed_size_mb:.2f}MB")
            if compressed_size_mb <= limit_mb:
                return compressed_image

        # Still too large: try JPEG qualities 85, 75, 65, 55.
        for quality in range(85, 49, -10):
            success, jpeg_encoded = cv2.imencode(
                '.jpg', compressed_image, [cv2.IMWRITE_JPEG_QUALITY, quality]
            )
            if not success:
                continue
            jpeg_size_mb = jpeg_encoded.nbytes / bytes_per_mb
            print(f"INFO: Compresión Azure - Calidad {quality}: {jpeg_size_mb:.2f}MB")
            if jpeg_size_mb > limit_mb:
                continue
            print(f"INFO: Compresión Azure - Calidad {quality} aceptada")
            # Decode back to an array so callers keep receiving numpy data.
            decoded_image = cv2.imdecode(jpeg_encoded, cv2.IMREAD_COLOR)
            if decoded_image is not None:
                final_size_mb = decoded_image.nbytes / bytes_per_mb
                print(f"INFO: Compresión Azure - Imagen final: {final_size_mb:.2f}MB")
                return decoded_image

        # No JPEG quality fit the budget: fall back to the resized image.
        print("INFO: Compresión Azure - Usando imagen redimensionada sin compresión JPEG adicional")
        return compressed_image

    @staticmethod
    def _field_text(field):
        """Return the non-empty ``content`` of an Azure field, else ``None``."""
        if field is None:
            return None
        return getattr(field, "content", None) or None

    @staticmethod
    def _append_field(lines, label, field):
        """Append ``"<label>: <content>"`` to ``lines`` when the field has content."""
        text = AzureOCRProcessor._field_text(field)
        if text:
            lines.append(f"{label}: {text}")

    @staticmethod
    def _is_positive_amount(text) -> bool:
        """Tell whether ``text`` (e.g. ``"$1,234.50"``) parses to a number > 0."""
        try:
            return float(text.replace('$', '').replace(',', '').strip()) > 0
        except (ValueError, AttributeError):
            # Unparseable amounts are deliberately treated as "no tax".
            return False

    def _format_azure_result_as_text(self, result) -> str:
        """Render an Azure analysis result as clean text (no confidence lines).

        Only the first document in ``result.documents`` is rendered. Fields
        that are missing or have empty content are skipped, and missing
        ``fields``, ``value_array`` or ``value_object`` containers are treated
        as empty instead of raising.

        Args:
            result: Analysis result exposing ``documents``; each document
                exposes ``fields`` (a mapping of field name to field object).

        Returns:
            The formatted report, or an ``"ERROR: ..."`` string when the
            result contains no documents.
        """
        if not result.documents:
            return "ERROR: No se encontraron documentos en la factura"

        document = result.documents[0]
        fields = document.fields or {}
        separator = "=" * 60
        add = self._append_field

        output_lines = ["-------- Análisis de Azure Document Intelligence --------", ""]

        # Vendor information.
        add(output_lines, "Proveedor", fields.get("VendorName"))
        add(output_lines, "Dirección", fields.get("VendorAddress"))
        add(output_lines, "GST/HST", fields.get("VendorTaxId"))
        output_lines.append("")

        # Invoice information.
        add(output_lines, "Invoice ID", fields.get("InvoiceId"))
        add(output_lines, "Fecha", fields.get("InvoiceDate"))
        add(output_lines, "Cliente", fields.get("CustomerName"))
        output_lines.append("")

        output_lines.extend([separator, "ÍTEMS DE LA FACTURA", separator, ""])

        # Line items. The array attribute may exist but be None, so hasattr()
        # alone is not a sufficient guard.
        items = getattr(fields.get("Items"), "value_array", None) or []
        total_items = len(items)
        if items:
            print(f"INFO: Procesando {total_items} items...")
            for item_number, item in enumerate(items, start=1):
                item_obj = getattr(item, "value_object", None) or {}
                output_lines.append(f"--- Ítem #{item_number} ---")
                add(output_lines, "Código", item_obj.get("ProductCode"))
                add(output_lines, "Descripción", item_obj.get("Description"))
                add(output_lines, "Cantidad", item_obj.get("Quantity"))
                add(output_lines, "Precio unitario", item_obj.get("UnitPrice"))
                # Per-item tax is listed only when it is strictly positive.
                tax_text = self._field_text(item_obj.get("Tax"))
                if tax_text and self._is_positive_amount(tax_text):
                    output_lines.append(f"Impuesto (H): {tax_text}")
                add(output_lines, "Total por ítem", item_obj.get("Amount"))
                output_lines.append("")
        else:
            output_lines.append("No se encontraron items en la factura")

        # Totals.
        output_lines.extend([separator, "TOTALES", separator, ""])
        add(output_lines, "Subtotal", fields.get("SubTotal"))
        add(output_lines, "Total impuestos", fields.get("TotalTax"))
        add(output_lines, "Total de la factura", fields.get("InvoiceTotal"))
        output_lines.append("")
        output_lines.append(separator)
        output_lines.append(f"Total de items extraídos: {total_items}")
        output_lines.append(separator)

        formatted_text = "\n".join(output_lines)

        # Log a preview capped at 800 characters.
        preview = formatted_text
        if len(formatted_text) > 800:
            preview = formatted_text[:800] + "..."
        print(f"\n{separator}")
        print("TEXTO FORMATEADO GENERADO:")
        print(separator)
        print(preview)
        print(f"{separator}\n")
        return formatted_text