import os import tempfile import openai from dotenv import load_dotenv import PyPDF2 import nltk from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import SnowballStemmer import pandas as pd from fpdf import FPDF import streamlit as st import requests from google.cloud import texttospeech nltk.download('punkt', quiet=True) nltk.download('stopwords', quiet=True) # Cargar las claves API desde el archivo .env load_dotenv() openai_api_key = os.getenv("OPENAI_API_KEY") brevo_api_key = os.getenv("BREVO_API_KEY") os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json" # Verifica que las claves API están configuradas if not openai_api_key: st.error("No API key provided for OpenAI. Please set your API key in the .env file.") else: openai.api_key = openai_api_key if not brevo_api_key: st.error("No API key provided for Brevo. Please set your API key in the .env file.") def extraer_texto_pdf(archivo): texto = "" if archivo: with tempfile.NamedTemporaryFile(delete=False) as temp_file: temp_file.write(archivo.read()) temp_file_path = temp_file.name try: with open(temp_file_path, 'rb') as file: reader = PyPDF2.PdfReader(file) for page in range(len(reader.pages)): texto += reader.pages[page].extract_text() except Exception as e: st.error(f"Error al extraer texto del PDF: {e}") finally: os.unlink(temp_file_path) return texto def preprocesar_texto(texto): tokens = word_tokenize(texto, language='spanish') tokens = [word.lower() for word in tokens if word.isalpha()] stopwords_es = set(stopwords.words('spanish')) tokens = [word for word in tokens if word not in stopwords_es] stemmer = SnowballStemmer('spanish') tokens = [stemmer.stem(word) for word in tokens] return " ".join(tokens) def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id=""): try: response = openai.ChatCompletion.create( model=modelo, messages=[ {"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"}, {"role": "user", "content": f"{pregunta}\n\nContexto: {texto_preprocesado}"} ], temperature=temperatura ) respuesta = response.choices[0].message['content'].strip() # Configura la solicitud de síntesis de voz client = texttospeech.TextToSpeechClient() input_text = texttospeech.SynthesisInput(text=respuesta) voice = texttospeech.VoiceSelectionParams( language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE ) audio_config = texttospeech.AudioConfig( audio_encoding=texttospeech.AudioEncoding.MP3 ) # Realiza la solicitud de síntesis de voz response = client.synthesize_speech( input=input_text, voice=voice, audio_config=audio_config ) # Reproduce el audio en Streamlit st.audio(response.audio_content, format="audio/mp3") return respuesta except openai.OpenAIError as e: st.error(f"Error al comunicarse con OpenAI: {e}") return "Lo siento, no puedo procesar tu solicitud en este momento." except Exception as e: st.error(f"Error al generar la respuesta y el audio: {e}") return "Lo siento, ocurrió un error al procesar tu solicitud." def guardar_en_txt(nombre_archivo, datos): carpeta = "datos_guardados" os.makedirs(carpeta, existo_ok=True) ruta_archivo = os.path.join(carpeta, nombre_archivo) try: with open(ruta_archivo, 'a', encoding='utf-8') as archivo: # Append mode archivo.write(datos + "\n") except Exception as e: st.error(f"Error al guardar datos en el archivo: {e}") return ruta_archivo def cargar_desde_txt(nombre_archivo): carpeta = "datos_guardados" ruta_archivo = os.path.join(carpeta, nombre_archivo) try: if os.path.exists(ruta_archivo): with open(ruta_archivo, 'r', encoding='utf-8') as archivo: return archivo.read() else: st.warning("Archivo no encontrado.") return "" except Exception as e: st.error(f"Error al cargar datos desde el archivo: {e}") return "" def listar_archivos_txt(): carpeta = "datos_guardados" try: if not os.path.exists(carpeta): return [] archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')] archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True) return archivos_ordenados except Exception as e: st.error(f"Error al listar archivos: {e}") return [] def generar_pdf(dataframe, titulo, filename): pdf = FPDF() pdf.add_page() pdf.set_font("Arial", size=12) pdf.cell(200, 10, txt=titulo, ln=True, align='C') for i, row in dataframe.iterrows(): row_text = ", ".join(f"{col}: {val}" for col, val in row.items()) pdf.cell(200, 10, txt=row_text, ln=True) try: with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: pdf.output(tmp_file.name) return tmp_file.name except Exception as e: st.error(f"Error al generar PDF: {e}") return None def enviar_correo(destinatario, asunto, contenido): url = "https://api.brevo.com/v3/smtp/email" headers = { "accept": "application/json", "api-key": brevo_api_key, "content-type": "application/json" } payload = { "sender": {"email": "tu_correo@dominio.com"}, "to": [{"email": destinatario}], "subject": asunto, "htmlContent": contenido } try: response = requests.post(url, json=payload, headers=headers) if response.status_code == 201: st.success(f"Correo enviado a {destinatario}") else: st.error(f"Error al enviar el correo: {response.text}") except Exception as e: st.error(f"Error al enviar el correo: {e}") def enviar_whatsapp(numero, mensaje): url = "https://api.brevo.com/v3/whatsapp/send" headers = { "accept": "application/json", "api-key": brevo_api_key, "content-type": "application/json" } payload = { "recipient": {"number": numero}, "sender": {"number": "tu_numero_whatsapp"}, "content": mensaje } try: response = requests.post(url, json=payload, headers=headers) if response.status_code == 201: st.success(f"Mensaje de WhatsApp enviado a {numero}") else: st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}") except Exception as e: st.error(f"Error al enviar el mensaje de WhatsApp: {e}") def flujo_laboratorio(): st.title("🦷 Gestión de Trabajos de Laboratorio") if 'laboratorio' not in st.session_state: st.session_state.laboratorio = [] with st.form("laboratorio_form"): tipo_trabajo = st.selectbox("Tipo de trabajo:", [ "Protesis total", "Protesis removible metal-acrilico", "Parcialita acrilico", "Placa de blanqueamiento", "Placa de bruxismo", "Corona de acrilico", "Corona en zirconio", "Protesis flexible", "Acker flexible" ]) doctor = st.selectbox("Doctor que requiere el trabajo:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"]) fecha_entrega = st.date_input("Fecha de entrega:") fecha_envio = st.date_input("Fecha de envío:") laboratorio = st.selectbox("Laboratorio dental:", ["Ernesto Correa lab", "Formando Sonrisas"]) nombre_paciente = st.text_input("Nombre paciente:") observaciones = st.text_input("Observaciones:") numero_orden = st.text_input("Número de orden:") cantidad = st.number_input("Cantidad:", min_value=1, step=1) submitted = st.form_submit_button("Registrar Trabajo") if submitted: trabajo = { "tipo_trabajo": tipo_trabajo, "doctor": doctor, "fecha_entrega": str(fecha_entrega), "fecha_envio": str(fecha_envio), "laboratorio": laboratorio, "nombre_paciente": nombre_paciente, "observaciones": observaciones, "numero_orden": numero_orden, "cantidad": cantidad, "estado": "pendiente" } st.session_state.laboratorio.append(trabajo) datos_guardados = mostrar_datos_como_texto([trabajo]) # Append only the new entry guardar_en_txt('trabajos_laboratorio.txt', datos_guardados) st.success("Trabajo registrado con éxito.") if st.session_state.laboratorio: st.write("### Trabajos Registrados") df_trabajos = pd.DataFrame(st.session_state.laboratorio) st.write(df_trabajos) pdf_file = generar_pdf(df_trabajos, "Registro de Trabajos de Laboratorio", "trabajos_laboratorio.pdf") st.download_button( label="📥 Descargar PDF", data=open(pdf_file, 'rb').read(), file_name="trabajos_laboratorio.pdf", mime="application/pdf" ) def flujo_insumos(): st.title("📦 Gestión de Insumos") if 'insumos' not in st.session_state: st.session_state.insumos = [] with st.form("insumos_form"): insumo_nombre = st.text_input("Nombre del Insumo:") insumo_cantidad = st.number_input("Cantidad Faltante:", min_value=0, step=1) submitted = st.form_submit_button("Agregar Insumo") if submitted and insumo_nombre: insumo = {"nombre": insumo_nombre, "cantidad": insumo_cantidad} st.session_state.insumos.append(insumo) datos_guardados = mostrar_datos_como_texto([insumo]) # Append only the new entry guardar_en_txt('insumos.txt', datos_guardados) st.success(f"Insumo '{insumo_nombre}' agregado con éxito.") if st.session_state.insumos: st.write("### Insumos Registrados") insumos_df = pd.DataFrame(st.session_state.insumos) st.write(insumos_df) pdf_file = generar_pdf(insumos_df, "Registro de Insumos Faltantes", "insumos.pdf") st.download_button( label="📥 Descargar PDF", data=open(pdf_file, 'rb').read(), file_name="insumos_faltantes.pdf", mime="application/pdf" ) def buscar_datos_guardados(): st.title("🔍 Buscar Datos Guardados") carpeta = "datos_guardados" if not os.path.exists(carpeta): st.info("No se encontraron archivos de datos guardados.") return archivos = listar_archivos_txt() if archivos: archivo_seleccionado = st.selectbox("Selecciona un archivo para ver:", archivos) if archivo_seleccionado: datos = cargar_desde_txt(os.path.join(carpeta, archivo_seleccionado)) if datos: st.write(f"### Datos del archivo {archivo_seleccionado}") st.text_area("Datos", datos, height=300) # Link to download the file try: with open(os.path.join(carpeta, archivo_seleccionado), 'rb') as file: st.download_button( label="📥 Descargar Archivo TXT", data=file, file_name=archivo_seleccionado, mime="text/plain" ) except Exception as e: st.error(f"Error al preparar la descarga: {e}") # Enviar el archivo seleccionado por correo if st.button("Enviar por correo"): contenido = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}" enviar_correo("josedcape@gmail.com", f"Datos del archivo {archivo_seleccionado}", contenido) # Enviar el archivo seleccionado por WhatsApp if st.button("Enviar por WhatsApp"): mensaje = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}" enviar_whatsapp("3114329322", mensaje) else: st.warning(f"No se encontraron datos en el archivo {archivo_seleccionado}") else: st.info("No se encontraron archivos de datos guardados.") def generar_notificaciones_pendientes(): if 'laboratorio' not in st.session_state or not st.session_state.laboratorio: st.info("No hay trabajos pendientes.") return pendientes = [trabajo for trabajo in st.session_state.laboratorio if trabajo["estado"] == "pendiente"] if pendientes: st.write("### Notificaciones de Trabajos Pendientes") for trabajo in pendientes: st.info(f"Pendiente: {trabajo['tipo_trabajo']} - {trabajo['numero_orden']} para {trabajo['doctor']}. Enviado a {trabajo['laboratorio']} el {trabajo['fecha_envio']}.") def mostrar_datos_como_texto(datos): texto = "" if isinstance(datos, dict): for key, value in datos.items(): texto += f"{key}: {value}\n" elif isinstance(datos, list): for item in datos: if isinstance(item, dict): for key, value in item.items(): texto += f"{key}: {value}\n" texto += "\n" else: texto += f"{item}\n" return texto def flujo_presupuestos(): st.title("💰 Asistente de Presupuestos") st.markdown("Hola Dr. cuénteme en que puedo ayudarle?") lista_precios = { "Restauraciones en resina de una superficie": 75000, "Restauraciones en resina de dos superficies": 95000, "Restauraciones en resina de tres o más superficies": 120000, "Restauración en resina cervical": 60000, "Coronas metal-porcelana": 750000, "Provisional": 80000, "Profilaxis simple": 75000, "Profilaxis completa": 90000, "Corona en zirconio": 980000, "Blanqueamiento dental láser por sesión": 150000, "Blanqueamiento dental casero": 330000, "Blanqueamiento mixto": 430000, "Prótesis parcial acrílico hasta 6 dientes": 530000, "Prótesis parcial acrílico de más de 6 dientes": 580000, "Prótesis flexible hasta 6 dientes": 800000, "Prótesis flexible de más de 6 dientes": 900000, "Prótesis total de alto impacto": 650000, "Acker flexible hasta 2 dientes": 480000, "Exodoncia por diente": 85000, "Exodoncia cordal": 130000, "Endodoncia con dientes terminados en 6": 580000, "Endodoncia de un conducto": 380000, "Endodoncia de premolares superiores": 480000, } if 'presupuesto' not in st.session_state: st.session_state['presupuesto'] = [] with st.form("presupuesto_form"): tratamiento = st.selectbox("Selecciona el tratamiento", list(lista_precios.keys())) cantidad = st.number_input("Cantidad", min_value=1, step=1) agregar = st.form_submit_button("Agregar al Presupuesto") if agregar: precio_total = lista_precios[tratamiento] * cantidad st.session_state['presupuesto'].append({"tratamiento": tratamiento, "cantidad": cantidad, "precio_total": precio_total}) st.success(f"Agregado: {cantidad} {tratamiento} - Total: {precio_total} COP") if st.session_state['presupuesto']: st.write("### Servicios Seleccionados") total_presupuesto = sum(item['precio_total'] for item in st.session_state['presupuesto']) for item in st.session_state['presupuesto']: st.write(f"{item['cantidad']} x {item['tratamiento']} - {item['precio_total']} COP") st.write(f"**Total: {total_presupuesto} COP**") if st.button("Copiar Presupuesto al Asistente"): servicios = "\n".join([f"{item['cantidad']} x {item['tratamiento']} - {item['precio_total']} COP" for item in st.session_state['presupuesto']]) total = f"**Total: {total_presupuesto} COP**" st.session_state['presupuesto_texto'] = f"{servicios}\n{total}" st.success("Presupuesto copiado al asistente de chat.") st.session_state['mostrar_chat'] = True if st.session_state['mostrar_chat']: st.markdown("### Chat con Asistente") pregunta_usuario = st.text_input("Escribe tu pregunta aquí:", value=st.session_state.get('presupuesto_texto', '')) if st.button("Enviar Pregunta"): manejar_pregunta_usuario(pregunta_usuario) def flujo_radiografias(): st.title("📸 Registro de Radiografías") if 'radiografias' not in st.session_state: st.session_state.radiografias = [] with st.form("radiografias_form"): nombre_paciente = st.text_input("Nombre del Paciente:") tipo_radiografia = st.selectbox("Tipo de Radiografía:", ["Periapical", "Panorámica", "Cefalométrica"]) fecha_realizacion = st.date_input("Fecha de Realización:") observaciones = st.text_area("Observaciones:") submitted = st.form_submit_button("Registrar Radiografía") if submitted: radiografia = { "nombre_paciente": nombre_paciente, "tipo_radiografia": tipo_radiografia, "fecha_realizacion": str(fecha_realizacion), "observaciones": observaciones } st.session_state.radiografias.append(radiografia) datos_guardados = mostrar_datos_como_texto([radiografia]) guardar_en_txt('radiografias.txt', datos_guardados) st.success("Radiografía registrada con éxito.") if st.session_state.radiografias: st.write("### Radiografías Registradas") df_radiografias = pd.DataFrame(st.session_state.radiografias) st.write(df_radiografias) pdf_file = generar_pdf(df_radiografias, "Registro de Radiografías", "radiografias.pdf") st.download_button( label="📥 Descargar PDF", data=open(pdf_file, 'rb').read(), file_name="radiografias.pdf", mime="application/pdf" ) def mostrar_recomendaciones(): st.title("⭐ Recomendaciones") st.write("Aquí puedes encontrar recomendaciones y consejos útiles.") def main(): st.set_page_config(page_title="Galatea OMARDENT", layout="wide") # Inicializar el estado de la sesión if 'modelo' not in st.session_state: st.session_state['modelo'] = "gpt-3.5-turbo" if 'temperatura' not in st.session_state: st.session_state['temperatura'] = 0.5 if 'mensajes_chat' not in st.session_state: st.session_state['mensajes_chat'] = [] if 'transcripcion_voz' not in st.session_state: st.session_state['transcripcion_voz'] = "" if 'imagen_asistente' not in st.session_state: st.session_state['imagen_asistente'] = None if 'video_estado' not in st.session_state: st.session_state['video_estado'] = 'paused' if 'assistant_id' not in st.session_state: st.session_state['assistant_id'] = 'asst_4ZYvBvf4IUVQPjnugSZGLdV2' if 'presupuesto_texto' not in st.session_state: st.session_state['presupuesto_texto'] = '' if 'mostrar_chat' not in st.session_state: st.session_state['mostrar_chat'] = False # Barra lateral ruta_logo = os.path.join("assets", "Logo Omardent.png") if os.path.exists(ruta_logo): st.sidebar.image(ruta_logo, use_column_width=True) else: st.sidebar.warning(f"Error: No se pudo encontrar la imagen en la ruta: {ruta_logo}") st.sidebar.title("🤖 Galatea OMARDENT") st.sidebar.markdown("---") st.sidebar.subheader("🧠 Configuración del Modelo") st.session_state['modelo'] = st.sidebar.selectbox( "Selecciona el modelo:", ["gpt-3.5-turbo", "gpt-4", "gpt-4-32k", "gpt-4o"], index=0, key='modelo_selectbox', # Clave única help="Elige el modelo de lenguaje de OpenAI que prefieras." ) st.sidebar.markdown("---") st.session_state['temperatura'] = st.sidebar.slider( "🌡️ Temperatura", min_value=0.0, max_value=1.0, value=st.session_state['temperatura'], step=0.1, key='temperatura_slider' # Clave única ) assistant_id = st.sidebar.text_input("Assistant ID", key="assistant_id", help="Introduce el Assistant ID del playground de OpenAI") st.sidebar.markdown("---") st.sidebar.subheader("🌟 Navegación") lateral_page = st.sidebar.radio("Ir a", ["Página Principal", "Gestión de Trabajos", "Gestión de Insumos", "Registro de Radiografías", "Buscar Datos", "Notificaciones", "Recomendaciones", "Asistente de Presupuestos", "Comunicación", "Asistente de Agendamiento"]) top_page = st.selectbox("Navegación Superior", ["Página Principal", "Galatea-Asistente"]) if top_page == "Galatea-Asistente": mostrar_galatea_asistente() else: if lateral_page == "Página Principal": mostrar_pagina_principal() elif lateral_page == "Gestión de Trabajos": flujo_laboratorio() elif lateral_page == "Gestión de Insumos": flujo_insumos() elif lateral_page == "Registro de Radiografías": flujo_radiografias() elif lateral_page == "Buscar Datos": buscar_datos_guardados() elif lateral_page == "Notificaciones": generar_notificaciones_pendientes() elif lateral_page == "Recomendaciones": mostrar_recomendaciones() elif lateral_page == "Asistente de Presupuestos": flujo_presupuestos() elif lateral_page == "Comunicación": st.write("Página de Comunicación") # Implementar según sea necesario elif lateral_page == "Asistente de Agendamiento": st.write("Página de Agendamiento") # Implementar según sea necesario def mostrar_pagina_principal(): st.title("VIRTUAL OMARDENT AI-BOTIDINAMIX") st.markdown( f"""