VIRTUALOMARDENT / app.py
Josedcape's picture
Update app.py
3b5430a verified
import os
import tempfile
import openai
from dotenv import load_dotenv
import PyPDF2
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
import pandas as pd
from fpdf import FPDF
import streamlit as st
import requests
from google.cloud import vision_v1
from google.cloud import texttospeech_v1 as texttospeech
import base64
from PIL import Image
import pytesseract
# Configurar la página primero
st.set_page_config(page_title="Galatea OMARDENT", layout="wide")
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
# Cargar las claves API desde el archivo .env
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
brevo_api_key = os.getenv("BREVO_API_KEY")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json"
# Verifica que las claves API están configuradas
if not openai_api_key:
st.error("No API key provided for OpenAI. Please set your API key in the .env file.")
else:
openai.api_key = openai_api_key
if not brevo_api_key:
st.error("No API key provided for Brevo. Please set your API key in the .env file.")
def extraer_texto_pdf(archivo):
texto = ""
if archivo:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(archivo.read())
temp_file_path = temp_file.name
try:
with open(temp_file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
for page in range(len(reader.pages)):
texto += reader.pages[page].extract_text()
except Exception as e:
st.error(f"Error al extraer texto del PDF: {e}")
finally:
os.unlink(temp_file_path)
return texto
def preprocesar_texto(texto):
tokens = word_tokenize(texto, language='spanish')
tokens = [word.lower() for word in tokens if word.isalpha()]
stopwords_es = set(stopwords.words('spanish'))
tokens = [word for word in tokens if word not in stopwords_es]
stemmer = SnowballStemmer('spanish')
tokens = [stemmer.stem(word) for word in tokens]
return " ".join(tokens)
def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id="asst_4ZYvBvf4IUVQPjnugSZGLdV2", contexto=""):
try:
response = openai.ChatCompletion.create(
model=modelo,
messages=[
{"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"},
{"role": "user", "content": f"{contexto}\n\n{pregunta}\n\nContexto: {texto_preprocesado}"}
],
temperature=temperatura
)
respuesta = response.choices[0].message['content'].strip()
# Configura la solicitud de síntesis de voz
client = texttospeech.TextToSpeechClient()
input_text = texttospeech.SynthesisInput(text=respuesta)
voice = texttospeech.VoiceSelectionParams(
language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE
)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3
)
# Realiza la solicitud de síntesis de voz
response = client.synthesize_speech(
input=input_text, voice=voice, audio_config=audio_config
)
# Reproduce el audio en Streamlit
st.audio(response.audio_content, format="audio/mp3")
return respuesta
except openai.OpenAIError as e:
st.error(f"Error al comunicarse con OpenAI: {e}")
return "Lo siento, no puedo procesar tu solicitud en este momento."
except Exception as e:
st.error(f"Error al generar la respuesta y el audio: {e}")
return "Lo siento, ocurrió un error al procesar tu solicitud."
def guardar_en_txt(nombre_archivo, datos):
carpeta = "datos_guardados"
os.makedirs(carpeta, exist_ok=True)
ruta_archivo = os.path.join(carpeta, nombre_archivo)
try:
with open(ruta_archivo, 'a', encoding='utf-8') as archivo:
archivo.write(datos + "\n")
except Exception as e:
st.error(f"Error al guardar datos en el archivo: {e}")
return ruta_archivo
def cargar_desde_txt(nombre_archivo):
carpeta = "datos_guardados"
ruta_archivo = os.path.join(carpeta, nombre_archivo)
try:
if os.path.exists(ruta_archivo):
with open(ruta_archivo, 'r', encoding='utf-8') as archivo:
return archivo.read()
else:
st.warning("Archivo no encontrado.")
return ""
except Exception as e:
st.error(f"Error al cargar datos desde el archivo: {e}")
return ""
def listar_archivos_txt():
carpeta = "datos_guardados"
try:
if not os.path.exists(carpeta):
return []
archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')]
archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True)
return archivos_ordenados
except Exception as e:
st.error(f"Error al listar archivos: {e}")
return []
def generar_pdf(dataframe, titulo, filename):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.cell(200, 10, txt=titulo, ln=True, align='C')
for i, row in dataframe.iterrows():
row_text = ", ".join(f"{col}: {val}" for col, val in row.items())
pdf.cell(200, 10, txt=row_text, ln=True)
try:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
pdf.output(tmp_file.name)
return tmp_file.name
except Exception as e:
st.error(f"Error al generar PDF: {e}")
return None
def enviar_correo(destinatario, asunto, contenido):
url = "https://api.brevo.com/v3/smtp/email"
headers = {
"accept": "application/json",
"api-key": brevo_api_key,
"content-type": "application/json"
}
payload = {
"sender": {"email": "tu_correo@dominio.com"},
"to": [{"email": destinatario}],
"subject": asunto,
"htmlContent": contenido
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 201:
st.success(f"Correo enviado a {destinatario}")
else:
st.error(f"Error al enviar el correo: {response.text}")
except Exception as e:
st.error(f"Error al enviar el correo: {e}")
def enviar_whatsapp(numero, mensaje):
url = "https://api.brevo.com/v3/whatsapp/send"
headers = {
"accept": "application/json",
"api-key": brevo_api_key,
"content-type": "application/json"
}
payload = {
"recipient": {"number": numero},
"sender": {"number": "tu_numero_whatsapp"},
"content": mensaje
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 201:
st.success(f"Mensaje de WhatsApp enviado a {numero}")
else:
st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}")
except Exception as e:
st.error(f"Error al enviar el mensaje de WhatsApp: {e}")
def analizar_disponibilidad(imagen):
client = vision_v1.ImageAnnotatorClient()
content = imagen.read()
image = vision_v1.Image(content=content)
response = client.text_detection(image=image)
if response.error.message:
st.error(f"Error en la interpretación de la imagen: {response.error.message}")
return None
texto = response.full_text_annotation.text
return texto
def asignar_cita(disponibilidad, fecha, hora, paciente, doctor):
# Aquí se debería implementar la lógica de asignación
# que verifique la disponibilidad y asigna la cita si es posible.
# Esta es una implementación simplificada.
if disponibilidad:
if "disponible" in disponibilidad.lower():
return f"Cita asignada para {paciente} con {doctor} el {fecha} a las {hora}."
else:
return "No hay disponibilidad en ese horario. Por favor, elija otro."
else:
return "No se pudo verificar la disponibilidad."
def flujo_agendamiento():
st.title("📅 Asistente de Agendamiento")
if 'disponibilidad' not in st.session_state:
st.session_state.disponibilidad = ""
imagen = st.file_uploader("Sube una imagen de la agenda para analizar disponibilidad", type=['png', 'jpg', 'jpeg'])
if imagen:
disponibilidad = analizar_disponibilidad(imagen)
st.session_state.disponibilidad = disponibilidad
st.write("### Disponibilidad Analizada:")
st.write(disponibilidad)
with st.form("asignar_cita_form"):
fecha = st.date_input("Fecha de la cita:")
hora = st.time_input("Hora de la cita:")
paciente = st.text_input("Nombre del Paciente:")
doctor = st.selectbox("Doctor:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"])
submitted = st.form_submit_button("Asignar Cita")
if submitted:
respuesta = asignar_cita(st.session_state.disponibilidad, fecha, hora, paciente, doctor)
st.write(respuesta)
# Respuesta por voz
client = texttospeech.TextToSpeechClient()
synthesis_input = texttospeech.SynthesisInput(text=respuesta)
voice = texttospeech.VoiceSelectionParams(language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE)
audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3)
response = client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config)
st.audio(response.audio_content, format="audio/mp3")
def main():
if 'modelo' not in st.session_state:
st.session_state['modelo'] = "gpt-3.5-turbo"
if 'temperatura' not in st.session_state:
st.session_state['temperatura'] = 0.5
if 'mensajes_chat' not in st.session_state:
st.session_state['mensajes_chat'] = []
if 'transcripcion_voz' not in st.session_state:
st.session_state['transcripcion_voz'] = ""
if 'imagen_asistente' not in st.session_state:
st.session_state['imagen_asistente'] = None
if 'video_estado' not in st.session_state:
st.session_state['video_estado'] = 'paused'
if 'assistant_id' not in st.session_state:
st.session_state['assistant_id'] = 'asst_4ZYvBvf4IUVQPjnugSZGLdV2'
if 'presupuesto_texto' not in st.session_state:
st.session_state['presupuesto_texto'] = ''
if 'mostrar_chat' not in st.session_state:
st.session_state['mostrar_chat'] = False
if 'memoria' not in st.session_state:
st.session_state['memoria'] = {}
with open("assets/assets_instrucciones.pdf", "rb") as file:
texto_pdf = extraer_texto_pdf(file)
st.session_state['texto_preprocesado_pdf'] = preprocesar_texto(texto_pdf)
ruta_logo = os.path.join("assets", "assets_Logo Omardent.png")
if os.path.exists(ruta_logo):
st.sidebar.image(ruta_logo, use_column_width=True)
else:
st.sidebar.warning(f"Error: No se pudo encontrar la imagen en la ruta: {ruta_logo}")
st.sidebar.title("🤖 Galatea OMARDENT")
st.sidebar.markdown("---")
st.sidebar.subheader("🧠 Configuración del Modelo")
st.session_state['modelo'] = st.sidebar.selectbox(
"Selecciona el modelo:",
["gpt-3.5-turbo", "gpt-4", "gpt-4-32k", "gpt-4o"],
index=0,
key='modelo_selectbox'
)
st.sidebar.markdown("---")
st.session_state['temperatura'] = st.sidebar.slider(
"🌡️ Temperatura",
min_value=0.0, max_value=1.0,
value=st.session_state['temperatura'],
step=0.1,
key='temperatura_slider'
)
assistant_id = st.sidebar.text_input("Assistant ID", key="assistant_id", help="Introduce el Assistant ID del playground de OpenAI")
st.sidebar.markdown("---")
st.sidebar.subheader("🌟 Navegación")
lateral_page = st.sidebar.radio("Ir a", ["Página Principal", "Gestión de Trabajos", "Gestión de Insumos", "Registro de Radiografías", "Buscar Datos", "Notificaciones", "Recomendaciones", "Asistente de Presupuestos", "Comunicación", "Asistente de Agendamiento", "Interpretación de Imágenes"])
top_page = st.selectbox("Navegación Superior", ["Página Principal", "Galatea-Asistente"])
if top_page == "Galatea-Asistente":
mostrar_galatea_asistente()
else:
if lateral_page == "Página Principal":
mostrar_pagina_principal()
elif lateral_page == "Gestión de Trabajos":
flujo_laboratorio()
elif lateral_page == "Gestión de Insumos":
flujo_insumos()
elif lateral_page == "Registro de Radiografías":
flujo_radiografias()
elif lateral_page == "Buscar Datos":
buscar_datos_guardados()
elif lateral_page == "Notificaciones":
generar_notificaciones_pendientes()
elif lateral_page == "Recomendaciones":
mostrar_recomendaciones()
elif lateral_page == "Asistente de Presupuestos":
flujo_presupuestos()
elif lateral_page == "Comunicación":
st.write("Página de Comunicación")
elif lateral_page == "Asistente de Agendamiento":
flujo_agendamiento()
elif lateral_page == "Interpretación de Imágenes":
mostrar_interpretacion_imagen()
if __name__ == "__main__":
main()