import os import re import json import time import base64 from pathlib import Path from dotenv import load_dotenv load_dotenv() try: from openai import OpenAI AI_AVAILABLE = True except ImportError: AI_AVAILABLE = False try: import PIL.Image PIL_OK = True except ImportError: PIL_OK = False try: import pdfplumber PDF_OK = True except ImportError: PDF_OK = False try: import pandas as pd PANDAS_OK = True except ImportError: PANDAS_OK = False _CLIENT = None # Modelo de IA a través de Groq (Gratuito de emergencia) MODELO = os.environ.get("AI_MODEL", "llama-3.2-90b-vision-preview") TIPOS_IMAGEN = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff", ".tif"} TIPOS_PDF = {".pdf"} TIPOS_EXCEL = {".xlsx", ".xls", ".csv", ".ods"} def _get_client(): global _CLIENT if not AI_AVAILABLE: return None if _CLIENT is None: key = os.environ.get("GROQ_API_KEY", "") if not key: return None _CLIENT = OpenAI( base_url="https://api.groq.com/openai/v1", api_key=key, ) return _CLIENT def _tipo_archivo(path: str) -> str: ext = Path(path).suffix.lower() if ext in TIPOS_IMAGEN: return "imagen" if ext in TIPOS_PDF: return "pdf" if ext in TIPOS_EXCEL: return "excel" return "desconocido" def _parse_json(text: str) -> dict: clean = (text or "").strip() if "```" in clean: for bloque in clean.split("```")[1::2]: candidato = bloque.lstrip("json").strip() try: return json.loads(candidato) except Exception: continue try: return json.loads(clean) except Exception: return {"error": "Respuesta de IA en formato inesperado.", "texto_libre": clean} def _encode_image(image_path: str) -> str: with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') # ─── Prompts ────────────────────────────────────────────────────────────────── PROMPT_IMAGEN = """Eres un asistente de emergencia médica. Se te envía una imagen que puede ser: foto de un paciente, cédula de identidad, brazalete hospitalario, lista impresa, captura de WhatsApp u otro. REGLAS ESTRICTAS — sin excepción: 1. Si no puedes leer un texto -> escribe "No legible". NUNCA inventes. 2. Si la imagen es borrosa o ilegible -> di exactamente eso. 3. NUNCA estimes ni inventes nombres, cédulas, edades u otros datos. 4. Solo reporta lo que puedes ver con certeza absoluta. 5. Si la imagen contiene una LISTA de varias personas, extrae TODAS en "personas_multiples". 6. LIMPIA LOS NOMBRES: Si el nombre tiene casillas como "( )", "[]" o ubicaciones al lado (ej. "Caribe", "Los Corales"), quita los símbolos y mueve la ubicación al campo "notas" o "hospital". El campo "nombre" SOLO debe tener nombre y apellido. Ignora líneas que sean títulos como "Primera Lista" o "GUAIRA)". 7. EXTRAE EL HOSPITAL CONTEXTUAL: Si el texto introductorio de la imagen menciona un hospital general o centro médico (ej. "Hospital Miguel Pérez Carreño (La Yaguara)"), aplícalo a TODOS los pacientes de la lista en su campo "hospital". Responde ÚNICAMENTE en JSON: { "tipo_imagen": "descripción de qué es", "procesable": true, "nombre_detectado": null, "cedula_detectada": null, "edad_estimada": null, "descripcion_fisica": null, "texto_visible": "todo texto legible o 'Ninguno legible'", "condicion_aparente": null, "observaciones": null, "personas_multiples": [], "resumen": "resumen en 1-2 oraciones" } Cada objeto de "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no aparecen). Si la imagen no es procesable -> procesable: false y explica en observaciones.""" PROMPT_PDF = """Eres un asistente de emergencia médica. Se te comparte texto extraído de un documento PDF. Puede ser un registro hospitalario, lista de pacientes, acta médica u otro documento de emergencia. REGLAS ESTRICTAS: 1. Extrae SOLO información que está escrita claramente en el texto. 2. Si un campo no está -> usa null. NUNCA inventes datos. 3. Si hay varias personas, extráelas TODAS en "personas_multiples". 4. LIMPIA LOS NOMBRES: Elimina símbolos de casillas vacías "( )", "[]" y mueve ubicaciones (ej. "Los Corales") a "notas". Ignora encabezados y ruido. Texto del PDF: {texto} Responde ÚNICAMENTE en JSON: { "tipo_documento": "descripción del documento", "procesable": true, "nombre_detectado": null, "cedula_detectada": null, "edad_estimada": null, "hospital_detectado": null, "condicion_detectada": null, "observaciones": null, "personas_multiples": [], "resumen": "resumen en 1-2 oraciones" } Cada objeto en "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no están).""" PROMPT_EXCEL = """Eres un asistente de emergencia. Se te envía una tabla en formato texto extraída de un Excel o CSV. Puede ser una lista de pacientes en un hospital, registro de heridos u otro listado de emergencia. REGLAS ESTRICTAS: 1. Extrae EXACTAMENTE lo que está en la tabla, sin inventar. 2. Mapea las columnas al esquema de la base de datos. 3. Si un campo no existe en la tabla -> null. Tabla: {tabla} Responde ÚNICAMENTE en JSON: { "tipo_documento": "descripción del documento", "procesable": true, "columnas_detectadas": ["col1", "col2"], "personas_multiples": [ {"nombre": null, "cedula": null, "edad": null, "hospital": null, "condicion": null, "notas": null} ], "observaciones": null, "resumen": "resumen en 1-2 oraciones" }""" PROMPT_BUSCAR = """Eres un asistente de emergencia. Una familia busca a un ser querido perdido. Se te envía una imagen (foto, cédula, captura u otro tipo). REGLAS ESTRICTAS: 1. Describe SOLO lo que puedes ver con certeza. 2. NUNCA inventes rasgos, nombres ni datos. 3. Si la imagen es poco clara -> dilo. Responde ÚNICAMENTE en JSON: { "tipo_imagen": "qué es la imagen", "procesable": true, "descripcion_busqueda": null, "genero": "masculino / femenino / no determinable", "edad_estimada": null, "caracteristicas_clave": null, "nombre_visible": null, "cedula_visible": null, "advertencia": null }""" # ─── Procesadores por tipo ──────────────────────────────────────────────────── def _es_error_cuota(msg: str) -> bool: m = (msg or "").lower() return ("429" in m or "insufficient_quota" in m or "rate limit" in m or "exceeded" in m or "balance" in m) def _ocr_tesseract(path: str): """OCR local gratuito (sin cuota). Devuelve el texto leído o None.""" try: import pytesseract if not PIL_OK: return None img = PIL.Image.open(path) try: return pytesseract.image_to_string(img, lang="spa") except Exception: return pytesseract.image_to_string(img) except Exception: return None def _parse_lista_texto(texto: str) -> list: personas = [] hospital_context = None # Buscar posible nombre de hospital en el texto introductorio (primeros 300 caracteres) m_hosp = re.search(r"(Hospital\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+|Clínica\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+)", (texto or "")[:300], re.IGNORECASE) if m_hosp: hospital_context = m_hosp.group(1).strip() for linea in (texto or "").splitlines(): l = linea.strip() if not l: continue # Eliminar números de viñeta al inicio l = re.sub(r"^\s*\d{1,3}\s*[.\)\-]\s*", "", l).strip() # Eliminar marcas de casillas vacías como ( ) o [ ] l = re.sub(r"\(\s*\)|\[\s*\]|\{\s*\}", "", l) if not l: continue edad = None m_edad = re.search(r"(\d{1,3})\s*a\w{0,3}os?\b", l, re.IGNORECASE) if m_edad: edad = m_edad.group(1) l = (l[:m_edad.start()] + " " + l[m_edad.end():]).strip() cedula = None # Soportar cédulas con puntos (ej 17.856.045) o sin puntos (ej 17856045) m_ced = re.search(r"\b(\d{1,3}(?:\.\d{3}){1,2}|\d{6,9})\b", l) if m_ced: cedula = m_ced.group(1).replace(".", "") l = (l[:m_ced.start()] + " " + l[m_ced.end():]).strip() nombre = re.sub(r"[.\-•|()]+", " ", l) nombre = re.sub(r"\s+", " ", nombre).strip(" .,-()") # Extraer locaciones como "Caribe", "Los Corales", etc., del nombre y pasarlas a notas locaciones_conocidas = r"\b(Caribe|Los Corales|Pariata|Maiquetia|Macuto|Hospital|Clinica|Clínica|Centro)\b" notas = None # Buscar locaciones m_loc = re.search(locaciones_conocidas, nombre, re.IGNORECASE) if m_loc: notas = f"Ubicación detectada: {m_loc.group(1).title()}" # Remover la locación del nombre nombre = re.sub(locaciones_conocidas, "", nombre, flags=re.IGNORECASE).strip() nombre = re.sub(r"\s+", " ", nombre) # limpiar dobles espacios que hayan quedado # Ignorar encabezados o ruido obvio ruido = r"LISTADO|PACIENTES|HOSPITAL|SIN FAMILIAR|NOMBRE|LISTA|GUAIRA|PAGINA|HOJA" if re.search(ruido, nombre, re.IGNORECASE) and not edad and not cedula: continue # Si quedó algo muy corto o números sueltos if len(nombre) < 3 and not cedula and not edad: continue personas.append({"nombre": nombre or None, "cedula": cedula, "edad": edad, "hospital": hospital_context, "condicion": "Sin información", "notas": notas}) return personas def _fallback_ocr(path: str) -> dict: texto = _ocr_tesseract(path) if not texto or not texto.strip(): return { "procesable": False, "error": "La IA no está disponible y el OCR local no pudo leer la imagen. Revisa credenciales o Tesseract.", "resumen": "Sin IA ni OCR disponible.", } personas = _parse_lista_texto(texto) uno = personas[0] if len(personas) == 1 else {} return { "procesable": True, "tipo_imagen": "Lista leída con OCR local (sin IA)", "texto_visible": texto.strip(), "personas_multiples": personas, "nombre_detectado": uno.get("nombre"), "cedula_detectada": uno.get("cedula"), "edad_estimada": uno.get("edad"), "nombre_visible": uno.get("nombre"), "cedula_visible": uno.get("cedula"), "descripcion_busqueda": "Texto leído por OCR local (sin IA).", "resumen": f"⚠️ IA inactiva — se usó OCR local. Se leyeron {len(personas)} línea(s). Revisa bien los datos.", "observaciones": "Extraído con OCR local (Tesseract).", } def _intentar_openrouter_imagen(path: str, prompt: str): client = _get_client() if not client or not PIL_OK: return None try: PIL.Image.open(path).verify() base64_img = _encode_image(path) except Exception: return {"procesable": False, "error": "Formato de imagen no soportado o archivo corrupto."} ext = Path(path).suffix.lower().replace(".", "") mime = "image/jpeg" if ext in ["jpg", "jpeg"] else f"image/{ext}" for intento in range(3): try: response = client.chat.completions.create( model=MODELO, messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": {"url": f"data:{mime};base64,{base64_img}"} } ] } ] ) return _parse_json(response.choices[0].message.content) except Exception as e: msg = str(e) if _es_error_cuota(msg): return None if intento < 2: time.sleep(1.5 * (intento + 1)) continue return None return None def _procesar_imagen(path: str, prompt: str) -> dict: resultado = _intentar_openrouter_imagen(path, prompt) if resultado is not None and resultado.get("procesable") is not False: return resultado return _fallback_ocr(path) def _procesar_pdf(path: str) -> dict: if not PDF_OK: return {"procesable": False, "error": "Librería pdfplumber no instalada."} try: textos = [] with pdfplumber.open(path) as pdf: for pagina in pdf.pages: t = pagina.extract_text() if t: textos.append(t.strip()) if not textos: return { "procesable": False, "error": "El PDF no contiene texto legible.", "resumen": "PDF sin texto extraíble." } texto_completo = "\n\n".join(textos) if len(texto_completo) > 8000: texto_completo = texto_completo[:8000] + "\n...[texto truncado]" client = _get_client() if not client: return _fallback_texto_pdf(texto_completo, "IA no configurada") prompt = PROMPT_PDF.replace("{texto}", texto_completo) for intento in range(3): try: response = client.chat.completions.create( model=MODELO, messages=[{"role": "user", "content": prompt}] ) resultado = _parse_json(response.choices[0].message.content) resultado["tipo_imagen"] = resultado.get("tipo_documento", "PDF") return resultado except Exception as e: if _es_error_cuota(str(e)): return _fallback_texto_pdf(texto_completo, "cuota agotada") if intento < 2: time.sleep(1.5 * (intento + 1)) continue return _fallback_texto_pdf(texto_completo, "IA no disponible") except Exception as e: return {"procesable": False, "error": f"Error al leer el PDF: {str(e)}"} def _fallback_texto_pdf(texto: str, motivo: str) -> dict: personas = _parse_lista_texto(texto) if not personas: return {"procesable": False, "error": f"No se pudo procesar con IA ({motivo}) y no hay personas detectadas en texto libre.", "resumen": "Sin datos extraíbles."} return { "procesable": True, "tipo_imagen": "PDF leído sin IA (texto directo)", "personas_multiples": personas, "resumen": f"⚠️ {motivo}: se detectaron {len(personas)} personas directamente del texto. Revisa datos.", "observaciones": "Extraído del texto del PDF sin IA.", } def _procesar_excel(path: str) -> dict: if not PANDAS_OK: return {"procesable": False, "error": "Librería pandas no disponible."} try: ext = Path(path).suffix.lower() if ext == ".csv": df = pd.read_csv(path, encoding="utf-8", encoding_errors="replace") else: df = pd.read_excel(path) if df.empty: return {"procesable": False, "error": "El archivo está vacío."} if not _mapear_columnas(df.columns.tolist()): df = _redetectar_encabezado(path, ext, df) mapa_cols = _mapear_columnas(df.columns.tolist()) if mapa_cols: personas = [] for _, fila in df.iterrows(): p = {} for campo, col_excel in mapa_cols.items(): val = fila.get(col_excel) p[campo] = str(val).strip() if val is not None and str(val) != "nan" else None if p.get("nombre") or p.get("cedula"): personas.append(p) return { "procesable": True, "tipo_imagen": f"Excel/CSV con {len(personas)} registros", "personas_multiples": personas, "resumen": f"Se detectaron {len(personas)} personas directamente del archivo.", "columnas_detectadas": list(mapa_cols.keys()) } tabla_texto = df.head(80).to_string(index=False) if len(tabla_texto) > 6000: tabla_texto = tabla_texto[:6000] + "\n...[tabla truncada]" client = _get_client() if not client: return {"procesable": False, "error": "IA no configurada. Añade OPENROUTER_API_KEY."} prompt = PROMPT_EXCEL.replace("{tabla}", tabla_texto) response = client.chat.completions.create( model=MODELO, messages=[{"role": "user", "content": prompt}] ) resultado = _parse_json(response.choices[0].message.content) resultado["tipo_imagen"] = resultado.get("tipo_documento", "Excel/CSV") return resultado except Exception as e: return {"procesable": False, "error": f"Error al leer el archivo: {str(e)}"} def _redetectar_encabezado(path: str, ext: str, df_orig): try: crudo = (pd.read_csv(path, header=None, nrows=15, encoding="utf-8", encoding_errors="replace") if ext == ".csv" else pd.read_excel(path, header=None, nrows=15)) for i in range(len(crudo)): posibles = [str(x) for x in crudo.iloc[i].tolist()] if _mapear_columnas(posibles): return (pd.read_csv(path, header=i, encoding="utf-8", encoding_errors="replace") if ext == ".csv" else pd.read_excel(path, header=i)) except Exception: pass return df_orig def _mapear_columnas(columnas: list) -> dict | None: c = {str(col).lower().strip(): col for col in columnas} mapa = {} for campo, variantes in { "nombre": ["nombre", "name", "paciente", "apellido y nombre", "nombres y apellidos", "apellidos y nombres"], "cedula": ["cedula", "cédula", "ci", "documento", "id", "cedula de identidad", "cédula / id"], "edad": ["edad", "age", "años"], "hospital": ["hospital", "centro", "clinica", "clínica", "lugar", "ubicacion", "ubicación"], "condicion":["condicion", "condición", "estado", "status", "diagnostico", "diagnóstico"], "contacto": ["telefono", "teléfono", "tel", "celular", "contacto"], "notas": ["notas", "observaciones", "nota", "comentarios", "descripcion", "descripción", "direccion"], }.items(): for v in variantes: if v in c: mapa[campo] = c[v] break if "nombre" in mapa or "cedula" in mapa: return mapa return None # ─── Funciones públicas ─────────────────────────────────────────────────────── def analizar_archivo(path: str) -> dict: if not path: return {"procesable": False, "error": "No se recibió ningún archivo."} tipo = _tipo_archivo(path) if tipo == "imagen": return _procesar_imagen(path, PROMPT_IMAGEN) elif tipo == "pdf": return _procesar_pdf(path) elif tipo == "excel": return _procesar_excel(path) else: ext = Path(path).suffix return { "procesable": False, "error": f"Tipo de archivo '{ext}' no soportado." } def extraer_lista(path: str) -> dict: res = analizar_archivo(path) if not res.get("procesable", True): return {"procesable": False, "personas": [], "error": res.get("error") or res.get("observaciones") or "No procesable.", "resumen": res.get("resumen", "")} personas = list(res.get("personas_multiples") or []) if not personas and (res.get("nombre_detectado") or res.get("cedula_detectada")): personas = [{ "nombre": res.get("nombre_detectado"), "cedula": res.get("cedula_detectada"), "edad": res.get("edad_estimada"), "condicion": res.get("condicion_aparente") or res.get("condicion_detectada"), "notas": res.get("observaciones"), }] limpias = [] for p in personas: nombre = _limpiar(p.get("nombre")) cedula = _limpiar(p.get("cedula")) if not nombre and not cedula: continue limpias.append({ "nombre": nombre, "cedula": cedula, "edad": _edad_limpia(p.get("edad")), "condicion": _limpiar(p.get("condicion")) or "Sin información", "notas": _limpiar(p.get("notas")), }) return { "procesable": True, "personas": limpias, "resumen": res.get("resumen", f"Se detectaron {len(limpias)} personas."), "error": None, } def _limpiar(v): if v in (None, "", "null", "None", "nan", "No legible", "No detectado"): return None return str(v).strip() def _edad_limpia(v): s = _limpiar(v) if not s: return None try: import re as _re m = _re.search(r"\d+", s) return str(int(float(m.group()))) if m else None except (TypeError, ValueError): return s def describir_para_busqueda(path: str) -> dict: if not path: return {"procesable": False, "error": "No se recibió ningún archivo."} tipo = _tipo_archivo(path) if tipo == "imagen": return _procesar_imagen(path, PROMPT_BUSCAR) return analizar_archivo(path) def analizar_imagen_hospital(path: str) -> dict: return analizar_archivo(path) def describir_imagen_busqueda(path: str) -> dict: return describir_para_busqueda(path)