Venezuela / ai_processor.py
Yofran23's picture
Actualización automática: ai_processor.py
ca84bcf verified
Raw
History Blame Contribute Delete
22 kB
import os
import re
import json
import time
import base64
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
try:
from openai import OpenAI
AI_AVAILABLE = True
except ImportError:
AI_AVAILABLE = False
try:
import PIL.Image
PIL_OK = True
except ImportError:
PIL_OK = False
try:
import pdfplumber
PDF_OK = True
except ImportError:
PDF_OK = False
try:
import pandas as pd
PANDAS_OK = True
except ImportError:
PANDAS_OK = False
_CLIENT = None
# Modelo de IA a través de Groq (Gratuito de emergencia)
MODELO = os.environ.get("AI_MODEL", "llama-3.2-90b-vision-preview")
TIPOS_IMAGEN = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff", ".tif"}
TIPOS_PDF = {".pdf"}
TIPOS_EXCEL = {".xlsx", ".xls", ".csv", ".ods"}
def _get_client():
global _CLIENT
if not AI_AVAILABLE:
return None
if _CLIENT is None:
key = os.environ.get("GROQ_API_KEY", "")
if not key:
return None
_CLIENT = OpenAI(
base_url="https://api.groq.com/openai/v1",
api_key=key,
)
return _CLIENT
def _tipo_archivo(path: str) -> str:
ext = Path(path).suffix.lower()
if ext in TIPOS_IMAGEN:
return "imagen"
if ext in TIPOS_PDF:
return "pdf"
if ext in TIPOS_EXCEL:
return "excel"
return "desconocido"
def _parse_json(text: str) -> dict:
clean = (text or "").strip()
if "```" in clean:
for bloque in clean.split("```")[1::2]:
candidato = bloque.lstrip("json").strip()
try:
return json.loads(candidato)
except Exception:
continue
try:
return json.loads(clean)
except Exception:
return {"error": "Respuesta de IA en formato inesperado.", "texto_libre": clean}
def _encode_image(image_path: str) -> str:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
# ─── Prompts ──────────────────────────────────────────────────────────────────
PROMPT_IMAGEN = """Eres un asistente de emergencia médica. Se te envía una imagen que puede ser:
foto de un paciente, cédula de identidad, brazalete hospitalario, lista impresa, captura de WhatsApp u otro.
REGLAS ESTRICTAS — sin excepción:
1. Si no puedes leer un texto -> escribe "No legible". NUNCA inventes.
2. Si la imagen es borrosa o ilegible -> di exactamente eso.
3. NUNCA estimes ni inventes nombres, cédulas, edades u otros datos.
4. Solo reporta lo que puedes ver con certeza absoluta.
5. Si la imagen contiene una LISTA de varias personas, extrae TODAS en "personas_multiples".
6. LIMPIA LOS NOMBRES: Si el nombre tiene casillas como "( )", "[]" o ubicaciones al lado (ej. "Caribe", "Los Corales"), quita los símbolos y mueve la ubicación al campo "notas" o "hospital". El campo "nombre" SOLO debe tener nombre y apellido. Ignora líneas que sean títulos como "Primera Lista" o "GUAIRA)".
7. EXTRAE EL HOSPITAL CONTEXTUAL: Si el texto introductorio de la imagen menciona un hospital general o centro médico (ej. "Hospital Miguel Pérez Carreño (La Yaguara)"), aplícalo a TODOS los pacientes de la lista en su campo "hospital".
Responde ÚNICAMENTE en JSON:
{
"tipo_imagen": "descripción de qué es",
"procesable": true,
"nombre_detectado": null,
"cedula_detectada": null,
"edad_estimada": null,
"descripcion_fisica": null,
"texto_visible": "todo texto legible o 'Ninguno legible'",
"condicion_aparente": null,
"observaciones": null,
"personas_multiples": [],
"resumen": "resumen en 1-2 oraciones"
}
Cada objeto de "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no aparecen).
Si la imagen no es procesable -> procesable: false y explica en observaciones."""
PROMPT_PDF = """Eres un asistente de emergencia médica. Se te comparte texto extraído de un documento PDF.
Puede ser un registro hospitalario, lista de pacientes, acta médica u otro documento de emergencia.
REGLAS ESTRICTAS:
1. Extrae SOLO información que está escrita claramente en el texto.
2. Si un campo no está -> usa null. NUNCA inventes datos.
3. Si hay varias personas, extráelas TODAS en "personas_multiples".
4. LIMPIA LOS NOMBRES: Elimina símbolos de casillas vacías "( )", "[]" y mueve ubicaciones (ej. "Los Corales") a "notas". Ignora encabezados y ruido.
Texto del PDF:
{texto}
Responde ÚNICAMENTE en JSON:
{
"tipo_documento": "descripción del documento",
"procesable": true,
"nombre_detectado": null,
"cedula_detectada": null,
"edad_estimada": null,
"hospital_detectado": null,
"condicion_detectada": null,
"observaciones": null,
"personas_multiples": [],
"resumen": "resumen en 1-2 oraciones"
}
Cada objeto en "personas_multiples" debe tener: nombre, cedula, edad, hospital, condicion, notas (null si no están)."""
PROMPT_EXCEL = """Eres un asistente de emergencia. Se te envía una tabla en formato texto extraída de un Excel o CSV.
Puede ser una lista de pacientes en un hospital, registro de heridos u otro listado de emergencia.
REGLAS ESTRICTAS:
1. Extrae EXACTAMENTE lo que está en la tabla, sin inventar.
2. Mapea las columnas al esquema de la base de datos.
3. Si un campo no existe en la tabla -> null.
Tabla:
{tabla}
Responde ÚNICAMENTE en JSON:
{
"tipo_documento": "descripción del documento",
"procesable": true,
"columnas_detectadas": ["col1", "col2"],
"personas_multiples": [
{"nombre": null, "cedula": null, "edad": null, "hospital": null, "condicion": null, "notas": null}
],
"observaciones": null,
"resumen": "resumen en 1-2 oraciones"
}"""
PROMPT_BUSCAR = """Eres un asistente de emergencia. Una familia busca a un ser querido perdido.
Se te envía una imagen (foto, cédula, captura u otro tipo).
REGLAS ESTRICTAS:
1. Describe SOLO lo que puedes ver con certeza.
2. NUNCA inventes rasgos, nombres ni datos.
3. Si la imagen es poco clara -> dilo.
Responde ÚNICAMENTE en JSON:
{
"tipo_imagen": "qué es la imagen",
"procesable": true,
"descripcion_busqueda": null,
"genero": "masculino / femenino / no determinable",
"edad_estimada": null,
"caracteristicas_clave": null,
"nombre_visible": null,
"cedula_visible": null,
"advertencia": null
}"""
# ─── Procesadores por tipo ────────────────────────────────────────────────────
def _es_error_cuota(msg: str) -> bool:
m = (msg or "").lower()
return ("429" in m or "insufficient_quota" in m or "rate limit" in m
or "exceeded" in m or "balance" in m)
def _ocr_tesseract(path: str):
"""OCR local gratuito (sin cuota). Devuelve el texto leído o None."""
try:
import pytesseract
if not PIL_OK:
return None
img = PIL.Image.open(path)
try:
return pytesseract.image_to_string(img, lang="spa")
except Exception:
return pytesseract.image_to_string(img)
except Exception:
return None
def _parse_lista_texto(texto: str) -> list:
personas = []
hospital_context = None
# Buscar posible nombre de hospital en el texto introductorio (primeros 300 caracteres)
m_hosp = re.search(r"(Hospital\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+|Clínica\s+[A-Za-zÁÉÍÓÚáéíóúÑñ\s()]+)", (texto or "")[:300], re.IGNORECASE)
if m_hosp:
hospital_context = m_hosp.group(1).strip()
for linea in (texto or "").splitlines():
l = linea.strip()
if not l: continue
# Eliminar números de viñeta al inicio
l = re.sub(r"^\s*\d{1,3}\s*[.\)\-]\s*", "", l).strip()
# Eliminar marcas de casillas vacías como ( ) o [ ]
l = re.sub(r"\(\s*\)|\[\s*\]|\{\s*\}", "", l)
if not l: continue
edad = None
m_edad = re.search(r"(\d{1,3})\s*a\w{0,3}os?\b", l, re.IGNORECASE)
if m_edad:
edad = m_edad.group(1)
l = (l[:m_edad.start()] + " " + l[m_edad.end():]).strip()
cedula = None
# Soportar cédulas con puntos (ej 17.856.045) o sin puntos (ej 17856045)
m_ced = re.search(r"\b(\d{1,3}(?:\.\d{3}){1,2}|\d{6,9})\b", l)
if m_ced:
cedula = m_ced.group(1).replace(".", "")
l = (l[:m_ced.start()] + " " + l[m_ced.end():]).strip()
nombre = re.sub(r"[.\-•|()]+", " ", l)
nombre = re.sub(r"\s+", " ", nombre).strip(" .,-()")
# Extraer locaciones como "Caribe", "Los Corales", etc., del nombre y pasarlas a notas
locaciones_conocidas = r"\b(Caribe|Los Corales|Pariata|Maiquetia|Macuto|Hospital|Clinica|Clínica|Centro)\b"
notas = None
# Buscar locaciones
m_loc = re.search(locaciones_conocidas, nombre, re.IGNORECASE)
if m_loc:
notas = f"Ubicación detectada: {m_loc.group(1).title()}"
# Remover la locación del nombre
nombre = re.sub(locaciones_conocidas, "", nombre, flags=re.IGNORECASE).strip()
nombre = re.sub(r"\s+", " ", nombre) # limpiar dobles espacios que hayan quedado
# Ignorar encabezados o ruido obvio
ruido = r"LISTADO|PACIENTES|HOSPITAL|SIN FAMILIAR|NOMBRE|LISTA|GUAIRA|PAGINA|HOJA"
if re.search(ruido, nombre, re.IGNORECASE) and not edad and not cedula:
continue
# Si quedó algo muy corto o números sueltos
if len(nombre) < 3 and not cedula and not edad:
continue
personas.append({"nombre": nombre or None, "cedula": cedula, "edad": edad,
"hospital": hospital_context,
"condicion": "Sin información", "notas": notas})
return personas
def _fallback_ocr(path: str) -> dict:
texto = _ocr_tesseract(path)
if not texto or not texto.strip():
return {
"procesable": False,
"error": "La IA no está disponible y el OCR local no pudo leer la imagen. Revisa credenciales o Tesseract.",
"resumen": "Sin IA ni OCR disponible.",
}
personas = _parse_lista_texto(texto)
uno = personas[0] if len(personas) == 1 else {}
return {
"procesable": True,
"tipo_imagen": "Lista leída con OCR local (sin IA)",
"texto_visible": texto.strip(),
"personas_multiples": personas,
"nombre_detectado": uno.get("nombre"),
"cedula_detectada": uno.get("cedula"),
"edad_estimada": uno.get("edad"),
"nombre_visible": uno.get("nombre"),
"cedula_visible": uno.get("cedula"),
"descripcion_busqueda": "Texto leído por OCR local (sin IA).",
"resumen": f"⚠️ IA inactiva — se usó OCR local. Se leyeron {len(personas)} línea(s). Revisa bien los datos.",
"observaciones": "Extraído con OCR local (Tesseract).",
}
def _intentar_openrouter_imagen(path: str, prompt: str):
client = _get_client()
if not client or not PIL_OK:
return None
try:
PIL.Image.open(path).verify()
base64_img = _encode_image(path)
except Exception:
return {"procesable": False, "error": "Formato de imagen no soportado o archivo corrupto."}
ext = Path(path).suffix.lower().replace(".", "")
mime = "image/jpeg" if ext in ["jpg", "jpeg"] else f"image/{ext}"
for intento in range(3):
try:
response = client.chat.completions.create(
model=MODELO,
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{base64_img}"}
}
]
}
]
)
return _parse_json(response.choices[0].message.content)
except Exception as e:
msg = str(e)
if _es_error_cuota(msg):
return None
if intento < 2:
time.sleep(1.5 * (intento + 1))
continue
return None
return None
def _procesar_imagen(path: str, prompt: str) -> dict:
resultado = _intentar_openrouter_imagen(path, prompt)
if resultado is not None and resultado.get("procesable") is not False:
return resultado
return _fallback_ocr(path)
def _procesar_pdf(path: str) -> dict:
if not PDF_OK:
return {"procesable": False, "error": "Librería pdfplumber no instalada."}
try:
textos = []
with pdfplumber.open(path) as pdf:
for pagina in pdf.pages:
t = pagina.extract_text()
if t: textos.append(t.strip())
if not textos:
return {
"procesable": False,
"error": "El PDF no contiene texto legible.",
"resumen": "PDF sin texto extraíble."
}
texto_completo = "\n\n".join(textos)
if len(texto_completo) > 8000:
texto_completo = texto_completo[:8000] + "\n...[texto truncado]"
client = _get_client()
if not client:
return _fallback_texto_pdf(texto_completo, "IA no configurada")
prompt = PROMPT_PDF.replace("{texto}", texto_completo)
for intento in range(3):
try:
response = client.chat.completions.create(
model=MODELO,
messages=[{"role": "user", "content": prompt}]
)
resultado = _parse_json(response.choices[0].message.content)
resultado["tipo_imagen"] = resultado.get("tipo_documento", "PDF")
return resultado
except Exception as e:
if _es_error_cuota(str(e)):
return _fallback_texto_pdf(texto_completo, "cuota agotada")
if intento < 2:
time.sleep(1.5 * (intento + 1))
continue
return _fallback_texto_pdf(texto_completo, "IA no disponible")
except Exception as e:
return {"procesable": False, "error": f"Error al leer el PDF: {str(e)}"}
def _fallback_texto_pdf(texto: str, motivo: str) -> dict:
personas = _parse_lista_texto(texto)
if not personas:
return {"procesable": False,
"error": f"No se pudo procesar con IA ({motivo}) y no hay personas detectadas en texto libre.",
"resumen": "Sin datos extraíbles."}
return {
"procesable": True,
"tipo_imagen": "PDF leído sin IA (texto directo)",
"personas_multiples": personas,
"resumen": f"⚠️ {motivo}: se detectaron {len(personas)} personas directamente del texto. Revisa datos.",
"observaciones": "Extraído del texto del PDF sin IA.",
}
def _procesar_excel(path: str) -> dict:
if not PANDAS_OK:
return {"procesable": False, "error": "Librería pandas no disponible."}
try:
ext = Path(path).suffix.lower()
if ext == ".csv":
df = pd.read_csv(path, encoding="utf-8", encoding_errors="replace")
else:
df = pd.read_excel(path)
if df.empty:
return {"procesable": False, "error": "El archivo está vacío."}
if not _mapear_columnas(df.columns.tolist()):
df = _redetectar_encabezado(path, ext, df)
mapa_cols = _mapear_columnas(df.columns.tolist())
if mapa_cols:
personas = []
for _, fila in df.iterrows():
p = {}
for campo, col_excel in mapa_cols.items():
val = fila.get(col_excel)
p[campo] = str(val).strip() if val is not None and str(val) != "nan" else None
if p.get("nombre") or p.get("cedula"):
personas.append(p)
return {
"procesable": True,
"tipo_imagen": f"Excel/CSV con {len(personas)} registros",
"personas_multiples": personas,
"resumen": f"Se detectaron {len(personas)} personas directamente del archivo.",
"columnas_detectadas": list(mapa_cols.keys())
}
tabla_texto = df.head(80).to_string(index=False)
if len(tabla_texto) > 6000:
tabla_texto = tabla_texto[:6000] + "\n...[tabla truncada]"
client = _get_client()
if not client:
return {"procesable": False, "error": "IA no configurada. Añade OPENROUTER_API_KEY."}
prompt = PROMPT_EXCEL.replace("{tabla}", tabla_texto)
response = client.chat.completions.create(
model=MODELO,
messages=[{"role": "user", "content": prompt}]
)
resultado = _parse_json(response.choices[0].message.content)
resultado["tipo_imagen"] = resultado.get("tipo_documento", "Excel/CSV")
return resultado
except Exception as e:
return {"procesable": False, "error": f"Error al leer el archivo: {str(e)}"}
def _redetectar_encabezado(path: str, ext: str, df_orig):
try:
crudo = (pd.read_csv(path, header=None, nrows=15, encoding="utf-8", encoding_errors="replace")
if ext == ".csv" else pd.read_excel(path, header=None, nrows=15))
for i in range(len(crudo)):
posibles = [str(x) for x in crudo.iloc[i].tolist()]
if _mapear_columnas(posibles):
return (pd.read_csv(path, header=i, encoding="utf-8", encoding_errors="replace")
if ext == ".csv" else pd.read_excel(path, header=i))
except Exception:
pass
return df_orig
def _mapear_columnas(columnas: list) -> dict | None:
c = {str(col).lower().strip(): col for col in columnas}
mapa = {}
for campo, variantes in {
"nombre": ["nombre", "name", "paciente", "apellido y nombre", "nombres y apellidos", "apellidos y nombres"],
"cedula": ["cedula", "cédula", "ci", "documento", "id", "cedula de identidad", "cédula / id"],
"edad": ["edad", "age", "años"],
"hospital": ["hospital", "centro", "clinica", "clínica", "lugar", "ubicacion", "ubicación"],
"condicion":["condicion", "condición", "estado", "status", "diagnostico", "diagnóstico"],
"contacto": ["telefono", "teléfono", "tel", "celular", "contacto"],
"notas": ["notas", "observaciones", "nota", "comentarios", "descripcion", "descripción", "direccion"],
}.items():
for v in variantes:
if v in c:
mapa[campo] = c[v]
break
if "nombre" in mapa or "cedula" in mapa:
return mapa
return None
# ─── Funciones públicas ───────────────────────────────────────────────────────
def analizar_archivo(path: str) -> dict:
if not path:
return {"procesable": False, "error": "No se recibió ningún archivo."}
tipo = _tipo_archivo(path)
if tipo == "imagen":
return _procesar_imagen(path, PROMPT_IMAGEN)
elif tipo == "pdf":
return _procesar_pdf(path)
elif tipo == "excel":
return _procesar_excel(path)
else:
ext = Path(path).suffix
return {
"procesable": False,
"error": f"Tipo de archivo '{ext}' no soportado."
}
def extraer_lista(path: str) -> dict:
res = analizar_archivo(path)
if not res.get("procesable", True):
return {"procesable": False, "personas": [],
"error": res.get("error") or res.get("observaciones") or "No procesable.",
"resumen": res.get("resumen", "")}
personas = list(res.get("personas_multiples") or [])
if not personas and (res.get("nombre_detectado") or res.get("cedula_detectada")):
personas = [{
"nombre": res.get("nombre_detectado"),
"cedula": res.get("cedula_detectada"),
"edad": res.get("edad_estimada"),
"condicion": res.get("condicion_aparente") or res.get("condicion_detectada"),
"notas": res.get("observaciones"),
}]
limpias = []
for p in personas:
nombre = _limpiar(p.get("nombre"))
cedula = _limpiar(p.get("cedula"))
if not nombre and not cedula:
continue
limpias.append({
"nombre": nombre,
"cedula": cedula,
"edad": _edad_limpia(p.get("edad")),
"condicion": _limpiar(p.get("condicion")) or "Sin información",
"notas": _limpiar(p.get("notas")),
})
return {
"procesable": True,
"personas": limpias,
"resumen": res.get("resumen", f"Se detectaron {len(limpias)} personas."),
"error": None,
}
def _limpiar(v):
if v in (None, "", "null", "None", "nan", "No legible", "No detectado"):
return None
return str(v).strip()
def _edad_limpia(v):
s = _limpiar(v)
if not s: return None
try:
import re as _re
m = _re.search(r"\d+", s)
return str(int(float(m.group()))) if m else None
except (TypeError, ValueError):
return s
def describir_para_busqueda(path: str) -> dict:
if not path:
return {"procesable": False, "error": "No se recibió ningún archivo."}
tipo = _tipo_archivo(path)
if tipo == "imagen":
return _procesar_imagen(path, PROMPT_BUSCAR)
return analizar_archivo(path)
def analizar_imagen_hospital(path: str) -> dict:
return analizar_archivo(path)
def describir_imagen_busqueda(path: str) -> dict:
return describir_para_busqueda(path)