OMARDENTvirtual3 / utils /data_manager.py
Josedcape's picture
Rename utils_data_manager (2).py to utils/data_manager.py
83a34e0 verified
import os
import tempfile
import openai
from dotenv import load_dotenv
import PyPDF2
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
import pandas as pd
from fpdf import FPDF
import streamlit as st
import requests
from google.cloud import texttospeech
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
# Cargar las claves API desde el archivo .env
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
brevo_api_key = os.getenv("BREVO_API_KEY")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "botidinamix-g.json"
# Verifica que las claves API están configuradas
if not openai_api_key:
st.error("No API key provided for OpenAI. Please set your API key in the .env file.")
else:
openai.api_key = openai_api_key
if not brevo_api_key:
st.error("No API key provided for Brevo. Please set your API key in the .env file.")
def extraer_texto_pdf(archivo):
texto = ""
if archivo:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(archivo.read())
temp_file_path = temp_file.name
try:
with open(temp_file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
for page in range(len(reader.pages)):
texto += reader.pages[page].extract_text()
except Exception as e:
st.error(f"Error al extraer texto del PDF: {e}")
finally:
os.unlink(temp_file_path)
return texto
def preprocesar_texto(texto):
tokens = word_tokenize(texto, language='spanish')
tokens = [word.lower() for word in tokens if word.isalpha()]
stopwords_es = set(stopwords.words('spanish'))
tokens = [word for word in tokens if word not in stopwords_es]
stemmer = SnowballStemmer('spanish')
tokens = [stemmer.stem(word) for word in tokens]
return " ".join(tokens)
def obtener_respuesta(pregunta, texto_preprocesado, modelo, temperatura=0.5, assistant_id=""):
try:
response = openai.ChatCompletion.create(
model=modelo,
messages=[
{"role": "system", "content": "Actua como Galatea la asistente de la clinica Odontologica OMARDENT y resuelve las inquietudes"},
{"role": "user", "content": f"{pregunta}\n\nContexto: {texto_preprocesado}"}
],
temperature=temperatura
)
respuesta = response.choices[0].message['content'].strip()
# Configura la solicitud de síntesis de voz
client = texttospeech.TextToSpeechClient()
input_text = texttospeech.SynthesisInput(text=respuesta)
voice = texttospeech.VoiceSelectionParams(
language_code="es-ES", ssml_gender=texttospeech.SsmlVoiceGender.FEMALE
)
audio_config = texttospeech.AudioConfig(
audio_encoding=texttospeech.AudioEncoding.MP3
)
# Realiza la solicitud de síntesis de voz
response = client.synthesize_speech(
input=input_text, voice=voice, audio_config=audio_config
)
# Reproduce el audio en Streamlit
st.audio(response.audio_content, format="audio/mp3")
return respuesta
except openai.OpenAIError as e:
st.error(f"Error al comunicarse con OpenAI: {e}")
return "Lo siento, no puedo procesar tu solicitud en este momento."
except Exception as e:
st.error(f"Error al generar la respuesta y el audio: {e}")
return "Lo siento, ocurrió un error al procesar tu solicitud."
def guardar_en_txt(nombre_archivo, datos):
carpeta = "datos_guardados"
os.makedirs(carpeta, exist_ok=True)
ruta_archivo = os.path.join(carpeta, nombre_archivo)
try:
with open(ruta_archivo, 'a', encoding='utf-8') as archivo: # Append mode
archivo.write(datos + "\n")
except Exception as e:
st.error(f"Error al guardar datos en el archivo: {e}")
return ruta_archivo
def cargar_desde_txt(nombre_archivo):
carpeta = "datos_guardados"
ruta_archivo = os.path.join(carpeta, nombre_archivo)
try:
if os.path.exists(ruta_archivo):
with open(ruta_archivo, 'r', encoding='utf-8') as archivo:
return archivo.read()
else:
st.warning("Archivo no encontrado.")
return ""
except Exception as e:
st.error(f"Error al cargar datos desde el archivo: {e}")
return ""
def listar_archivos_txt():
carpeta = "datos_guardados"
try:
if not os.path.exists(carpeta):
return []
archivos = [f for f in os.listdir(carpeta) if f.endswith('.txt')]
archivos_ordenados = sorted(archivos, key=lambda x: os.path.getctime(os.path.join(carpeta, x)), reverse=True)
return archivos_ordenados
except Exception as e:
st.error(f"Error al listar archivos: {e}")
return []
def generar_pdf(dataframe, titulo, filename):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
pdf.cell(200, 10, txt=titulo, ln=True, align='C')
for i, row in dataframe.iterrows():
row_text = ", ".join(f"{col}: {val}" for col, val in row.items())
pdf.cell(200, 10, txt=row_text, ln=True)
try:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
pdf.output(tmp_file.name)
return tmp_file.name
except Exception as e:
st.error(f"Error al generar PDF: {e}")
return None
def enviar_correo(destinatario, asunto, contenido):
url = "https://api.brevo.com/v3/smtp/email"
headers = {
"accept": "application/json",
"api-key": brevo_api_key,
"content-type": "application/json"
}
payload = {
"sender": {"email": "tu_correo@dominio.com"},
"to": [{"email": destinatario}],
"subject": asunto,
"htmlContent": contenido
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 201:
st.success(f"Correo enviado a {destinatario}")
else:
st.error(f"Error al enviar el correo: {response.text}")
except Exception as e:
st.error(f"Error al enviar el correo: {e}")
def enviar_whatsapp(numero, mensaje):
url = "https://api.brevo.com/v3/whatsapp/send"
headers = {
"accept": "application/json",
"api-key": brevo_api_key,
"content-type": "application/json"
}
payload = {
"recipient": {"number": numero},
"sender": {"number": "tu_numero_whatsapp"},
"content": mensaje
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code == 201:
st.success(f"Mensaje de WhatsApp enviado a {numero}")
else:
st.error(f"Error al enviar el mensaje de WhatsApp: {response.text}")
except Exception as e:
st.error(f"Error al enviar el mensaje de WhatsApp: {e}")
def flujo_laboratorio():
st.title("🦷 Gestión de Trabajos de Laboratorio")
if 'laboratorio' not in st.session_state:
st.session_state.laboratorio = []
with st.form("laboratorio_form"):
tipo_trabajo = st.selectbox("Tipo de trabajo:", [
"Protesis total", "Protesis removible metal-acrilico", "Parcialita acrilico",
"Placa de blanqueamiento", "Placa de bruxismo", "Corona de acrilico",
"Corona en zirconio", "Protesis flexible", "Acker flexible"
])
doctor = st.selectbox("Doctor que requiere el trabajo:", ["Dr. Jose Daniel C", "Dr. Jose Omar C"])
fecha_entrega = st.date_input("Fecha de entrega:")
fecha_envio = st.date_input("Fecha de envío:")
laboratorio = st.selectbox("Laboratorio dental:", ["Ernesto Correa lab", "Formando Sonrisas"])
nombre_paciente = st.text_input("Nombre paciente:")
observaciones = st.text_input("Observaciones:")
numero_orden = st.text_input("Número de orden:")
cantidad = st.number_input("Cantidad:", min_value=1, step=1)
submitted = st.form_submit_button("Registrar Trabajo")
if submitted:
trabajo = {
"tipo_trabajo": tipo_trabajo,
"doctor": doctor,
"fecha_entrega": str(fecha_entrega),
"fecha_envio": str(fecha_envio),
"laboratorio": laboratorio,
"nombre_paciente": nombre_paciente,
"observaciones": observaciones,
"numero_orden": numero_orden,
"cantidad": cantidad,
"estado": "pendiente"
}
st.session_state.laboratorio.append(trabajo)
datos_guardados = mostrar_datos_como_texto([trabajo]) # Append only the new entry
guardar_en_txt('trabajos_laboratorio.txt', datos_guardados)
st.success("Trabajo registrado con éxito.")
if st.session_state.laboratorio:
st.write("### Trabajos Registrados")
df_trabajos = pd.DataFrame(st.session_state.laboratorio)
st.write(df_trabajos)
pdf_file = generar_pdf(df_trabajos, "Registro de Trabajos de Laboratorio", "trabajos_laboratorio.pdf")
st.download_button(
label="📥 Descargar PDF",
data=open(pdf_file, 'rb').read(),
file_name="trabajos_laboratorio.pdf",
mime="application/pdf"
)
def flujo_insumos():
st.title("📦 Gestión de Insumos")
if 'insumos' not in st.session_state:
st.session_state.insumos = []
with st.form("insumos_form"):
insumo_nombre = st.text_input("Nombre del Insumo:")
insumo_cantidad = st.number_input("Cantidad Faltante:", min_value=0, step=1)
submitted = st.form_submit_button("Agregar Insumo")
if submitted and insumo_nombre:
insumo = {"nombre": insumo_nombre, "cantidad": insumo_cantidad}
st.session_state.insumos.append(insumo)
datos_guardados = mostrar_datos_como_texto([insumo]) # Append only the new entry
guardar_en_txt('insumos.txt', datos_guardados)
st.success(f"Insumo '{insumo_nombre}' agregado con éxito.")
if st.session_state.insumos:
st.write("### Insumos Registrados")
insumos_df = pd.DataFrame(st.session_state.insumos)
st.write(insumos_df)
pdf_file = generar_pdf(insumos_df, "Registro de Insumos Faltantes", "insumos.pdf")
st.download_button(
label="📥 Descargar PDF",
data=open(pdf_file, 'rb').read(),
file_name="insumos_faltantes.pdf",
mime="application/pdf"
)
def buscar_datos_guardados():
st.title("🔍 Buscar Datos Guardados")
carpeta = "datos_guardados"
if not os.path.exists(carpeta):
st.info("No se encontraron archivos de datos guardados.")
return
archivos = listar_archivos_txt()
if archivos:
archivo_seleccionado = st.selectbox("Selecciona un archivo para ver:", archivos)
if archivo_seleccionado:
datos = cargar_desde_txt(os.path.join(carpeta, archivo_seleccionado))
if datos:
st.write(f"### Datos del archivo {archivo_seleccionado}")
st.text_area("Datos", datos, height=300)
# Link to download the file
try:
with open(os.path.join(carpeta, archivo_seleccionado), 'rb') as file:
st.download_button(
label="📥 Descargar Archivo TXT",
data=file,
file_name=archivo_seleccionado,
mime="text/plain"
)
except Exception as e:
st.error(f"Error al preparar la descarga: {e}")
# Enviar el archivo seleccionado por correo
if st.button("Enviar por correo"):
contenido = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}"
enviar_correo("josedcape@gmail.com", f"Datos del archivo {archivo_seleccionado}", contenido)
# Enviar el archivo seleccionado por WhatsApp
if st.button("Enviar por WhatsApp"):
mensaje = f"Datos del archivo {archivo_seleccionado}:\n\n{datos}"
enviar_whatsapp("3114329322", mensaje)
else:
st.warning(f"No se encontraron datos en el archivo {archivo_seleccionado}")
else:
st.info("No se encontraron archivos de datos guardados.")
def generar_notificaciones_pendientes():
if 'laboratorio' not in st.session_state or not st.session_state.laboratorio:
st.info("No hay trabajos pendientes.")
return
pendientes = [trabajo for trabajo in st.session_state.laboratorio if trabajo["estado"] == "pendiente"]
if pendientes:
st.write("### Notificaciones de Trabajos Pendientes")
for trabajo in pendientes:
st.info(f"Pendiente: {trabajo['tipo_trabajo']} - {trabajo['numero_orden']} para {trabajo['doctor']}. Enviado a {trabajo['laboratorio']} el {trabajo['fecha_envio']}.")
def mostrar_datos_como_texto(datos):
texto = ""
if isinstance(datos, dict):
for key, value in datos.items():
texto += f"{key}: {value}\n"
elif isinstance(datos, list):
for item in datos:
if isinstance(item, dict):
for key, value in item.items():
texto += f"{key}: {value}\n"
texto += "\n"
else:
texto += f"{item}\n"
return texto
def flujo_presupuestos():
st.title("💰 Asistente de Presupuestos")
st.markdown("Hola Dr. cuénteme en que puedo ayudarle?")
lista_precios = {
"Restauraciones en resina de una superficie": 75000,
"Restauraciones en resina de dos superficies": 95000,
"Restauraciones en resina de tres o más superficies": 120000,
"Restauración en resina cervical": 60000,
"Coronas metal-porcelana": 750000,
"Provisional": 80000,
"Profilaxis simple": 75000,
"Profilaxis completa": 90000,
"Corona en zirconio": 980000,
"Blanqueamiento dental láser por sesión": 150000,
"Blanqueamiento dental casero": 330000,
"Blanqueamiento mixto": 430000,
"Prótesis parcial acrílico hasta 6 dientes": 530000,
"Prótesis parcial acrílico de más de 6 dientes": 580000,
"Prótesis flexible hasta 6 dientes": 800000,
"Prótesis flexible de más de 6 dientes": 900000,
"Prótesis total de alto impacto": 650000,
"Acker flexible hasta 2 dientes": 480000,
"Exodoncia por diente": 85000,
"Exodoncia cordal": 130000,
"Endodoncia con dientes terminados en 6": 580000,
"Endodoncia de un conducto": 380000,
"Endodoncia de premolares superiores": 480000,
}
tratamiento = st.selectbox("Selecciona el tratamiento", list(lista_precios.keys()))
cantidad = st.number_input("Cantidad", min_value=1, step=1)
if st.button("Calcular Presupuesto"):
if tratamiento and cantidad:
precio_total = lista_precios[tratamiento] * cantidad
st.success(f"El precio total para {cantidad} {tratamiento} es: {precio_total} COP")
def flujo_radiografias():
st.title("📸 Registro de Radiografías")
if 'radiografias' not in st.session_state:
st.session_state.radiografias = []
with st.form("radiografias_form"):
nombre_paciente = st.text_input("Nombre del Paciente:")
tipo_radiografia = st.selectbox("Tipo de Radiografía:", ["Periapical", "Panorámica", "Cefalométrica"])
fecha_realizacion = st.date_input("Fecha de Realización:")
observaciones = st.text_area("Observaciones:")
submitted = st.form_submit_button("Registrar Radiografía")
if submitted:
radiografia = {
"nombre_paciente": nombre_paciente,
"tipo_radiografia": tipo_radiografia,
"fecha_realizacion": str(fecha_realizacion),
"observaciones": observaciones
}
st.session_state.radiografias.append(radiografia)
datos_guardados = mostrar_datos_como_texto([radiografia])
guardar_en_txt('radiografias.txt', datos_guardados)
st.success("Radiografía registrada con éxito.")
if st.session_state.radiografias:
st.write("### Radiografías Registradas")
df_radiografias = pd.DataFrame(st.session_state.radiografias)
st.write(df_radiografias)
pdf_file = generar_pdf(df_radiografias, "Registro de Radiografías", "radiografias.pdf")
st.download_button(
label="📥 Descargar PDF",
data=open(pdf_file, 'rb').read(),
file_name="radiografias.pdf",
mime="application/pdf"
)