import os import tempfile import openai from dotenv import load_dotenv import PyPDF2 import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import SnowballStemmer import pandas as pd from fpdf import FPDF import streamlit as st import requests from google.cloud import vision_v1 from google.cloud import texttospeech_v1 as texttospeech import base64 from PIL import Image import pytesseract # Configurar la página primero st.set_page_config(page_title="Galatea OMARDENT", layout="wide") nltk.download('punkt', quiet=True) nltk.download('stopwords', quiet=True) # Cargar las claves API desde el archivo .env load_dotenv() openai_api_key = os.getenv("OPENAI_API_KEY") brevo_api_key = os.getenv("BREVO_API_KEY") os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json" # Verifica que las claves API están configuradas if not openai_api_key: st.error("No API key provided for OpenAI. Please set your API key in the .env file.") else: openai.api_key = openai_api_key if not brevo_api_key: st.error("No API key provided for Brevo. Please set your API key in the .env file.") def extraer_texto_pdf(archivo): texto = "" if archivo: with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file.write(archivo.read()) temp_file_path = temp_file.name try: with open(temp_file_path, 'rb') as file: reader = PyPDF2.PdfReader(file) for page in range(len(reader.pages)): texto += reader.pages[page].extract_text() except Exception as e: st.error(f"Error al extraer texto del PDF: {e}") finally: os.unlink(temp_file_path) return texto def preprocesar_texto(texto): tokens = word_tokenize(texto, language='spanish') tokens = [word.lower() for word in tokens if word.isalpha()] stopwords_es = set(stopwords.words('spanish')) tokens = [word for word in tokens if word not in stopwords_es] stemmer = SnowballStemmer('spanish') tokens = [stemmer.stem(word) for word in tokens] return " ".join(tokens) def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id="asst_4ZYvBvf4IUVQPjnugSZGLdV2", contexto=""): try: response = openai.ChatCompletion.create( model=modelo, messages=[ {"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"}, {"role": "user", "content": f"{contexto}\n\n{pregunta}\n\nContexto: {texto_preprocesado}"} ], temperature=temperatura ) respuesta = response.choices[0].message['content'].strip() # Configura la solicitud de síntesis de voz client = texttospeech.TextToSpeechClient() input_text = texttospeech.SynthesisInput(text=respuesta) voice = texttospeech.VoiceSelectionParams( language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE ) audio_config = texttospeech.AudioConfig( audio_encoding=texttospeech.AudioEncoding.MP3 ) # Realiza la solicitud de síntesis de voz response = client.synthesize_speech( input=input_text, voice=voice, audio_config=audio_config ) # Reproduce el audio en Streamlit st.audio(response.audio_content, format="audio/mp3") return respuesta except openai.OpenAIError as e: st.error(f"Error al comunicarse con OpenAI: {e}") return "Lo siento, no puedo procesar tu solicitud en este momento." except Exception as e: st.error(f"Error al generar la respuesta y el audio: {e}") return "Lo siento, ocurrió un error al procesar tu solicitud." def guardar_en_txt(nombre_archivo, datos): carpeta = "datos_guardados" os.makedirs(carpeta, exist_ok=True) ruta_archivo = os.path.join(carpeta, nombre_archivo) try: with open(ruta_archivo, 'a', encoding='utf-8') as archivo: archivo.write(datos + "\n") except Exception as e: st.error(f"Error al guardar datos en el archivo: {e}") return ruta_archivo def cargar_desde_txt(nombre_archivo): carpeta = "datos_guardados" ruta_archivo = os.path.join(carpeta, nombre_archivo) try: if os.path.exists(ruta_archivo): with open(ruta_archivo, 'r', encoding='utf-8') as archivo: return archivo.read() else: st.warning("Archivo no encontrado.") return "" except Exception as e: st.error(f"Error al cargar datos desde el archivo: {e}") return "" def listar_archivos_txt(): carpeta = "datos_guardados" try: if not os.path.exists(carpeta): return [] archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')] archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True) return archivos_ordenados except Exception as e: st.error(f"Error al listar archivos: {e}") return [] def generar_pdf(dataframe, titulo, filename): pdf = FPDF() pdf.add_page() pdf.set_font("Arial", size=12) pdf.cell(200, 10, txt=titulo, ln=True, align='C') for i, row in dataframe.iterrows(): row_text = ", ".join(f"{col}: {val}" for col, val in row.items()) pdf.cell(200, 10, txt=row_text, ln=True) try: with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: pdf.output(tmp_file.name) return tmp_file.name except Exception as e: st.error(f"Error al generar PDF: {e}") return None def enviar_correo(destinatario, asunto, contenido): url = "https://api.brevo.com/v3/smtp/email" headers = { "accept": "application/json", "api-key": brevo_api_key, "content-type": "application/json" } payload = { "sender": {"email": "tu_correo@dominio.com"}, "to": [{"email": destinatario}], "subject": asunto, "htmlContent": contenido } try: response = requests.post(url, json=payload, headers=headers) if response.status_code == 201: st.success(f"Correo enviado a {destinatario}") else: st.error(f"Error al enviar el correo: {response.text}") except Exception as e: st.error(f"Error al enviar el correo: {e}") def enviar_whatsapp(numero, mensaje): url = "https://api.brevo.com/v3/whatsapp/send" headers = { "accept": "application/json", "api-key": brevo_api_key, "content-type": "application/json" } payload = { "recipient": {"number": numero}, "sender": {"number": "tu_numero_whatsapp"}, "content": mensaje } try: response = requests.post(url, json=payload, headers=headers) if response.status_code == 201: st.success(f"Mensaje de WhatsApp enviado a {numero}") else: st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}") except Exception as e: st.error(f"Error al enviar el mensaje de WhatsApp: {e}") def analizar_disponibilidad(imagen): client = vision_v1.ImageAnnotatorClient() content = imagen.read() image = vision_v1.Image(content=content) response = client.text_detection(image=image) if response.error.message: st.error(f"Error en la interpretación de la imagen: {response.error.message}") return None texto = response.full_text_annotation.text return texto def asignar_cita(disponibilidad, fecha, hora, paciente, doctor): # Aquí se debería implementar la lógica de asignación # que verifique la disponibilidad y asigna la cita si es posible. # Esta es una implementación simplificada. if disponibilidad: if "disponible" in disponibilidad.lower(): return f"Cita asignada para {paciente} con {doctor} el {fecha} a las {hora}." else: return "No hay disponibilidad en ese horario. Por favor, elija otro." else: return "No se pudo verificar la disponibilidad." def flujo_agendamiento(): st.title("📅 Asistente de Agendamiento") if 'disponibilidad' not in st.session_state: st.session_state.disponibilidad = "" imagen = st.file_uploader("Sube una imagen de la agenda para analizar disponibilidad", type=['png', 'jpg', 'jpeg']) if imagen: disponibilidad = analizar_disponibilidad(imagen) st.session_state.disponibilidad = disponibilidad st.write("### Disponibilidad Analizada:") st.write(disponibilidad) with st.form("asignar_cita_form"): fecha = st.date_input("Fecha de la cita:") hora = st.time_input("Hora de la cita:") paciente = st.text_input("Nombre del Paciente:") doctor = st.selectbox("Doctor:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"]) submitted = st.form_submit_button("Asignar Cita") if submitted: respuesta = asignar_cita(st.session_state.disponibilidad, fecha, hora, paciente, doctor) st.write(respuesta) # Respuesta por voz client = texttospeech.TextToSpeechClient() synthesis_input = texttospeech.SynthesisInput(text=respuesta) voice = texttospeech.VoiceSelectionParams(language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE) audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) response = client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config) st.audio(response.audio_content, format="audio/mp3") def main(): if 'modelo' not in st.session_state: st.session_state['modelo'] = "gpt-3.5-turbo" if 'temperatura' not in st.session_state: st.session_state['temperatura'] = 0.5 if 'mensajes_chat' not in st.session_state: st.session_state['mensajes_chat'] = [] if 'transcripcion_voz' not in st.session_state: st.session_state['transcripcion_voz'] = "" if 'imagen_asistente' not in st.session_state: st.session_state['imagen_asistente'] = None if 'video_estado' not in st.session_state: st.session_state['video_estado'] = 'paused' if 'assistant_id' not in st.session_state: st.session_state['assistant_id'] = 'asst_4ZYvBvf4IUVQPjnugSZGLdV2' if 'presupuesto_texto' not in st.session_state: st.session_state['presupuesto_texto'] = '' if 'mostrar_chat' not in st.session_state: st.session_state['mostrar_chat'] = False if 'memoria' not in st.session_state: st.session_state['memoria'] = {} with open("assets/assets_instrucciones.pdf", "rb") as file: texto_pdf = extraer_texto_pdf(file) st.session_state['texto_preprocesado_pdf'] = preprocesar_texto(texto_pdf) ruta_logo = os.path.join("assets", "assets_Logo Omardent.png") if os.path.exists(ruta_logo): st.sidebar.image(ruta_logo, use_column_width=True) else: st.sidebar.warning(f"Error: No se pudo encontrar la imagen en la ruta: {ruta_logo}") st.sidebar.title("🤖 Galatea OMARDENT") st.sidebar.markdown("---") st.sidebar.subheader("🧠 Configuración del Modelo") st.session_state['modelo'] = st.sidebar.selectbox( "Selecciona el modelo:", ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k", "gpt-4o"], index=0, key='modelo_selectbox' ) st.sidebar.markdown("---") st.session_state['temperatura'] = st.sidebar.slider( "🌡️ Temperatura", min_value=0.0, max_value=1.0, value=st.session_state['temperatura'], step=0.1, key='temperatura_slider' ) assistant_id = st.sidebar.text_input("Assistant ID", key="assistant_id", help="Introduce el Assistant ID del playground de OpenAI") st.sidebar.markdown("---") st.sidebar.subheader("🌟 Navegación") lateral_page = st.sidebar.radio("Ir a", ["Página Principal", "Gestión de Trabajos", "Gestión de Insumos", "Registro de Radiografías", "Buscar Datos", "Notificaciones", "Recomendaciones", "Asistente de Presupuestos", "Comunicación", "Asistente de Agendamiento", "Interpretación de Imágenes"]) top_page = st.selectbox("Navegación Superior", ["Página Principal", "Galatea-Asistente"]) if top_page == "Galatea-Asistente": mostrar_galatea_asistente() else: if lateral_page == "Página Principal": mostrar_pagina_principal() elif lateral_page == "Gestión de Trabajos": flujo_laboratorio() elif lateral_page == "Gestión de Insumos": flujo_insumos() elif lateral_page == "Registro de Radiografías": flujo_radiografias() elif lateral_page == "Buscar Datos": buscar_datos_guardados() elif lateral_page == "Notificaciones": generar_notificaciones_pendientes() elif lateral_page == "Recomendaciones": mostrar_recomendaciones() elif lateral_page == "Asistente de Presupuestos": flujo_presupuestos() elif lateral_page == "Comunicación": st.write("Página de Comunicación") elif lateral_page == "Asistente de Agendamiento": flujo_agendamiento() elif lateral_page == "Interpretación de Imágenes": mostrar_interpretacion_imagen() if __name__ == "__main__": main()