Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import openai | |
| from dotenv import load_dotenv | |
| import PyPDF2 | |
| import nltk | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| from nltk.stem import SnowballStemmer | |
| import pandas as pd | |
| from fpdf import FPDF | |
| import streamlit as st | |
| import requests | |
| from google.cloud import vision_v1 | |
| from google.cloud import texttospeech_v1 as texttospeech | |
| import base64 | |
| from PIL import Image | |
| import pytesseract | |
| # Configurar la página primero | |
| st.set_page_config(page_title="Galatea OMARDENT", layout="wide") | |
| nltk.download('punkt', quiet=True) | |
| nltk.download('stopwords', quiet=True) | |
| # Cargar las claves API desde el archivo .env | |
| load_dotenv() | |
| openai_api_key = os.getenv("OPENAI_API_KEY") | |
| brevo_api_key = os.getenv("BREVO_API_KEY") | |
| os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json" | |
| # Verifica que las claves API están configuradas | |
| if not openai_api_key: | |
| st.error("No API key provided for OpenAI. Please set your API key in the .env file.") | |
| else: | |
| openai.api_key = openai_api_key | |
| if not brevo_api_key: | |
| st.error("No API key provided for Brevo. Please set your API key in the .env file.") | |
| def extraer_texto_pdf(archivo): | |
| texto = "" | |
| if archivo: | |
| with tempfile.NamedTemporaryFile(delete=False) as temp_file: | |
| temp_file.write(archivo.read()) | |
| temp_file_path = temp_file.name | |
| try: | |
| with open(temp_file_path, 'rb') as file: | |
| reader = PyPDF2.PdfReader(file) | |
| for page in range(len(reader.pages)): | |
| texto += reader.pages[page].extract_text() | |
| except Exception as e: | |
| st.error(f"Error al extraer texto del PDF: {e}") | |
| finally: | |
| os.unlink(temp_file_path) | |
| return texto | |
| def preprocesar_texto(texto): | |
| tokens = word_tokenize(texto, language='spanish') | |
| tokens = [word.lower() for word in tokens if word.isalpha()] | |
| stopwords_es = set(stopwords.words('spanish')) | |
| tokens = [word for word in tokens if word not in stopwords_es] | |
| stemmer = SnowballStemmer('spanish') | |
| tokens = [stemmer.stem(word) for word in tokens] | |
| return " ".join(tokens) | |
| def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id="asst_4ZYvBvf4IUVQPjnugSZGLdV2", contexto=""): | |
| try: | |
| response = openai.ChatCompletion.create( | |
| model=modelo, | |
| messages=[ | |
| {"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"}, | |
| {"role": "user", "content": f"{contexto}\n\n{pregunta}\n\nContexto: {texto_preprocesado}"} | |
| ], | |
| temperature=temperatura | |
| ) | |
| respuesta = response.choices[0].message['content'].strip() | |
| # Configura la solicitud de síntesis de voz | |
| client = texttospeech.TextToSpeechClient() | |
| input_text = texttospeech.SynthesisInput(text=respuesta) | |
| voice = texttospeech.VoiceSelectionParams( | |
| language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE | |
| ) | |
| audio_config = texttospeech.AudioConfig( | |
| audio_encoding=texttospeech.AudioEncoding.MP3 | |
| ) | |
| # Realiza la solicitud de síntesis de voz | |
| response = client.synthesize_speech( | |
| input=input_text, voice=voice, audio_config=audio_config | |
| ) | |
| # Reproduce el audio en Streamlit | |
| st.audio(response.audio_content, format="audio/mp3") | |
| return respuesta | |
| except openai.OpenAIError as e: | |
| st.error(f"Error al comunicarse con OpenAI: {e}") | |
| return "Lo siento, no puedo procesar tu solicitud en este momento." | |
| except Exception as e: | |
| st.error(f"Error al generar la respuesta y el audio: {e}") | |
| return "Lo siento, ocurrió un error al procesar tu solicitud." | |
| def guardar_en_txt(nombre_archivo, datos): | |
| carpeta = "datos_guardados" | |
| os.makedirs(carpeta, exist_ok=True) | |
| ruta_archivo = os.path.join(carpeta, nombre_archivo) | |
| try: | |
| with open(ruta_archivo, 'a', encoding='utf-8') as archivo: | |
| archivo.write(datos + "\n") | |
| except Exception as e: | |
| st.error(f"Error al guardar datos en el archivo: {e}") | |
| return ruta_archivo | |
| def cargar_desde_txt(nombre_archivo): | |
| carpeta = "datos_guardados" | |
| ruta_archivo = os.path.join(carpeta, nombre_archivo) | |
| try: | |
| if os.path.exists(ruta_archivo): | |
| with open(ruta_archivo, 'r', encoding='utf-8') as archivo: | |
| return archivo.read() | |
| else: | |
| st.warning("Archivo no encontrado.") | |
| return "" | |
| except Exception as e: | |
| st.error(f"Error al cargar datos desde el archivo: {e}") | |
| return "" | |
| def listar_archivos_txt(): | |
| carpeta = "datos_guardados" | |
| try: | |
| if not os.path.exists(carpeta): | |
| return [] | |
| archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')] | |
| archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True) | |
| return archivos_ordenados | |
| except Exception as e: | |
| st.error(f"Error al listar archivos: {e}") | |
| return [] | |
| def generar_pdf(dataframe, titulo, filename): | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_font("Arial", size=12) | |
| pdf.cell(200, 10, txt=titulo, ln=True, align='C') | |
| for i, row in dataframe.iterrows(): | |
| row_text = ", ".join(f"{col}: {val}" for col, val in row.items()) | |
| pdf.cell(200, 10, txt=row_text, ln=True) | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
| pdf.output(tmp_file.name) | |
| return tmp_file.name | |
| except Exception as e: | |
| st.error(f"Error al generar PDF: {e}") | |
| return None | |
| def enviar_correo(destinatario, asunto, contenido): | |
| url = "https://api.brevo.com/v3/smtp/email" | |
| headers = { | |
| "accept": "application/json", | |
| "api-key": brevo_api_key, | |
| "content-type": "application/json" | |
| } | |
| payload = { | |
| "sender": {"email": "tu_correo@dominio.com"}, | |
| "to": [{"email": destinatario}], | |
| "subject": asunto, | |
| "htmlContent": contenido | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers) | |
| if response.status_code == 201: | |
| st.success(f"Correo enviado a {destinatario}") | |
| else: | |
| st.error(f"Error al enviar el correo: {response.text}") | |
| except Exception as e: | |
| st.error(f"Error al enviar el correo: {e}") | |
| def enviar_whatsapp(numero, mensaje): | |
| url = "https://api.brevo.com/v3/whatsapp/send" | |
| headers = { | |
| "accept": "application/json", | |
| "api-key": brevo_api_key, | |
| "content-type": "application/json" | |
| } | |
| payload = { | |
| "recipient": {"number": numero}, | |
| "sender": {"number": "tu_numero_whatsapp"}, | |
| "content": mensaje | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers) | |
| if response.status_code == 201: | |
| st.success(f"Mensaje de WhatsApp enviado a {numero}") | |
| else: | |
| st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}") | |
| except Exception as e: | |
| st.error(f"Error al enviar el mensaje de WhatsApp: {e}") | |
| def analizar_disponibilidad(imagen): | |
| client = vision_v1.ImageAnnotatorClient() | |
| content = imagen.read() | |
| image = vision_v1.Image(content=content) | |
| response = client.text_detection(image=image) | |
| if response.error.message: | |
| st.error(f"Error en la interpretación de la imagen: {response.error.message}") | |
| return None | |
| texto = response.full_text_annotation.text | |
| return texto | |
| def asignar_cita(disponibilidad, fecha, hora, paciente, doctor): | |
| # Aquí se debería implementar la lógica de asignación | |
| # que verifique la disponibilidad y asigna la cita si es posible. | |
| # Esta es una implementación simplificada. | |
| if disponibilidad: | |
| if "disponible" in disponibilidad.lower(): | |
| return f"Cita asignada para {paciente} con {doctor} el {fecha} a las {hora}." | |
| else: | |
| return "No hay disponibilidad en ese horario. Por favor, elija otro." | |
| else: | |
| return "No se pudo verificar la disponibilidad." | |
| def flujo_agendamiento(): | |
| st.title("📅 Asistente de Agendamiento") | |
| if 'disponibilidad' not in st.session_state: | |
| st.session_state.disponibilidad = "" | |
| imagen = st.file_uploader("Sube una imagen de la agenda para analizar disponibilidad", type=['png', 'jpg', 'jpeg']) | |
| if imagen: | |
| disponibilidad = analizar_disponibilidad(imagen) | |
| st.session_state.disponibilidad = disponibilidad | |
| st.write("### Disponibilidad Analizada:") | |
| st.write(disponibilidad) | |
| with st.form("asignar_cita_form"): | |
| fecha = st.date_input("Fecha de la cita:") | |
| hora = st.time_input("Hora de la cita:") | |
| paciente = st.text_input("Nombre del Paciente:") | |
| doctor = st.selectbox("Doctor:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"]) | |
| submitted = st.form_submit_button("Asignar Cita") | |
| if submitted: | |
| respuesta = asignar_cita(st.session_state.disponibilidad, fecha, hora, paciente, doctor) | |
| st.write(respuesta) | |
| # Respuesta por voz | |
| client = texttospeech.TextToSpeechClient() | |
| synthesis_input = texttospeech.SynthesisInput(text=respuesta) | |
| voice = texttospeech.VoiceSelectionParams(language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE) | |
| audio_config = texttospeech.AudioConfig(audio_encoding=texttospeech.AudioEncoding.MP3) | |
| response = client.synthesize_speech(input=synthesis_input, voice=voice, audio_config=audio_config) | |
| st.audio(response.audio_content, format="audio/mp3") | |
| def main(): | |
| if 'modelo' not in st.session_state: | |
| st.session_state['modelo'] = "gpt-3.5-turbo" | |
| if 'temperatura' not in st.session_state: | |
| st.session_state['temperatura'] = 0.5 | |
| if 'mensajes_chat' not in st.session_state: | |
| st.session_state['mensajes_chat'] = [] | |
| if 'transcripcion_voz' not in st.session_state: | |
| st.session_state['transcripcion_voz'] = "" | |
| if 'imagen_asistente' not in st.session_state: | |
| st.session_state['imagen_asistente'] = None | |
| if 'video_estado' not in st.session_state: | |
| st.session_state['video_estado'] = 'paused' | |
| if 'assistant_id' not in st.session_state: | |
| st.session_state['assistant_id'] = 'asst_4ZYvBvf4IUVQPjnugSZGLdV2' | |
| if 'presupuesto_texto' not in st.session_state: | |
| st.session_state['presupuesto_texto'] = '' | |
| if 'mostrar_chat' not in st.session_state: | |
| st.session_state['mostrar_chat'] = False | |
| if 'memoria' not in st.session_state: | |
| st.session_state['memoria'] = {} | |
| with open("assets/assets_instrucciones.pdf", "rb") as file: | |
| texto_pdf = extraer_texto_pdf(file) | |
| st.session_state['texto_preprocesado_pdf'] = preprocesar_texto(texto_pdf) | |
| ruta_logo = os.path.join("assets", "assets_Logo Omardent.png") | |
| if os.path.exists(ruta_logo): | |
| st.sidebar.image(ruta_logo, use_column_width=True) | |
| else: | |
| st.sidebar.warning(f"Error: No se pudo encontrar la imagen en la ruta: {ruta_logo}") | |
| st.sidebar.title("🤖 Galatea OMARDENT") | |
| st.sidebar.markdown("---") | |
| st.sidebar.subheader("🧠 Configuración del Modelo") | |
| st.session_state['modelo'] = st.sidebar.selectbox( | |
| "Selecciona el modelo:", | |
| ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k", "gpt-4o"], | |
| index=0, | |
| key='modelo_selectbox' | |
| ) | |
| st.sidebar.markdown("---") | |
| st.session_state['temperatura'] = st.sidebar.slider( | |
| "🌡️ Temperatura", | |
| min_value=0.0, max_value=1.0, | |
| value=st.session_state['temperatura'], | |
| step=0.1, | |
| key='temperatura_slider' | |
| ) | |
| assistant_id = st.sidebar.text_input("Assistant ID", key="assistant_id", help="Introduce el Assistant ID del playground de OpenAI") | |
| st.sidebar.markdown("---") | |
| st.sidebar.subheader("🌟 Navegación") | |
| lateral_page = st.sidebar.radio("Ir a", ["Página Principal", "Gestión de Trabajos", "Gestión de Insumos", "Registro de Radiografías", "Buscar Datos", "Notificaciones", "Recomendaciones", "Asistente de Presupuestos", "Comunicación", "Asistente de Agendamiento", "Interpretación de Imágenes"]) | |
| top_page = st.selectbox("Navegación Superior", ["Página Principal", "Galatea-Asistente"]) | |
| if top_page == "Galatea-Asistente": | |
| mostrar_galatea_asistente() | |
| else: | |
| if lateral_page == "Página Principal": | |
| mostrar_pagina_principal() | |
| elif lateral_page == "Gestión de Trabajos": | |
| flujo_laboratorio() | |
| elif lateral_page == "Gestión de Insumos": | |
| flujo_insumos() | |
| elif lateral_page == "Registro de Radiografías": | |
| flujo_radiografias() | |
| elif lateral_page == "Buscar Datos": | |
| buscar_datos_guardados() | |
| elif lateral_page == "Notificaciones": | |
| generar_notificaciones_pendientes() | |
| elif lateral_page == "Recomendaciones": | |
| mostrar_recomendaciones() | |
| elif lateral_page == "Asistente de Presupuestos": | |
| flujo_presupuestos() | |
| elif lateral_page == "Comunicación": | |
| st.write("Página de Comunicación") | |
| elif lateral_page == "Asistente de Agendamiento": | |
| flujo_agendamiento() | |
| elif lateral_page == "Interpretación de Imágenes": | |
| mostrar_interpretacion_imagen() | |
| if __name__ == "__main__": | |
| main() | |