| import time |
| import os |
| import uuid |
| import traceback |
| import joblib |
| import streamlit as st |
| from dotenv import load_dotenv |
| from streamlit.runtime.scriptrunner import get_script_run_ctx |
| from system_prompts import get_unified_email_prompt |
| from session_state import ( |
| SessionState, |
| DEFAULT_GEMINI_MODEL, |
| DATA_DIR, |
| ) |
|
|
| |
| state = SessionState() |
| STREAM_SETTINGS = {'batch_size': 1, 'delay_seconds': 0.01} |
| user_past_chats_list_path = None |
|
|
| def get_user_namespace(): |
| """ |
| Obtiene un namespace para persistencia. |
| - Si CHATBOT_USER_NAMESPACE está definido, se usa ese valor (recomendado para app de un solo usuario). |
| - Si MULTI_USER_MODE=true, usa session_id para aislar por sesión. |
| - Caso contrario, usa un user_id persistente en query params para aislar por usuario y sobrevivir reinicios. |
| """ |
| configured_namespace = os.environ.get('CHATBOT_USER_NAMESPACE') |
| if configured_namespace: |
| return configured_namespace |
|
|
| is_multi_user_mode = os.environ.get('MULTI_USER_MODE', 'false').strip().lower() == 'true' |
| if is_multi_user_mode: |
| context = get_script_run_ctx() |
| if context and getattr(context, 'session_id', None): |
| return context.session_id |
|
|
| user_id = st.query_params.get('uid') |
| if not user_id: |
| user_id = uuid.uuid4().hex |
| st.query_params['uid'] = user_id |
| return f'user_{user_id}' |
|
|
| |
| def is_greeting(text): |
| """Detecta si el texto es un saludo simple""" |
| text = text.lower().strip() |
| greetings = ['hola', 'hey', 'saludos', 'buenos días', 'buenas tardes', 'buenas noches', 'hi', 'hello'] |
| |
| |
| |
| is_simple_greeting = any(greeting in text for greeting in greetings) and len(text.split()) < 4 |
| return is_simple_greeting and len(state.messages) == 0 |
|
|
| |
| def process_message(prompt, is_example=False): |
| """Procesa un mensaje del usuario, ya sea directo o de un ejemplo""" |
| handle_chat_title(prompt) |
| |
| with st.chat_message('user', avatar=USER_AVATAR_ICON): |
| st.markdown(prompt) |
| |
| state.add_message('user', prompt, USER_AVATAR_ICON) |
| |
| |
| enhanced_prompt = get_enhanced_prompt(prompt, is_example) |
| |
| |
| with st.chat_message(MODEL_ROLE, avatar=AI_AVATAR_ICON): |
| try: |
| message_placeholder = st.empty() |
| typing_indicator = st.empty() |
| typing_indicator.markdown("*Generando respuesta...*") |
| |
| response = state.send_message(enhanced_prompt) |
| full_response = stream_response(response, message_placeholder, typing_indicator, STREAM_SETTINGS) |
| |
| if full_response: |
| state.add_message(MODEL_ROLE, full_response, AI_AVATAR_ICON) |
| if hasattr(state.chat, 'get_history'): |
| state.gemini_history = state.chat.get_history() |
| else: |
| state.gemini_history = getattr(state.chat, 'history', []) |
| state.save_chat_history() |
| |
| except Exception as e: |
| show_detailed_error("process_message", e) |
| return |
|
|
| def show_detailed_error(context, error): |
| """Muestra errores con contexto y traza para facilitar debug en producción.""" |
| st.error(f"Ocurrió un error en {context}. Intenta de nuevo.") |
| with st.expander("Ver detalles técnicos del error"): |
| st.code(f"{type(error).__name__}: {error}\n\n{traceback.format_exc()}") |
|
|
| def handle_chat_title(prompt): |
| """Maneja la lógica del título del chat""" |
| if state.chat_id not in past_chats: |
| temp_title = f'SesiónChat-{state.chat_id}' |
| generated_title = state.generate_chat_title(prompt) |
| state.chat_title = generated_title or temp_title |
| past_chats[state.chat_id] = state.chat_title |
| else: |
| state.chat_title = past_chats[state.chat_id] |
| joblib.dump(past_chats, user_past_chats_list_path) |
|
|
| def get_enhanced_prompt(prompt, is_example): |
| """Genera el prompt mejorado según el tipo de mensaje""" |
| if is_greeting(prompt): |
| return ( |
| "Responde ÚNICAMENTE con esta frase, sin agregar nada más: " |
| "\"¡Perfecto! Empecemos por la primera: " |
| "¿Quién es tu audiencia ideal para este correo? " |
| "Descríbela con detalle (contexto, problema principal, deseo y nivel de conciencia).\"" |
| ) |
| elif is_example: |
| return ( |
| f"El usuario seleccionó esta pregunta del menú: '{prompt}'. " |
| "Respóndela de forma directa, útil y conversacional, con ejemplos concretos. " |
| "Después de responder, invita al usuario a iniciar el flujo de 5 preguntas en este orden: audiencia, producto, nombre, CTA y ángulo." |
| ) |
| return prompt |
|
|
| def stream_response(response, message_placeholder, typing_indicator, stream_settings): |
| """Maneja el streaming de la respuesta""" |
| full_response = '' |
| batch_size = max(1, int(stream_settings.get('batch_size', 24))) |
| delay_seconds = max(0.0, float(stream_settings.get('delay_seconds', 0.0))) |
| pending_chars = 0 |
|
|
| try: |
| for chunk in response: |
| if chunk.text: |
| for ch in chunk.text: |
| full_response += ch |
| pending_chars += 1 |
| if pending_chars >= batch_size: |
| if delay_seconds: |
| time.sleep(delay_seconds) |
| message_placeholder.markdown(full_response + '▌') |
| pending_chars = 0 |
| except Exception as e: |
| show_detailed_error("stream_response", e) |
| return '' |
|
|
| if pending_chars > 0: |
| if delay_seconds: |
| time.sleep(delay_seconds) |
| message_placeholder.markdown(full_response + '▌') |
|
|
| typing_indicator.empty() |
| message_placeholder.markdown(full_response) |
| return full_response |
|
|
| |
| def load_css(file_path): |
| with open(file_path) as f: |
| st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True) |
|
|
| |
| try: |
| css_path = os.path.join(os.path.dirname(__file__), 'static', 'css', 'style.css') |
| load_css(css_path) |
| except Exception as e: |
| print(f"Error al cargar CSS: {e}") |
| |
| st.markdown(""" |
| <style> |
| .robocopy-title { |
| color: white !important; |
| font-weight: bold; |
| font-size: clamp(2.5em, 5vw, 4em); |
| line-height: 1.2; |
| } |
| </style> |
| """, unsafe_allow_html=True) |
|
|
| |
| def display_initial_header(): |
| col1, col2, col3 = st.columns([1, 2, 1]) |
| with col2: |
| |
| st.markdown(""" |
| <style> |
| div.stImage { |
| text-align: center; |
| display: block; |
| margin-left: auto; |
| margin-right: auto; |
| } |
| </style> |
| """, unsafe_allow_html=True) |
| st.image("robocopy_logo.png", width=300, use_container_width=True) |
| |
| |
| st.markdown(""" |
| <div style='text-align: center; margin-top: -35px; width: 100%;'> |
| <h1 class='robocopy-title' style='width: 100%; text-align: center; color: white !important; font-size: clamp(2.5em, 5vw, 4em); line-height: 1.2;'>Email Story Creator</h1> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| |
| st.markdown(""" |
| <div style='text-align: center; width: 100%;'> |
| <p style='font-size: 16px; color: white; width: 100%; text-align: center; margin-top: -20px;'>By Jesús Cabrera</p> |
| </div> |
| """, unsafe_allow_html=True) |
| |
| |
| st.markdown(""" |
| <div style='text-align: center; width: 100%;'> |
| <p style='font-size: 16px; background-color: transparent; padding: 12px; border-radius: 8px; margin-top: -20px; color: white; width: 100%; text-align: center;'> |
| ✉️ Experto en emails narrativos que conectan historias con ventas de forma natural |
| </p> |
| </div> |
| """, unsafe_allow_html=True) |
|
|
| |
| def display_examples(): |
| ejemplos = [ |
| {"texto": "Definir audiencia 🎯", "prompt": "Ayúdame a definir una audiencia concreta para este correo: dolor principal, deseo y nivel de conciencia."}, |
| {"texto": "Propuesta de valor 💎", "prompt": "Convierte mi producto en una promesa clara de transformación sin listar características aburridas."}, |
| {"texto": "CTA que convierte 🚀", "prompt": "Dame 3 opciones de CTA claras para este email, con baja fricción y orientadas a una sola acción."}, |
| {"texto": "Asunto + gancho ✉️", "prompt": "Propón 5 asuntos y 3 ganchos de apertura para aumentar aperturas y clics de este correo."} |
| ] |
|
|
| |
| cols = st.columns(4) |
| for idx, ejemplo in enumerate(ejemplos): |
| with cols[idx]: |
| if st.button(ejemplo["texto"], key=f"ejemplo_{idx}", help=ejemplo["prompt"]): |
| st.session_state.pending_example_prompt = ejemplo["prompt"] |
| st.session_state.hide_initial_menu = True |
| st.rerun() |
|
|
| |
| load_dotenv() |
| GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY') |
| if not GOOGLE_API_KEY: |
| st.error("Falta la variable de entorno GOOGLE_API_KEY. Configúrala para continuar.") |
| st.stop() |
|
|
| |
| state.user_namespace = get_user_namespace() |
| user_past_chats_list_path = f'{DATA_DIR}/{state.user_namespace}/past_chats_list' |
| new_chat_id = f'{time.time()}' |
| MODEL_ROLE = 'ai' |
| AI_AVATAR_ICON = '🤖' |
| USER_AVATAR_ICON = '👤' |
|
|
| |
| os.makedirs(f'{DATA_DIR}/{state.user_namespace}', exist_ok=True) |
|
|
| |
| try: |
| past_chats = joblib.load(user_past_chats_list_path) |
| except (FileNotFoundError, EOFError): |
| past_chats = {} |
|
|
| |
| with st.sidebar: |
| st.write('# Chats Anteriores') |
|
|
| if state.chat_id is None: |
| state.chat_id = new_chat_id |
|
|
| if st.button('+ Nuevo chat', key='new_chat_sidebar', use_container_width=True): |
| state.chat_id = new_chat_id |
| st.session_state.pending_example_prompt = None |
| st.session_state.hide_initial_menu = False |
| st.session_state.editing_chat_id = None |
| st.rerun() |
|
|
| st.caption('Sesiones') |
| if 'editing_chat_id' not in st.session_state: |
| st.session_state.editing_chat_id = None |
|
|
| def chat_sort_key(chat_id): |
| try: |
| return float(chat_id) |
| except (TypeError, ValueError): |
| return 0.0 |
|
|
| sorted_chat_ids = sorted(past_chats.keys(), key=chat_sort_key, reverse=True) |
| for index, chat_id in enumerate(sorted_chat_ids): |
| chat_title = past_chats.get(chat_id, f'SesiónChat-{chat_id}') |
| is_active_chat = chat_id == state.chat_id |
| button_label = f'● {chat_title}' if is_active_chat else chat_title |
|
|
| if st.button( |
| button_label, |
| key=f'chat_session_{index}_{chat_id}', |
| use_container_width=True, |
| type='primary' if is_active_chat else 'secondary', |
| ): |
| if state.chat_id != chat_id: |
| state.chat_id = chat_id |
| st.rerun() |
|
|
| state.chat_title = past_chats.get(state.chat_id, f'SesiónChat-{state.chat_id}') |
|
|
| |
| state.load_chat_history() |
|
|
| if 'pending_example_prompt' not in st.session_state: |
| st.session_state.pending_example_prompt = None |
|
|
| if 'hide_initial_menu' not in st.session_state: |
| st.session_state.hide_initial_menu = False |
|
|
| if 'active_chat_id' not in st.session_state: |
| st.session_state.active_chat_id = state.chat_id |
| elif st.session_state.active_chat_id != state.chat_id: |
| st.session_state.active_chat_id = state.chat_id |
| st.session_state.pending_example_prompt = None |
| st.session_state.hide_initial_menu = state.has_messages() |
| st.session_state.editing_chat_id = None |
|
|
| |
| system_prompt = get_unified_email_prompt() |
| if ( |
| st.session_state.get('initialized_model_name') != DEFAULT_GEMINI_MODEL |
| or getattr(state, 'client', None) is None |
| ): |
| try: |
| state.initialize_model(DEFAULT_GEMINI_MODEL, api_key=GOOGLE_API_KEY) |
| st.session_state.initialized_model_name = DEFAULT_GEMINI_MODEL |
| except Exception as e: |
| show_detailed_error("initialize_model", e) |
| st.stop() |
|
|
| should_reinitialize_chat = ( |
| state.chat is None |
| or st.session_state.get('initialized_chat_id') != state.chat_id |
| or st.session_state.get('initialized_system_prompt') != system_prompt |
| ) |
| if should_reinitialize_chat: |
| try: |
| state.initialize_chat(system_instruction=system_prompt) |
| st.session_state.initialized_chat_id = state.chat_id |
| st.session_state.initialized_system_prompt = system_prompt |
| except Exception as e: |
| show_detailed_error("initialize_chat", e) |
| st.stop() |
|
|
| |
| for message in state.messages: |
| with st.chat_message( |
| name=message['role'], |
| avatar=message.get('avatar'), |
| ): |
| st.markdown(message['content']) |
|
|
| |
| user_prompt = st.chat_input('Escribe aquí tus instrucciones') |
|
|
| if state.has_messages(): |
| st.session_state.hide_initial_menu = True |
|
|
| |
| initial_menu_container = st.container() |
| if ( |
| not st.session_state.hide_initial_menu |
| and not state.has_messages() |
| and not user_prompt |
| and not st.session_state.pending_example_prompt |
| ): |
| with initial_menu_container: |
| display_initial_header() |
| display_examples() |
|
|
| |
| if user_prompt: |
| st.session_state.hide_initial_menu = True |
| initial_menu_container.empty() |
| process_message(user_prompt, is_example=False) |
| st.rerun() |
|
|
| |
| if st.session_state.pending_example_prompt: |
| initial_menu_container.empty() |
| process_message(st.session_state.pending_example_prompt, is_example=True) |
| st.session_state.pending_example_prompt = None |
| st.rerun() |
|
|