| import os |
| import re |
| import json |
| import time |
| import base64 |
| from pathlib import Path |
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| try: |
| from openai import OpenAI |
| AI_AVAILABLE = True |
| except ImportError: |
| AI_AVAILABLE = False |
|
|
| try: |
| import PIL.Image |
| PIL_OK = True |
| except ImportError: |
| PIL_OK = False |
|
|
| try: |
| import pdfplumber |
| PDF_OK = True |
| except ImportError: |
| PDF_OK = False |
|
|
| try: |
| import pandas as pd |
| PANDAS_OK = True |
| except ImportError: |
| PANDAS_OK = False |
|
|
| _CLIENT = None |
|
|
| |
| MODELO = os.environ.get("AI_MODEL", "llama-3.2-90b-vision-preview") |
|
|
| TIPOS_IMAGEN = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff", ".tif"} |
| TIPOS_PDF = {".pdf"} |
| TIPOS_EXCEL = {".xlsx", ".xls", ".csv", ".ods"} |
|
|
|
|
| def _get_client(): |
| global _CLIENT |
| if not AI_AVAILABLE: |
| return None |
| if _CLIENT is None: |
| key = os.environ.get("GROQ_API_KEY", "") |
| if not key: |
| return None |
| _CLIENT = OpenAI( |
| base_url="https://api.groq.com/openai/v1", |
| api_key=key, |
| ) |
| return _CLIENT |
|
|
|
|
| def _tipo_archivo(path: str) -> str: |
| ext = Path(path).suffix.lower() |
| if ext in TIPOS_IMAGEN: |
| return "imagen" |
| if ext in TIPOS_PDF: |
| return "pdf" |
| if ext in TIPOS_EXCEL: |
| return "excel" |
| return "desconocido" |
|
|
|
|
| def _parse_json(text: str) -> dict: |
| clean = (text or "").strip() |
| if "```" in clean: |
| for bloque in clean.split("```")[1::2]: |
| candidato = bloque.lstrip("json").strip() |
| try: |
| return json.loads(candidato) |
| except Exception: |
| continue |
| try: |
| return json.loads(clean) |
| except Exception: |
| return {"error": "Respuesta de IA en formato inesperado.", "texto_libre": clean} |
|
|
|
|
| def _encode_image(image_path: str) -> str: |
| with open(image_path, "rb") as image_file: |
| return base64.b64encode(image_file.read()).decode('utf-8') |
|
|
|
|
| |
|
|
| PROMPT_IMAGEN = """Eres un asistente de emergencia médica. Se te envía una imagen que puede ser: |
| foto de un paciente, cédula de identidad, brazalete hospitalario, lista impresa, captura de WhatsApp u otro. |
| |
| REGLAS ESTRICTAS — sin excepción: |
| 1. Si no puedes leer un texto -> escribe "No legible". NUNCA inventes. |
| 2. Si la imagen es borrosa o ilegible -> di exactamente eso. |
| 3. NUNCA estimes ni inventes nombres, cédulas, edades u otros datos. |
| 4. Solo reporta lo que puedes ver con certeza absoluta. |
| 5. Si la imagen contiene una LISTA de varias personas, extrae TODAS en "personas_multiples". |
| 6. LIMPIA LOS NOMBRES: Si el nombre tiene casillas como "( )", "[]" o ubicaciones al lado (ej. "Caribe", "Los Corales"), quita los símbolos y mueve la ubicación al campo "notas" o "hospital". El campo "nombre" SOLO debe tener nombre y apellido. Ignora líneas que sean títulos como "Primera Lista" o "GUAIRA)". |
| 7. EXTRAE EL HOSPITAL CONTEXTUAL: Si el texto introductorio de la imagen menciona un hospital general o centro médico (ej. "Hospital Miguel Pérez Carreño (La Yaguara)"), aplícalo a TODOS los pacientes de la lista en su campo "hospital". |
| |
| Responde ÚNICAMENTE en JSON: |
| { |
| "tipo_imagen": "descripción de qué es", |
| "procesable": true, |
| "nombre_detectado": null, |
| "cedula_detectada": null, |
| "edad_estimada": null, |
| "descripcion_fisica": null, |
| "texto_visible": "todo texto legible o 'Ninguno legible'", |
| "condicion_aparente": null, |
| "observaciones": null, |
| "personas_multiples": [], |
| "resumen": "resumen en 1-2 oraciones" |
| } |
| |
| Cada objeto de "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no aparecen). |
| Si la imagen no es procesable -> procesable: false y explica en observaciones.""" |
|
|
|
|
| PROMPT_PDF = """Eres un asistente de emergencia médica. Se te comparte texto extraído de un documento PDF. |
| Puede ser un registro hospitalario, lista de pacientes, acta médica u otro documento de emergencia. |
| |
| REGLAS ESTRICTAS: |
| 1. Extrae SOLO información que está escrita claramente en el texto. |
| 2. Si un campo no está -> usa null. NUNCA inventes datos. |
| 3. Si hay varias personas, extráelas TODAS en "personas_multiples". |
| 4. LIMPIA LOS NOMBRES: Elimina símbolos de casillas vacías "( )", "[]" y mueve ubicaciones (ej. "Los Corales") a "notas". Ignora encabezados y ruido. |
| |
| Texto del PDF: |
| {texto} |
| |
| Responde ÚNICAMENTE en JSON: |
| { |
| "tipo_documento": "descripción del documento", |
| "procesable": true, |
| "nombre_detectado": null, |
| "cedula_detectada": null, |
| "edad_estimada": null, |
| "hospital_detectado": null, |
| "condicion_detectada": null, |
| "observaciones": null, |
| "personas_multiples": [], |
| "resumen": "resumen en 1-2 oraciones" |
| } |
| |
| Cada objeto en "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no están).""" |
|
|
|
|
| PROMPT_EXCEL = """Eres un asistente de emergencia. Se te envía una tabla en formato texto extraída de un Excel o CSV. |
| Puede ser una lista de pacientes en un hospital, registro de heridos u otro listado de emergencia. |
| |
| REGLAS ESTRICTAS: |
| 1. Extrae EXACTAMENTE lo que está en la tabla, sin inventar. |
| 2. Mapea las columnas al esquema de la base de datos. |
| 3. Si un campo no existe en la tabla -> null. |
| |
| Tabla: |
| {tabla} |
| |
| Responde ÚNICAMENTE en JSON: |
| { |
| "tipo_documento": "descripción del documento", |
| "procesable": true, |
| "columnas_detectadas": ["col1", "col2"], |
| "personas_multiples": [ |
| {"nombre": null, "cedula": null, "edad": null, "hospital": null, "condicion": null, "notas": null} |
| ], |
| "observaciones": null, |
| "resumen": "resumen en 1-2 oraciones" |
| }""" |
|
|
|
|
| PROMPT_BUSCAR = """Eres un asistente de emergencia. Una familia busca a un ser querido perdido. |
| Se te envía una imagen (foto, cédula, captura u otro tipo). |
| |
| REGLAS ESTRICTAS: |
| 1. Describe SOLO lo que puedes ver con certeza. |
| 2. NUNCA inventes rasgos, nombres ni datos. |
| 3. Si la imagen es poco clara -> dilo. |
| |
| Responde ÚNICAMENTE en JSON: |
| { |
| "tipo_imagen": "qué es la imagen", |
| "procesable": true, |
| "descripcion_busqueda": null, |
| "genero": "masculino / femenino / no determinable", |
| "edad_estimada": null, |
| "caracteristicas_clave": null, |
| "nombre_visible": null, |
| "cedula_visible": null, |
| "advertencia": null |
| }""" |
|
|
|
|
| |
|
|
| def _es_error_cuota(msg: str) -> bool: |
| m = (msg or "").lower() |
| return ("429" in m or "insufficient_quota" in m or "rate limit" in m |
| or "exceeded" in m or "balance" in m) |
|
|
|
|
| def _ocr_tesseract(path: str): |
| """OCR local gratuito (sin cuota). Devuelve el texto leído o None.""" |
| try: |
| import pytesseract |
| if not PIL_OK: |
| return None |
| img = PIL.Image.open(path) |
| try: |
| return pytesseract.image_to_string(img, lang="spa") |
| except Exception: |
| return pytesseract.image_to_string(img) |
| except Exception: |
| return None |
|
|
|
|
| def _parse_lista_texto(texto: str) -> list: |
| personas = [] |
| hospital_context = None |
| |
| |
| m_hosp = re.search(r"(Hospital\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+|Clínica\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+)", (texto or "")[:300], re.IGNORECASE) |
| if m_hosp: |
| hospital_context = m_hosp.group(1).strip() |
| |
| for linea in (texto or "").splitlines(): |
| l = linea.strip() |
| if not l: continue |
| |
| |
| l = re.sub(r"^\s*\d{1,3}\s*[.\)\-]\s*", "", l).strip() |
| |
| l = re.sub(r"\(\s*\)|\[\s*\]|\{\s*\}", "", l) |
| if not l: continue |
|
|
| edad = None |
| m_edad = re.search(r"(\d{1,3})\s*a\w{0,3}os?\b", l, re.IGNORECASE) |
| if m_edad: |
| edad = m_edad.group(1) |
| l = (l[:m_edad.start()] + " " + l[m_edad.end():]).strip() |
|
|
| cedula = None |
| |
| m_ced = re.search(r"\b(\d{1,3}(?:\.\d{3}){1,2}|\d{6,9})\b", l) |
| if m_ced: |
| cedula = m_ced.group(1).replace(".", "") |
| l = (l[:m_ced.start()] + " " + l[m_ced.end():]).strip() |
|
|
| nombre = re.sub(r"[.\-•|()]+", " ", l) |
| nombre = re.sub(r"\s+", " ", nombre).strip(" .,-()") |
|
|
| |
| locaciones_conocidas = r"\b(Caribe|Los Corales|Pariata|Maiquetia|Macuto|Hospital|Clinica|Clínica|Centro)\b" |
| notas = None |
| |
| |
| m_loc = re.search(locaciones_conocidas, nombre, re.IGNORECASE) |
| if m_loc: |
| notas = f"Ubicación detectada: {m_loc.group(1).title()}" |
| |
| nombre = re.sub(locaciones_conocidas, "", nombre, flags=re.IGNORECASE).strip() |
| nombre = re.sub(r"\s+", " ", nombre) |
|
|
| |
| ruido = r"LISTADO|PACIENTES|HOSPITAL|SIN FAMILIAR|NOMBRE|LISTA|GUAIRA|PAGINA|HOJA" |
| if re.search(ruido, nombre, re.IGNORECASE) and not edad and not cedula: |
| continue |
| |
| if len(nombre) < 3 and not cedula and not edad: |
| continue |
|
|
| personas.append({"nombre": nombre or None, "cedula": cedula, "edad": edad, |
| "hospital": hospital_context, |
| "condicion": "Sin información", "notas": notas}) |
| return personas |
|
|
|
|
| def _fallback_ocr(path: str) -> dict: |
| texto = _ocr_tesseract(path) |
| if not texto or not texto.strip(): |
| return { |
| "procesable": False, |
| "error": "La IA no está disponible y el OCR local no pudo leer la imagen. Revisa credenciales o Tesseract.", |
| "resumen": "Sin IA ni OCR disponible.", |
| } |
| personas = _parse_lista_texto(texto) |
| uno = personas[0] if len(personas) == 1 else {} |
| return { |
| "procesable": True, |
| "tipo_imagen": "Lista leída con OCR local (sin IA)", |
| "texto_visible": texto.strip(), |
| "personas_multiples": personas, |
| "nombre_detectado": uno.get("nombre"), |
| "cedula_detectada": uno.get("cedula"), |
| "edad_estimada": uno.get("edad"), |
| "nombre_visible": uno.get("nombre"), |
| "cedula_visible": uno.get("cedula"), |
| "descripcion_busqueda": "Texto leído por OCR local (sin IA).", |
| "resumen": f"⚠️ IA inactiva — se usó OCR local. Se leyeron {len(personas)} línea(s). Revisa bien los datos.", |
| "observaciones": "Extraído con OCR local (Tesseract).", |
| } |
|
|
|
|
| def _intentar_openrouter_imagen(path: str, prompt: str): |
| client = _get_client() |
| if not client or not PIL_OK: |
| return None |
| try: |
| PIL.Image.open(path).verify() |
| base64_img = _encode_image(path) |
| except Exception: |
| return {"procesable": False, "error": "Formato de imagen no soportado o archivo corrupto."} |
|
|
| ext = Path(path).suffix.lower().replace(".", "") |
| mime = "image/jpeg" if ext in ["jpg", "jpeg"] else f"image/{ext}" |
|
|
| for intento in range(3): |
| try: |
| response = client.chat.completions.create( |
| model=MODELO, |
| messages=[ |
| { |
| "role": "user", |
| "content": [ |
| {"type": "text", "text": prompt}, |
| { |
| "type": "image_url", |
| "image_url": {"url": f"data:{mime};base64,{base64_img}"} |
| } |
| ] |
| } |
| ] |
| ) |
| return _parse_json(response.choices[0].message.content) |
| except Exception as e: |
| msg = str(e) |
| if _es_error_cuota(msg): |
| return None |
| if intento < 2: |
| time.sleep(1.5 * (intento + 1)) |
| continue |
| return None |
| return None |
|
|
|
|
| def _procesar_imagen(path: str, prompt: str) -> dict: |
| resultado = _intentar_openrouter_imagen(path, prompt) |
| if resultado is not None and resultado.get("procesable") is not False: |
| return resultado |
| return _fallback_ocr(path) |
|
|
|
|
| def _procesar_pdf(path: str) -> dict: |
| if not PDF_OK: |
| return {"procesable": False, "error": "Librería pdfplumber no instalada."} |
| try: |
| textos = [] |
| with pdfplumber.open(path) as pdf: |
| for pagina in pdf.pages: |
| t = pagina.extract_text() |
| if t: textos.append(t.strip()) |
|
|
| if not textos: |
| return { |
| "procesable": False, |
| "error": "El PDF no contiene texto legible.", |
| "resumen": "PDF sin texto extraíble." |
| } |
|
|
| texto_completo = "\n\n".join(textos) |
| if len(texto_completo) > 8000: |
| texto_completo = texto_completo[:8000] + "\n...[texto truncado]" |
|
|
| client = _get_client() |
| if not client: |
| return _fallback_texto_pdf(texto_completo, "IA no configurada") |
|
|
| prompt = PROMPT_PDF.replace("{texto}", texto_completo) |
| |
| for intento in range(3): |
| try: |
| response = client.chat.completions.create( |
| model=MODELO, |
| messages=[{"role": "user", "content": prompt}] |
| ) |
| resultado = _parse_json(response.choices[0].message.content) |
| resultado["tipo_imagen"] = resultado.get("tipo_documento", "PDF") |
| return resultado |
| except Exception as e: |
| if _es_error_cuota(str(e)): |
| return _fallback_texto_pdf(texto_completo, "cuota agotada") |
| if intento < 2: |
| time.sleep(1.5 * (intento + 1)) |
| continue |
| return _fallback_texto_pdf(texto_completo, "IA no disponible") |
|
|
| except Exception as e: |
| return {"procesable": False, "error": f"Error al leer el PDF: {str(e)}"} |
|
|
|
|
| def _fallback_texto_pdf(texto: str, motivo: str) -> dict: |
| personas = _parse_lista_texto(texto) |
| if not personas: |
| return {"procesable": False, |
| "error": f"No se pudo procesar con IA ({motivo}) y no hay personas detectadas en texto libre.", |
| "resumen": "Sin datos extraíbles."} |
| return { |
| "procesable": True, |
| "tipo_imagen": "PDF leído sin IA (texto directo)", |
| "personas_multiples": personas, |
| "resumen": f"⚠️ {motivo}: se detectaron {len(personas)} personas directamente del texto. Revisa datos.", |
| "observaciones": "Extraído del texto del PDF sin IA.", |
| } |
|
|
|
|
| def _procesar_excel(path: str) -> dict: |
| if not PANDAS_OK: |
| return {"procesable": False, "error": "Librería pandas no disponible."} |
| try: |
| ext = Path(path).suffix.lower() |
| if ext == ".csv": |
| df = pd.read_csv(path, encoding="utf-8", encoding_errors="replace") |
| else: |
| df = pd.read_excel(path) |
|
|
| if df.empty: |
| return {"procesable": False, "error": "El archivo está vacío."} |
|
|
| if not _mapear_columnas(df.columns.tolist()): |
| df = _redetectar_encabezado(path, ext, df) |
|
|
| mapa_cols = _mapear_columnas(df.columns.tolist()) |
| if mapa_cols: |
| personas = [] |
| for _, fila in df.iterrows(): |
| p = {} |
| for campo, col_excel in mapa_cols.items(): |
| val = fila.get(col_excel) |
| p[campo] = str(val).strip() if val is not None and str(val) != "nan" else None |
| if p.get("nombre") or p.get("cedula"): |
| personas.append(p) |
| return { |
| "procesable": True, |
| "tipo_imagen": f"Excel/CSV con {len(personas)} registros", |
| "personas_multiples": personas, |
| "resumen": f"Se detectaron {len(personas)} personas directamente del archivo.", |
| "columnas_detectadas": list(mapa_cols.keys()) |
| } |
|
|
| tabla_texto = df.head(80).to_string(index=False) |
| if len(tabla_texto) > 6000: |
| tabla_texto = tabla_texto[:6000] + "\n...[tabla truncada]" |
|
|
| client = _get_client() |
| if not client: |
| return {"procesable": False, "error": "IA no configurada. Añade OPENROUTER_API_KEY."} |
|
|
| prompt = PROMPT_EXCEL.replace("{tabla}", tabla_texto) |
| response = client.chat.completions.create( |
| model=MODELO, |
| messages=[{"role": "user", "content": prompt}] |
| ) |
| resultado = _parse_json(response.choices[0].message.content) |
| resultado["tipo_imagen"] = resultado.get("tipo_documento", "Excel/CSV") |
| return resultado |
|
|
| except Exception as e: |
| return {"procesable": False, "error": f"Error al leer el archivo: {str(e)}"} |
|
|
|
|
| def _redetectar_encabezado(path: str, ext: str, df_orig): |
| try: |
| crudo = (pd.read_csv(path, header=None, nrows=15, encoding="utf-8", encoding_errors="replace") |
| if ext == ".csv" else pd.read_excel(path, header=None, nrows=15)) |
| for i in range(len(crudo)): |
| posibles = [str(x) for x in crudo.iloc[i].tolist()] |
| if _mapear_columnas(posibles): |
| return (pd.read_csv(path, header=i, encoding="utf-8", encoding_errors="replace") |
| if ext == ".csv" else pd.read_excel(path, header=i)) |
| except Exception: |
| pass |
| return df_orig |
|
|
|
|
| def _mapear_columnas(columnas: list) -> dict | None: |
| c = {str(col).lower().strip(): col for col in columnas} |
| mapa = {} |
| for campo, variantes in { |
| "nombre": ["nombre", "name", "paciente", "apellido y nombre", "nombres y apellidos", "apellidos y nombres"], |
| "cedula": ["cedula", "cédula", "ci", "documento", "id", "cedula de identidad", "cédula / id"], |
| "edad": ["edad", "age", "años"], |
| "hospital": ["hospital", "centro", "clinica", "clínica", "lugar", "ubicacion", "ubicación"], |
| "condicion":["condicion", "condición", "estado", "status", "diagnostico", "diagnóstico"], |
| "contacto": ["telefono", "teléfono", "tel", "celular", "contacto"], |
| "notas": ["notas", "observaciones", "nota", "comentarios", "descripcion", "descripción", "direccion"], |
| }.items(): |
| for v in variantes: |
| if v in c: |
| mapa[campo] = c[v] |
| break |
| if "nombre" in mapa or "cedula" in mapa: |
| return mapa |
| return None |
|
|
|
|
| |
|
|
| def analizar_archivo(path: str) -> dict: |
| if not path: |
| return {"procesable": False, "error": "No se recibió ningún archivo."} |
|
|
| tipo = _tipo_archivo(path) |
| if tipo == "imagen": |
| return _procesar_imagen(path, PROMPT_IMAGEN) |
| elif tipo == "pdf": |
| return _procesar_pdf(path) |
| elif tipo == "excel": |
| return _procesar_excel(path) |
| else: |
| ext = Path(path).suffix |
| return { |
| "procesable": False, |
| "error": f"Tipo de archivo '{ext}' no soportado." |
| } |
|
|
|
|
| def extraer_lista(path: str) -> dict: |
| res = analizar_archivo(path) |
| if not res.get("procesable", True): |
| return {"procesable": False, "personas": [], |
| "error": res.get("error") or res.get("observaciones") or "No procesable.", |
| "resumen": res.get("resumen", "")} |
|
|
| personas = list(res.get("personas_multiples") or []) |
|
|
| if not personas and (res.get("nombre_detectado") or res.get("cedula_detectada")): |
| personas = [{ |
| "nombre": res.get("nombre_detectado"), |
| "cedula": res.get("cedula_detectada"), |
| "edad": res.get("edad_estimada"), |
| "condicion": res.get("condicion_aparente") or res.get("condicion_detectada"), |
| "notas": res.get("observaciones"), |
| }] |
|
|
| limpias = [] |
| for p in personas: |
| nombre = _limpiar(p.get("nombre")) |
| cedula = _limpiar(p.get("cedula")) |
| if not nombre and not cedula: |
| continue |
| limpias.append({ |
| "nombre": nombre, |
| "cedula": cedula, |
| "edad": _edad_limpia(p.get("edad")), |
| "condicion": _limpiar(p.get("condicion")) or "Sin información", |
| "notas": _limpiar(p.get("notas")), |
| }) |
|
|
| return { |
| "procesable": True, |
| "personas": limpias, |
| "resumen": res.get("resumen", f"Se detectaron {len(limpias)} personas."), |
| "error": None, |
| } |
|
|
|
|
| def _limpiar(v): |
| if v in (None, "", "null", "None", "nan", "No legible", "No detectado"): |
| return None |
| return str(v).strip() |
|
|
|
|
| def _edad_limpia(v): |
| s = _limpiar(v) |
| if not s: return None |
| try: |
| import re as _re |
| m = _re.search(r"\d+", s) |
| return str(int(float(m.group()))) if m else None |
| except (TypeError, ValueError): |
| return s |
|
|
|
|
| def describir_para_busqueda(path: str) -> dict: |
| if not path: |
| return {"procesable": False, "error": "No se recibió ningún archivo."} |
| tipo = _tipo_archivo(path) |
| if tipo == "imagen": |
| return _procesar_imagen(path, PROMPT_BUSCAR) |
| return analizar_archivo(path) |
|
|
|
|
| def analizar_imagen_hospital(path: str) -> dict: |
| return analizar_archivo(path) |
|
|
| def describir_imagen_busqueda(path: str) -> dict: |
| return describir_para_busqueda(path) |
|
|