| import os |
| import random |
| import pandas as pd |
| import chainlit as cl |
| from datetime import datetime |
| from urllib.parse import quote |
|
|
| |
| os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "False" |
| os.environ["ABSL_LOG_LEVEL"] = "ERROR" |
|
|
| |
| EXCEL_PATH = os.path.abspath("MasterData SISO - Actualizada - Jan-29-2025.xlsx") |
| AUDIO_PATH = os.path.abspath("Manual del Operador de Tienda.wav") |
| CERTIFICATE_TEMPLATE = os.path.abspath("certificate_template_.pdf") |
|
|
| |
| PREGUNTAS = [ |
| {"pregunta": "1. ¿Qué debe hacer el Operador cuando se active la alerta de la pantalla naranja? 🚨", |
| "opciones": ["A) Ignorar la alerta ❌🚨", "B) Informar al Jefe de Tienda 👨💼", |
| "C) Recontar el dinero 💰", "D) Cerrar el turno 🔒"], |
| "respuesta": "B"}, |
| |
| {"pregunta": "2. ¿Quién verifica la cantidad de dinero? 💼💵", |
| "opciones": ["A) El encargado de turno 👷♂️", "B) El Jefe de Tienda ❌", |
| "C) El Supervisor 👮♂️", "D) El Operador nuevamente 🔄"], |
| "respuesta": "A"}, |
| |
| {"pregunta": "3. ¿Hasta cuándo es responsable el Operador del efectivo? ⏳", |
| "opciones": ["A) Hasta finalizar su jornada 🕔", "B) Hasta la última recogida 📦", |
| "C) Hasta cerrar el turno 🔐", "D) Hasta entregar al Jefe 👨💼"], |
| "respuesta": "B"}, |
| |
| {"pregunta": "4. ¿Qué hace el Jefe tras recoger el dinero? 💼➡️📦", |
| "opciones": ["A) Depositar en caja 🏦", "B) Contar nuevamente 🔢", |
| "C) Actualizar ISSWEB 💻✅", "D) Entregar al Supervisor 👮♂️"], |
| "respuesta": "C"}, |
| |
| {"pregunta": "5. ¿Qué hacer con cada sobre antes de guardarlo? 📦👀", |
| "opciones": ["A) Mostrar 5s al CCTV ⏱️✅", "B) Contar otra vez 🔢", |
| "C) Entregar al Jefe 👨💼", "D) Guardar en bolsillo 👖"], |
| "respuesta": "A"} |
| ] |
|
|
| |
| def cargar_datos(): |
| """Carga los datos de Excel y los devuelve en un diccionario.""" |
| try: |
| df = pd.read_excel(EXCEL_PATH, dtype={'SAPId': 'int32'}) |
| for col in ['Fecha', 'Calificacion', 'Curso']: |
| if col not in df.columns: |
| df[col] = None |
| return df.set_index('SAPId').to_dict('index') |
| except Exception as e: |
| print(f"Error cargando datos: {str(e)}") |
| return {} |
|
|
| def actualizar_excel(sap_id, aprobado): |
| """Actualiza la hoja de Excel con el estado del usuario.""" |
| try: |
| df = pd.read_excel(EXCEL_PATH) |
| mask = df['SAPId'] == sap_id |
| df.loc[mask, 'Fecha'] = datetime.now().strftime('%Y-%m-%d') |
| df.loc[mask, 'Calificacion'] = 'aprobado' if aprobado else 'no aprobado' |
| df.loc[mask, 'Curso'] = 'Manual De Operador De Tienda' |
| df.to_excel(EXCEL_PATH, index=False) |
| except Exception as e: |
| print(f"Error actualizando Excel: {str(e)}") |
|
|
| |
| @cl.on_chat_start |
| async def inicio(): |
| """Inicia el chat y solicita el SAP ID.""" |
| SAP_DICT = cargar_datos() |
| cl.user_session.set("SAP_DICT", SAP_DICT) |
| await cl.Message("🚀 **Sistema de Certificación - Operador de Tienda**\nIngrese su SAP ID:").send() |
|
|
| @cl.on_message |
| async def manejar_mensaje(message: cl.Message): |
| """Maneja la entrada del usuario.""" |
| SAP_DICT = cl.user_session.get("SAP_DICT") |
| user_data = cl.user_session.get("user_data") |
|
|
| |
| if user_data and user_data.get("en_examen"): |
| await procesar_respuesta(message) |
| return |
|
|
| |
| try: |
| sap_id = int(message.content) |
| if sap_id not in SAP_DICT: |
| return await cl.Message("⛔️ ID no registrado").send() |
|
|
| usuario = SAP_DICT[sap_id] |
| respuesta = f""" |
| 🔍 **Usuario identificado** 🔍 |
| ├ N° Personal: {usuario['Número de personal']} |
| ├ Status: {usuario['Status ocupación']} |
| ├ Función: {usuario['FuncionName']} |
| └ Centro Coste: {usuario['Centro de coste']} |
| """ |
|
|
| cl.user_session.set("user_data", { |
| "sap_id": sap_id, |
| "puntaje": 0, |
| "pregunta_actual": 0, |
| "en_examen": True |
| }) |
|
|
| await cl.Message(content=respuesta).send() |
| await mostrar_pregunta() |
|
|
| except ValueError: |
| await cl.Message("⚠️ Solo números permitidos (Ej: 60000001)").send() |
|
|
| async def mostrar_pregunta(): |
| """Muestra la siguiente pregunta del examen.""" |
| user_data = cl.user_session.get("user_data") |
| num_pregunta = user_data["pregunta_actual"] |
|
|
| if num_pregunta >= len(PREGUNTAS): |
| await mostrar_resultado() |
| return |
|
|
| p = PREGUNTAS[num_pregunta] |
| mensaje = f"📝 **Pregunta {num_pregunta + 1}**\n{p['pregunta']}\n\n" + "\n".join(p['opciones']) |
| await cl.Message(content=mensaje + "\n\n🔘 Respuesta (A/B/C/D):").send() |
|
|
| async def procesar_respuesta(message: cl.Message): |
| """Procesa la respuesta de la pregunta actual.""" |
| user_data = cl.user_session.get("user_data") |
| num_pregunta = user_data["pregunta_actual"] |
|
|
| respuesta = message.content.strip().upper() |
| if respuesta not in ['A', 'B', 'C', 'D']: |
| await cl.Message("⚠️ **Formato incorrecto**. Respuesta = A/B/C/D").send() |
| return |
|
|
| correcta = PREGUNTAS[num_pregunta]['respuesta'] |
| if respuesta == correcta: |
| user_data["puntaje"] += 1 |
| await cl.Message("✅ ¡Respuesta correcta! +1 punto").send() |
| else: |
| await cl.Message(f"❌ Respuesta incorrecta. La correcta es: {correcta}").send() |
|
|
| user_data["pregunta_actual"] += 1 |
| cl.user_session.set("user_data", user_data) |
|
|
| if user_data["pregunta_actual"] < len(PREGUNTAS): |
| await mostrar_pregunta() |
| else: |
| await mostrar_resultado() |
|
|
| async def mostrar_resultado(): |
| """Muestra el resultado final del examen y adjunta el PDF o el audio.""" |
| user_data = cl.user_session.get("user_data") |
| aprobado = user_data['puntaje'] >= 3 |
| actualizar_excel(user_data['sap_id'], aprobado) |
|
|
| mensaje = f"🎉 **¡FELICITACIONES! APROBADO**\n" if aprobado else f"❌ **LO SENTIMOS, REPROBADO**\n" |
| mensaje += f"Puntaje Final: {user_data['puntaje']}/5" |
|
|
| if aprobado and os.path.exists(CERTIFICATE_TEMPLATE): |
| await cl.Message(content=mensaje, elements=[cl.File(path=CERTIFICATE_TEMPLATE, name="Certificado de Aprobación")]).send() |
| elif not aprobado and os.path.exists(AUDIO_PATH): |
| await cl.Message(content=mensaje, elements=[cl.Audio(name="🎧 Material de Refuerzo", path=AUDIO_PATH, display="inline", autoplay=False)]).send() |
| else: |
| await cl.Message(content=mensaje + "\n⚠️ **Error: No se encontró el archivo**").send() |
|
|
| cl.user_session.set("user_data", None) |
| await cl.Message(content="\nIngrese nuevo SAP ID para otro examen:").send() |
|
|
| if __name__ == "__main__": |
| from chainlit.cli import run_chainlit |
| run_chainlit(__file__) |
|
|