Spaces:
Runtime error
Runtime error
| import os | |
| import tempfile | |
| import openai | |
| from dotenv import load_dotenv | |
| import PyPDF2 | |
| import nltk | |
| from nltk.tokenize import word_tokenize | |
| from nltk.corpus import stopwords | |
| from nltk.stem import SnowballStemmer | |
| import pandas as pd | |
| from fpdf import FPDF | |
| import streamlit as st | |
| import requests | |
| from google.cloud import texttospeech | |
| nltk.download('punkt', quiet=True) | |
| nltk.download('stopwords', quiet=True) | |
| # Cargar las claves API desde el archivo .env | |
| load_dotenv() | |
| openai_api_key = os.getenv("OPENAI_API_KEY") | |
| brevo_api_key = os.getenv("BREVO_API_KEY") | |
| os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json" | |
| # Verifica que las claves API están configuradas | |
| if not openai_api_key: | |
| st.error("No API key provided for OpenAI. Please set your API key in the .env file.") | |
| else: | |
| openai.api_key = openai_api_key | |
| if not brevo_api_key: | |
| st.error("No API key provided for Brevo. Please set your API key in the .env file.") | |
| def extraer_texto_pdf(archivo): | |
| texto = "" | |
| if archivo: | |
| with tempfile.NamedTemporaryFile(delete=False) as temp_file: | |
| temp_file.write(archivo.read()) | |
| temp_file_path = temp_file.name | |
| try: | |
| with open(temp_file_path, 'rb') as file: | |
| reader = PyPDF2.PdfReader(file) | |
| for page in range(len(reader.pages)): | |
| texto += reader.pages[page].extract_text() | |
| except Exception as e: | |
| st.error(f"Error al extraer texto del PDF: {e}") | |
| finally: | |
| os.unlink(temp_file_path) | |
| return texto | |
| def preprocesar_texto(texto): | |
| tokens = word_tokenize(texto, language='spanish') | |
| tokens = [word.lower() for word in tokens if word.isalpha()] | |
| stopwords_es = set(stopwords.words('spanish')) | |
| tokens = [word for word in tokens if word not in stopwords_es] | |
| stemmer = SnowballStemmer('spanish') | |
| tokens = [stemmer.stem(word) for word in tokens] | |
| return " ".join(tokens) | |
| def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id=""): | |
| try: | |
| response = openai.ChatCompletion.create( | |
| model=modelo, | |
| messages=[ | |
| {"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"}, | |
| {"role": "user", "content": f"{pregunta}\n\nContexto: {texto_preprocesado}"} | |
| ], | |
| temperature=temperatura | |
| ) | |
| respuesta = response.choices[0].message['content'].strip() | |
| # Configura la solicitud de síntesis de voz | |
| client = texttospeech.TextToSpeechClient() | |
| input_text = texttospeech.SynthesisInput(text=respuesta) | |
| voice = texttospeech.VoiceSelectionParams( | |
| language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE | |
| ) | |
| audio_config = texttospeech.AudioConfig( | |
| audio_encoding=texttospeech.AudioEncoding.MP3 | |
| ) | |
| # Realiza la solicitud de síntesis de voz | |
| response = client.synthesize_speech( | |
| input=input_text, voice=voice, audio_config=audio_config | |
| ) | |
| # Reproduce el audio en Streamlit | |
| st.audio(response.audio_content, format="audio/mp3") | |
| return respuesta | |
| except openai.OpenAIError as e: | |
| st.error(f"Error al comunicarse con OpenAI: {e}") | |
| return "Lo siento, no puedo procesar tu solicitud en este momento." | |
| except Exception as e: | |
| st.error(f"Error al generar la respuesta y el audio: {e}") | |
| return "Lo siento, ocurrió un error al procesar tu solicitud." | |
| def guardar_en_txt(nombre_archivo, datos): | |
| carpeta = "datos_guardados" | |
| os.makedirs(carpeta, exist_ok=True) | |
| ruta_archivo = os.path.join(carpeta, nombre_archivo) | |
| try: | |
| with open(ruta_archivo, 'a', encoding='utf-8') as archivo: # Append mode | |
| archivo.write(datos + "\n") | |
| except Exception as e: | |
| st.error(f"Error al guardar datos en el archivo: {e}") | |
| return ruta_archivo | |
| def cargar_desde_txt(nombre_archivo): | |
| carpeta = "datos_guardados" | |
| ruta_archivo = os.path.join(carpeta, nombre_archivo) | |
| try: | |
| if os.path.exists(ruta_archivo): | |
| with open(ruta_archivo, 'r', encoding='utf-8') as archivo: | |
| return archivo.read() | |
| else: | |
| st.warning("Archivo no encontrado.") | |
| return "" | |
| except Exception as e: | |
| st.error(f"Error al cargar datos desde el archivo: {e}") | |
| return "" | |
| def listar_archivos_txt(): | |
| carpeta = "datos_guardados" | |
| try: | |
| if not os.path.exists(carpeta): | |
| return [] | |
| archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')] | |
| archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True) | |
| return archivos_ordenados | |
| except Exception as e: | |
| st.error(f"Error al listar archivos: {e}") | |
| return [] | |
| def generar_pdf(dataframe, titulo, filename): | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_font("Arial", size=12) | |
| pdf.cell(200, 10, txt=titulo, ln=True, align='C') | |
| for i, row in dataframe.iterrows(): | |
| row_text = ", ".join(f"{col}: {val}" for col, val in row.items()) | |
| pdf.cell(200, 10, txt=row_text, ln=True) | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
| pdf.output(tmp_file.name) | |
| return tmp_file.name | |
| except Exception as e: | |
| st.error(f"Error al generar PDF: {e}") | |
| return None | |
| def enviar_correo(destinatario, asunto, contenido): | |
| url = "https://api.brevo.com/v3/smtp/email" | |
| headers = { | |
| "accept": "application/json", | |
| "api-key": brevo_api_key, | |
| "content-type": "application/json" | |
| } | |
| payload = { | |
| "sender": {"email": "tu_correo@dominio.com"}, | |
| "to": [{"email": destinatario}], | |
| "subject": asunto, | |
| "htmlContent": contenido | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers) | |
| if response.status_code == 201: | |
| st.success(f"Correo enviado a {destinatario}") | |
| else: | |
| st.error(f"Error al enviar el correo: {response.text}") | |
| except Exception as e: | |
| st.error(f"Error al enviar el correo: {e}") | |
| def enviar_whatsapp(numero, mensaje): | |
| url = "https://api.brevo.com/v3/whatsapp/send" | |
| headers = { | |
| "accept": "application/json", | |
| "api-key": brevo_api_key, | |
| "content-type": "application/json" | |
| } | |
| payload = { | |
| "recipient": {"number": numero}, | |
| "sender": {"number": "tu_numero_whatsapp"}, | |
| "content": mensaje | |
| } | |
| try: | |
| response = requests.post(url, json=payload, headers=headers) | |
| if response.status_code == 201: | |
| st.success(f"Mensaje de WhatsApp enviado a {numero}") | |
| else: | |
| st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}") | |
| except Exception as e: | |
| st.error(f"Error al enviar el mensaje de WhatsApp: {e}") | |
| def flujo_laboratorio(): | |
| st.title("🦷 Gestión de Trabajos de Laboratorio") | |
| if 'laboratorio' not in st.session_state: | |
| st.session_state.laboratorio = [] | |
| with st.form("laboratorio_form"): | |
| tipo_trabajo = st.selectbox("Tipo de trabajo:", [ | |
| "Protesis total", "Protesis removible metal-acrilico", "Parcialita acrilico", | |
| "Placa de blanqueamiento", "Placa de bruxismo", "Corona de acrilico", | |
| "Corona en zirconio", "Protesis flexible", "Acker flexible" | |
| ]) | |
| doctor = st.selectbox("Doctor que requiere el trabajo:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"]) | |
| fecha_entrega = st.date_input("Fecha de entrega:") | |
| fecha_envio = st.date_input("Fecha de envío:") | |
| laboratorio = st.selectbox("Laboratorio dental:", ["Ernesto Correa lab", "Formando Sonrisas"]) | |
| nombre_paciente = st.text_input("Nombre paciente:") | |
| observaciones = st.text_input("Observaciones:") | |
| numero_orden = st.text_input("Número de orden:") | |
| cantidad = st.number_input("Cantidad:", min_value=1, step=1) | |
| submitted = st.form_submit_button("Registrar Trabajo") | |
| if submitted: | |
| trabajo = { | |
| "tipo_trabajo": tipo_trabajo, | |
| "doctor": doctor, | |
| "fecha_entrega": str(fecha_entrega), | |
| "fecha_envio": str(fecha_envio), | |
| "laboratorio": laboratorio, | |
| "nombre_paciente": nombre_paciente, | |
| "observaciones": observaciones, | |
| "numero_orden": numero_orden, | |
| "cantidad": cantidad, | |
| "estado": "pendiente" | |
| } | |
| st.session_state.laboratorio.append(trabajo) | |
| datos_guardados = mostrar_datos_como_texto([trabajo]) # Append only the new entry | |
| guardar_en_txt('trabajos_laboratorio.txt', datos_guardados) | |
| st.success("Trabajo registrado con éxito.") | |
| if st.session_state.laboratorio: | |
| st.write("### Trabajos Registrados") | |
| df_trabajos = pd.DataFrame(st.session_state.laboratorio) | |
| st.write(df_trabajos) | |
| pdf_file = generar_pdf(df_trabajos, "Registro de Trabajos de Laboratorio", "trabajos_laboratorio.pdf") | |
| st.download_button( | |
| label="📥 Descargar PDF", | |
| data=open(pdf_file, 'rb').read(), | |
| file_name="trabajos_laboratorio.pdf", | |
| mime="application/pdf" | |
| ) | |
| def flujo_insumos(): | |
| st.title("📦 Gestión de Insumos") | |
| if 'insumos' not in st.session_state: | |
| st.session_state.insumos = [] | |
| with st.form("insumos_form"): | |
| insumo_nombre = st.text_input("Nombre del Insumo:") | |
| insumo_cantidad = st.number_input("Cantidad Faltante:", min_value=0, step=1) | |
| submitted = st.form_submit_button("Agregar Insumo") | |
| if submitted and insumo_nombre: | |
| insumo = {"nombre": insumo_nombre, "cantidad": insumo_cantidad} | |
| st.session_state.insumos.append(insumo) | |
| datos_guardados = mostrar_datos_como_texto([insumo]) # Append only the new entry | |
| guardar_en_txt('insumos.txt', datos_guardados) | |
| st.success(f"Insumo '{insumo_nombre}' agregado con éxito.") | |
| if st.session_state.insumos: | |
| st.write("### Insumos Registrados") | |
| insumos_df = pd.DataFrame(st.session_state.insumos) | |
| st.write(insumos_df) | |
| pdf_file = generar_pdf(insumos_df, "Registro de Insumos Faltantes", "insumos.pdf") | |
| st.download_button( | |
| label="📥 Descargar PDF", | |
| data=open(pdf_file, 'rb').read(), | |
| file_name="insumos_faltantes.pdf", | |
| mime="application/pdf" | |
| ) | |
| def buscar_datos_guardados(): | |
| st.title("🔍 Buscar Datos Guardados") | |
| carpeta = "datos_guardados" | |
| if not os.path.exists(carpeta): | |
| st.info("No se encontraron archivos de datos guardados.") | |
| return | |
| archivos = listar_archivos_txt() | |
| if archivos: | |
| archivo_seleccionado = st.selectbox("Selecciona un archivo para ver:", archivos) | |
| if archivo_seleccionado: | |
| datos = cargar_desde_txt(os.path.join(carpeta, archivo_seleccionado)) | |
| if datos: | |
| st.write(f"### Datos del archivo {archivo_seleccionado}") | |
| st.text_area("Datos", datos, height=300) | |
| # Link to download the file | |
| try: | |
| with open(os.path.join(carpeta, archivo_seleccionado), 'rb') as file: | |
| st.download_button( | |
| label="📥 Descargar Archivo TXT", | |
| data=file, | |
| file_name=archivo_seleccionado, | |
| mime="text/plain" | |
| ) | |
| except Exception as e: | |
| st.error(f"Error al preparar la descarga: {e}") | |
| # Enviar el archivo seleccionado por correo | |
| if st.button("Enviar por correo"): | |
| contenido = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}" | |
| enviar_correo("josedcape@gmail.com", f"Datos del archivo {archivo_seleccionado}", contenido) | |
| # Enviar el archivo seleccionado por WhatsApp | |
| if st.button("Enviar por WhatsApp"): | |
| mensaje = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}" | |
| enviar_whatsapp("3114329322", mensaje) | |
| else: | |
| st.warning(f"No se encontraron datos en el archivo {archivo_seleccionado}") | |
| else: | |
| st.info("No se encontraron archivos de datos guardados.") | |
| def generar_notificaciones_pendientes(): | |
| if 'laboratorio' not in st.session_state or not st.session_state.laboratorio: | |
| st.info("No hay trabajos pendientes.") | |
| return | |
| pendientes = [trabajo for trabajo in st.session_state.laboratorio if trabajo["estado"] == "pendiente"] | |
| if pendientes: | |
| st.write("### Notificaciones de Trabajos Pendientes") | |
| for trabajo in pendientes: | |
| st.info(f"Pendiente: {trabajo['tipo_trabajo']} - {trabajo['numero_orden']} para {trabajo['doctor']}. Enviado a {trabajo['laboratorio']} el {trabajo['fecha_envio']}.") | |
| def mostrar_datos_como_texto(datos): | |
| texto = "" | |
| if isinstance(datos, dict): | |
| for key, value in datos.items(): | |
| texto += f"{key}: {value}\n" | |
| elif isinstance(datos, list): | |
| for item in datos: | |
| if isinstance(item, dict): | |
| for key, value in item.items(): | |
| texto += f"{key}: {value}\n" | |
| texto += "\n" | |
| else: | |
| texto += f"{item}\n" | |
| return texto | |
| def flujo_presupuestos(): | |
| st.title("💰 Asistente de Presupuestos") | |
| st.markdown("Hola Dr. cuénteme en que puedo ayudarle?") | |
| lista_precios = { | |
| "Restauraciones en resina de una superficie": 75000, | |
| "Restauraciones en resina de dos superficies": 95000, | |
| "Restauraciones en resina de tres o más superficies": 120000, | |
| "Restauración en resina cervical": 60000, | |
| "Coronas metal-porcelana": 750000, | |
| "Provisional": 80000, | |
| "Profilaxis simple": 75000, | |
| "Profilaxis completa": 90000, | |
| "Corona en zirconio": 980000, | |
| "Blanqueamiento dental láser por sesión": 150000, | |
| "Blanqueamiento dental casero": 330000, | |
| "Blanqueamiento mixto": 430000, | |
| "Prótesis parcial acrílico hasta 6 dientes": 530000, | |
| "Prótesis parcial acrílico de más de 6 dientes": 580000, | |
| "Prótesis flexible hasta 6 dientes": 800000, | |
| "Prótesis flexible de más de 6 dientes": 900000, | |
| "Prótesis total de alto impacto": 650000, | |
| "Acker flexible hasta 2 dientes": 480000, | |
| "Exodoncia por diente": 85000, | |
| "Exodoncia cordal": 130000, | |
| "Endodoncia con dientes terminados en 6": 580000, | |
| "Endodoncia de un conducto": 380000, | |
| "Endodoncia de premolares superiores": 480000, | |
| } | |
| tratamiento = st.selectbox("Selecciona el tratamiento", list(lista_precios.keys())) | |
| cantidad = st.number_input("Cantidad", min_value=1, step=1) | |
| if st.button("Calcular Presupuesto"): | |
| if tratamiento and cantidad: | |
| precio_total = lista_precios[tratamiento] * cantidad | |
| st.success(f"El precio total para {cantidad} {tratamiento} es: {precio_total} COP") | |
| def flujo_radiografias(): | |
| st.title("📸 Registro de Radiografías") | |
| if 'radiografias' not in st.session_state: | |
| st.session_state.radiografias = [] | |
| with st.form("radiografias_form"): | |
| nombre_paciente = st.text_input("Nombre del Paciente:") | |
| tipo_radiografia = st.selectbox("Tipo de Radiografía:", ["Periapical", "Panorámica", "Cefalométrica"]) | |
| fecha_realizacion = st.date_input("Fecha de Realización:") | |
| observaciones = st.text_area("Observaciones:") | |
| submitted = st.form_submit_button("Registrar Radiografía") | |
| if submitted: | |
| radiografia = { | |
| "nombre_paciente": nombre_paciente, | |
| "tipo_radiografia": tipo_radiografia, | |
| "fecha_realizacion": str(fecha_realizacion), | |
| "observaciones": observaciones | |
| } | |
| st.session_state.radiografias.append(radiografia) | |
| datos_guardados = mostrar_datos_como_texto([radiografia]) | |
| guardar_en_txt('radiografias.txt', datos_guardados) | |
| st.success("Radiografía registrada con éxito.") | |
| if st.session_state.radiografias: | |
| st.write("### Radiografías Registradas") | |
| df_radiografias = pd.DataFrame(st.session_state.radiografias) | |
| st.write(df_radiografias) | |
| pdf_file = generar_pdf(df_radiografias, "Registro de Radiografías", "radiografias.pdf") | |
| st.download_button( | |
| label="📥 Descargar PDF", | |
| data=open(pdf_file, 'rb').read(), | |
| file_name="radiografias.pdf", | |
| mime="application/pdf" | |
| ) | |