import time import os import uuid import traceback import joblib import streamlit as st from dotenv import load_dotenv from streamlit.runtime.scriptrunner import get_script_run_ctx from system_prompts import get_unified_email_prompt from session_state import ( SessionState, DEFAULT_GEMINI_MODEL, DATA_DIR, ) # Inicializar el estado de la sesión state = SessionState() STREAM_SETTINGS = {'batch_size': 1, 'delay_seconds': 0.01} user_past_chats_list_path = None def get_user_namespace(): """ Obtiene un namespace para persistencia. - Si CHATBOT_USER_NAMESPACE está definido, se usa ese valor (recomendado para app de un solo usuario). - Si MULTI_USER_MODE=true, usa session_id para aislar por sesión. - Caso contrario, usa un user_id persistente en query params para aislar por usuario y sobrevivir reinicios. """ configured_namespace = os.environ.get('CHATBOT_USER_NAMESPACE') if configured_namespace: return configured_namespace is_multi_user_mode = os.environ.get('MULTI_USER_MODE', 'false').strip().lower() == 'true' if is_multi_user_mode: context = get_script_run_ctx() if context and getattr(context, 'session_id', None): return context.session_id user_id = st.query_params.get('uid') if not user_id: user_id = uuid.uuid4().hex st.query_params['uid'] = user_id return f'user_{user_id}' # Función para detectar saludos y generar respuestas personalizadas def is_greeting(text): """Detecta si el texto es un saludo simple""" text = text.lower().strip() greetings = ['hola', 'hey', 'saludos', 'buenos días', 'buenas tardes', 'buenas noches', 'hi', 'hello'] # Solo considerar como saludo si es el primer mensaje del usuario # y es un saludo simple is_simple_greeting = any(greeting in text for greeting in greetings) and len(text.split()) < 4 return is_simple_greeting and len(state.messages) == 0 # Función para procesar mensajes (unifica la lógica de procesamiento) def process_message(prompt, is_example=False): """Procesa un mensaje del usuario, ya sea directo o de un ejemplo""" handle_chat_title(prompt) with st.chat_message('user', avatar=USER_AVATAR_ICON): st.markdown(prompt) state.add_message('user', prompt, USER_AVATAR_ICON) # Obtener el prompt mejorado primero enhanced_prompt = get_enhanced_prompt(prompt, is_example) # Mover la respuesta del modelo después del mensaje del usuario with st.chat_message(MODEL_ROLE, avatar=AI_AVATAR_ICON): try: message_placeholder = st.empty() typing_indicator = st.empty() typing_indicator.markdown("*Generando respuesta...*") response = state.send_message(enhanced_prompt) full_response = stream_response(response, message_placeholder, typing_indicator, STREAM_SETTINGS) if full_response: state.add_message(MODEL_ROLE, full_response, AI_AVATAR_ICON) if hasattr(state.chat, 'get_history'): state.gemini_history = state.chat.get_history() else: state.gemini_history = getattr(state.chat, 'history', []) state.save_chat_history() except Exception as e: show_detailed_error("process_message", e) return def show_detailed_error(context, error): """Muestra errores con contexto y traza para facilitar debug en producción.""" st.error(f"Ocurrió un error en {context}. Intenta de nuevo.") with st.expander("Ver detalles técnicos del error"): st.code(f"{type(error).__name__}: {error}\n\n{traceback.format_exc()}") def handle_chat_title(prompt): """Maneja la lógica del título del chat""" if state.chat_id not in past_chats: temp_title = f'SesiónChat-{state.chat_id}' generated_title = state.generate_chat_title(prompt) state.chat_title = generated_title or temp_title past_chats[state.chat_id] = state.chat_title else: state.chat_title = past_chats[state.chat_id] joblib.dump(past_chats, user_past_chats_list_path) def get_enhanced_prompt(prompt, is_example): """Genera el prompt mejorado según el tipo de mensaje""" if is_greeting(prompt): return ( "Responde ÚNICAMENTE con esta frase, sin agregar nada más: " "\"¡Perfecto! Empecemos por la primera: " "¿Quién es tu audiencia ideal para este correo? " "Descríbela con detalle (contexto, problema principal, deseo y nivel de conciencia).\"" ) elif is_example: return ( f"El usuario seleccionó esta pregunta del menú: '{prompt}'. " "Respóndela de forma directa, útil y conversacional, con ejemplos concretos. " "Después de responder, invita al usuario a iniciar el flujo de 5 preguntas en este orden: audiencia, producto, nombre, CTA y ángulo." ) return prompt def stream_response(response, message_placeholder, typing_indicator, stream_settings): """Maneja el streaming de la respuesta""" full_response = '' batch_size = max(1, int(stream_settings.get('batch_size', 24))) delay_seconds = max(0.0, float(stream_settings.get('delay_seconds', 0.0))) pending_chars = 0 try: for chunk in response: if chunk.text: for ch in chunk.text: full_response += ch pending_chars += 1 if pending_chars >= batch_size: if delay_seconds: time.sleep(delay_seconds) message_placeholder.markdown(full_response + '▌') pending_chars = 0 except Exception as e: show_detailed_error("stream_response", e) return '' if pending_chars > 0: if delay_seconds: time.sleep(delay_seconds) message_placeholder.markdown(full_response + '▌') typing_indicator.empty() message_placeholder.markdown(full_response) return full_response # Función para cargar CSS personalizado def load_css(file_path): with open(file_path) as f: st.markdown(f'', unsafe_allow_html=True) # Intentar cargar el CSS personalizado con ruta absoluta para mayor seguridad try: css_path = os.path.join(os.path.dirname(__file__), 'static', 'css', 'style.css') load_css(css_path) except Exception as e: print(f"Error al cargar CSS: {e}") # Si el archivo no existe, crear un estilo básico en línea st.markdown(""" """, unsafe_allow_html=True) # Función de utilidad para mostrar la carátula inicial def display_initial_header(): col1, col2, col3 = st.columns([1, 2, 1]) with col2: # Centrar la imagen st.markdown(""" """, unsafe_allow_html=True) st.image("robocopy_logo.png", width=300, use_container_width=True) # Título con diseño responsivo (eliminado el símbolo ∞) st.markdown("""
By Jesús Cabrera
✉️ Experto en emails narrativos que conectan historias con ventas de forma natural