import streamlit as st import pandas as pd import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots import requests from prophet import Prophet from datetime import datetime import numpy as np # Configuración de la aplicación st.set_page_config(page_title="Análisis de Datos del Banco Mundial", layout="wide") st.cache_data.clear() # Diccionario de indicadores por área de enfoque INDICADORES = { "Personas": [ { "Indicador": "SP.POP.TOTL", "Nombre": "Población Total" }, { "Indicador": "SP.POP.GROW", "Nombre": "Crecimiento Poblacional (%)" }, { "Indicador": "SP.RUR.TOTL.ZS", "Nombre": "Porcentaje de Población Rural (%)" }, { "Indicador": "SP.DYN.LE00.IN", "Nombre": "Esperanza de Vida al Nacer (años)" }, { "Indicador": "SP.POP.1564.TO", "Nombre": "Población en Edad de Trabajar" } ], "Prosperidad": [ { "Indicador": "NY.GDP.PCAP.CD", "Nombre": "PIB per cápita (US$)" }, { "Indicador": "NY.GDP.MKTP.CD", "Nombre": "PIB Total (US$)" }, { "Indicador": "SL.UEM.TOTL.ZS", "Nombre": "Tasa de Desempleo (%)" }, { "Indicador": "SI.POV.DDAY", "Nombre": "Porcentaje de Población en Pobreza (%)" }, { "Indicador": "NE.EXP.GNFS.ZS", "Nombre": "Exportaciones de Bienes y Servicios (% del PIB)" } ], "Planeta": [ { "Indicador": "AG.LND.FRST.ZS", "Nombre": "Tasa de deforestación (%) anual" }, { "Indicador": "EN.ATM.PM25.MC.M3", "Nombre": "Concentración de partículas PM2.5 (µg/m³)" }, { "Indicador": "EG.USE.PCAP.KG.OE", "Nombre": "Uso de Energía per Cápita (kg petróleo equiv.)" }, { "Indicador": "ER.LND.PTLD.ZS", "Nombre": "Áreas Terrestres Protegidas (%)" }, { "Indicador": "AG.LND.TOTL.K2", "Nombre": "Superficie Total de Tierra (km²)" } ], "Infraestructura": [ { "Indicador": "EG.ELC.ACCS.ZS", "Nombre": "Acceso a Electricidad (% de población)" }, { "Indicador": "IT.MFD.TOTL.ZS", "Nombre": "Acceso a Tecnología Móvil (%)" }, { "Indicador": "IT.NET.USER.ZS", "Nombre": "Acceso a Internet (%)" }, { "Indicador": "SL.TLF.TOTL.IN", "Nombre": "Fuerza Laboral Total" }, { "Indicador": "EG.USE.PCAP.KG.OE", "Nombre": "Uso de Energía per Cápita (kg petróleo equiv.)" } ], "Digital": [ { "Indicador": "IT.NET.USER.ZS", "Nombre": "Usuarios de Internet (%)" }, { "Indicador": "IT.CEL.SETS.P2", "Nombre": "Suscripciones Móviles (por 100 personas)" }, { "Indicador": "SP.DYN.TFRT.IN", # Corregido el código del indicador "Nombre": "Tasa de Fertilidad (nacimientos por mujer)" }, { "Indicador": "IT.MFD.TOTL.ZS", "Nombre": "Acceso a Tecnología Móvil (%)" }, { "Indicador": "IT.NET.BBND.P2", # Corregido el código del indicador "Nombre": "Acceso a Internet de Banda Ancha (%)" } ] } # Lista de categorías no deseadas (agregaciones regionales, etc.) CATEGORIAS_NO_DESEADAS = [ ] @st.cache_data(ttl=3600) # Cache con tiempo de vida de 1 hora def obtener_datos(indicador): """Obtiene datos del Banco Mundial para todos los países.""" url = f"http://api.worldbank.org/v2/country/all/indicator/{indicador}?format=json&per_page=5000" try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json()[1] if not data: st.error(f"No se encontraron datos para el indicador {indicador}.") return None df = pd.json_normalize(data) df['country.value'] = df['country.value'].str.strip().str.title() df = df[~df['country.value'].isin([cat.strip().title() for cat in CATEGORIAS_NO_DESEADAS])] if df.empty: st.warning("No hay datos disponibles después de filtrar categorías no deseadas.") return None return df except requests.Timeout: st.error("Tiempo de espera agotado al conectar con el Banco Mundial.") return None except requests.RequestException as e: st.error(f"Error al conectar con el Banco Mundial: {str(e)}") return None except (IndexError, KeyError, TypeError) as e: st.error(f"Error al procesar los datos: {str(e)}") return None @st.cache_data(ttl=3600) def obtener_datos_mundo(indicador): """Obtiene datos del Banco Mundial solo para el mundo.""" url = f"http://api.worldbank.org/v2/country/WLD/indicator/{indicador}?format=json&per_page=5000" try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json()[1] if not data: st.error(f"No se encontraron datos mundiales para el indicador {indicador}.") return None df = pd.json_normalize(data) return df except Exception as e: st.error(f"Error al obtener datos mundiales: {str(e)}") return None def prepare_prophet_data(df): """Prepara los datos para Prophet.""" try: df = df.rename(columns={'date': 'ds', 'value': 'y'}) df['ds'] = pd.to_datetime(df['ds'], format='%Y') df = df[['ds', 'y']].sort_values('ds') df = df.dropna() return df except Exception as e: st.error(f"Error al preparar datos para Prophet: {str(e)}") return None def make_forecast(df, periods=60): """Realiza la predicción con Prophet.""" try: model = Prophet( yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False, seasonality_mode='multiplicative', interval_width=0.95 ) model.fit(df) future = model.make_future_dataframe(periods=periods, freq='Y') forecast = model.predict(future) return forecast except Exception as e: st.error(f"Error al realizar la predicción: {str(e)}") return None def plot_forecast_comparison(historical_data, forecast_data, title): """Crea un gráfico comparativo de datos históricos y predicción.""" try: fig = make_subplots( rows=1, cols=1, #subplot_titles=('Datos Históricos y Predicción'), vertical_spacing=0.15 ) # Datos históricos y predicción fig.add_trace( go.Scatter( x=historical_data['ds'], y=historical_data['y'], name='Datos Históricos', line=dict(color='blue') ), row=1, col=1 ) fig.add_trace( go.Scatter( x=forecast_data['ds'], y=forecast_data['yhat'], name='Predicción', line=dict(color='red') ), row=1, col=1 ) # Intervalos de confianza fig.add_trace( go.Scatter( x=forecast_data['ds'], y=forecast_data['yhat_upper'], fill=None, mode='lines', line=dict(color='rgba(255,0,0,0.2)'), name='Límite Superior' ), row=1, col=1 ) fig.add_trace( go.Scatter( x=forecast_data['ds'], y=forecast_data['yhat_lower'], fill='tonexty', mode='lines', line=dict(color='rgba(255,0,0,0.2)'), name='Límite Inferior' ), row=1, col=1 ) fig.update_layout( height=600, title_text=title, showlegend=True ) return fig except Exception as e: st.error(f"Error al crear el gráfico: {str(e)}") return None # Interfaz de usuario st.title("📊 Análisis de Datos del Banco Mundial por Áreas de Enfoque") # Selección de área de enfoque y indicador area_seleccionada = st.selectbox("Selecciona un área de enfoque", list(INDICADORES.keys())) indicador_seleccionado = st.selectbox( "Selecciona un indicador", [i["Nombre"] for i in INDICADORES[area_seleccionada]] ) # Obtener el indicador correspondiente indicador_info = next( i for i in INDICADORES[area_seleccionada] if i["Nombre"] == indicador_seleccionado ) # Obtener datos mundiales y realizar predicción df_mundo = obtener_datos_mundo(indicador_info["Indicador"]) if df_mundo is not None: # Preparar datos para visualización histórica df_mundo_hist = df_mundo[['date', 'value']].copy() df_mundo_hist['date'] = pd.to_datetime(df_mundo_hist['date'], format='%Y') df_mundo_hist = df_mundo_hist.sort_values(by='date', ascending=True) # Preparar datos para Prophet y realizar predicción df_prophet = prepare_prophet_data(df_mundo[['date', 'value']]) if df_prophet is not None: forecast = make_forecast(df_prophet) if forecast is not None: # Mostrar gráficos en pestañas tab1, tab2 = st.tabs(["📈 Datos Históricos", "🔮 Predicción"]) with tab1: st.subheader("📅 Evolución del Indicador a lo Largo de los Años") fig_hist = px.line( df_mundo_hist, x='date', y='value', title=f"Evolución de {indicador_info['Nombre']} (Mundial)", labels={'date': 'Año', 'value': indicador_info['Nombre']} ) st.plotly_chart(fig_hist, use_container_width=True) with tab2: st.subheader("🔮 Predicción para los Próximos 60 Años") fig_forecast = plot_forecast_comparison( df_prophet, forecast, f"Predicción de {indicador_info['Nombre']} - Mundial" ) if fig_forecast is not None: st.plotly_chart(fig_forecast, use_container_width=True) # Métricas de predicción st.subheader("📊 Métricas Clave de la Predicción") col1, col2, col3 = st.columns(3) with col1: ultimo_valor = df_prophet['y'].iloc[-1] st.metric("Último Valor Histórico", f"{ultimo_valor:.2f}") with col2: valor_predicho = forecast['yhat'].iloc[-1] st.metric("Valor Predicho (60 años)", f"{valor_predicho:.2f}") with col3: cambio_porcentual = ((valor_predicho - ultimo_valor) / ultimo_valor) * 100 st.metric("Cambio Porcentual Esperado", f"{cambio_porcentual:.1f}%") # Información sobre la predicción st.info(""" 📈 **Información sobre la Predicción** - La predicción se realiza utilizando Facebook Prophet - Se consideran tendencias anuales y patrones históricos - El área sombreada representa el intervalo de confianza de la predicción - Las tendencias se calculan utilizando medias móviles para datos históricos """) # Obtener y mostrar datos de países df_paises = obtener_datos(indicador_info["Indicador"]) if df_paises is not None and not df_paises.empty: # Filtrar los datos más recientes df_paises = df_paises[df_paises['value'].notna()] ultimo_anio = df_paises['date'].max() df_paises = df_paises[df_paises['date'] == ultimo_anio] # Ordenar de mayor a menor df_paises = df_paises.sort_values(by='value', ascending=False) # Mostrar gráfico de barras y tablas comparativas st.subheader(f"🌍 {indicador_info['Nombre']} - Comparativa por Países ({ultimo_anio})") col1, col2 = st.columns(2) with col1: st.write("📈 **Top 20 Valores Más Altos**") top_20 = df_paises[['country.value', 'value']].head(20).rename( columns={'country.value': 'País', 'value': 'Valor'} ) st.dataframe( top_20.style.format({'Valor': '{:.2f}'}), hide_index=True, use_container_width=True ) with col2: st.write("📉 **Top 20 Valores Más Bajos**") bottom_20 = df_paises[['country.value', 'value']].tail(20).rename( columns={'country.value': 'País', 'value': 'Valor'} ) st.dataframe( bottom_20.style.format({'Valor': '{:.2f}'}), hide_index=True, use_container_width=True ) # Visualización interactiva de los top 20 países fig_paises = px.bar( df_paises.head(20), x='country.value', y='value', title=f"{indicador_info['Nombre']} por País (Top 20)", labels={ 'country.value': 'País', 'value': indicador_info['Nombre'] }, text='value' ) fig_paises.update_traces( texttemplate='%{text:.2f}', textposition='outside' ) fig_paises.update_layout( xaxis_tickangle=-45, height=600, showlegend=False ) st.plotly_chart(fig_paises, use_container_width=True) else: st.warning(f"No hay datos disponibles de países para el indicador {indicador_info['Nombre']} en esta área de enfoque.") # Agregar una sección de chatbot basado en el contexto del indicador seleccionado st.markdown("---") st.subheader("💬 Consulta a nuestro asistente virtual sobre este indicador") # Configuración de la API de Hugging Face API_URL = "https://api-inference.huggingface.co/models/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B" # Inicializar la sesión state para el historial de chat si no existe if 'chat_history' not in st.session_state: st.session_state.chat_history = [] # Función para obtener la API key de Hugging Face desde secrets @st.cache_resource def get_huggingface_api_key(): """Obtener la API key de Hugging Face desde secrets""" try: # Intenta acceder al token usando la clave HF_TOKEN return st.secrets["HF_TOKEN"] except KeyError: # Si no está disponible con esa clave, intenta el formato anterior try: return st.secrets["huggingface"]["api_key"] except: return None # Obtener la API key api_key = get_huggingface_api_key() if not api_key: api_key = st.text_input("Ingresa tu API key de Hugging Face:", type="password") if not api_key: st.warning("Por favor ingresa una API key de Hugging Face para usar el chatbot.") st.stop() # Función para enviar solicitudes a la API de Hugging Face def query_huggingface(payload): """Envía una solicitud a la API de Hugging Face y retorna la respuesta""" headers = {"Authorization": f"Bearer {api_key}"} try: response = requests.post(API_URL, headers=headers, json=payload, timeout=60) response.raise_for_status() return response.json() except requests.exceptions.Timeout: st.error("La solicitud a la API de Hugging Face ha excedido el tiempo de espera.") return None except requests.exceptions.HTTPError as e: st.error(f"Error HTTP: {e.response.status_code} - {e.response.text}") return None except Exception as e: st.error(f"Error al comunicarse con la API de Hugging Face: {str(e)}") return None # Preparar el contexto basado en los datos seleccionados def prepare_context(): """Prepara el contexto para el chatbot basado en el indicador seleccionado""" context = f""" Información sobre el indicador '{indicador_seleccionado}' ({indicador_info['Indicador']}): - Área de enfoque: {area_seleccionada} """ # Verificar si las variables existen en el contexto actual antes de usarlas if 'ultimo_anio' in locals() or 'ultimo_anio' in globals(): context += f"- Último año con datos: {ultimo_anio}\n" # Agregar información sobre valores mundiales si está disponible if ('df_mundo_hist' in locals() or 'df_mundo_hist' in globals()) and 'df_mundo_hist' is not None and not df_mundo_hist.empty: ultimo_valor_mundial = df_mundo_hist.iloc[-1]['value'] if not df_mundo_hist.empty else "No disponible" context += f"- Último valor mundial registrado: {ultimo_valor_mundial}\n" # Agregar información sobre predicción si está disponible if ('forecast' in locals() or 'forecast' in globals()) and forecast is not None: valor_predicho = forecast['yhat'].iloc[-1] context += f"- Valor predicho para dentro de 60 años: {valor_predicho:.2f}\n" if 'cambio_porcentual' in locals() or 'cambio_porcentual' in globals(): context += f"- Cambio porcentual esperado: {cambio_porcentual:.1f}%\n" # Agregar información sobre países top si está disponible if ('top_20' in locals() or 'top_20' in globals()) and top_20 is not None and not top_20.empty: top_3_paises = top_20.head(3) context += "- Top 3 países con valores más altos:\n" for _, row in top_3_paises.iterrows(): context += f" * {row['País']}: {row['Valor']:.2f}\n" return context # Interfaz del chatbot st.info("Puedes preguntar cualquier cosa sobre este indicador, su evolución histórica, predicciones futuras o comparar países.") # Crear el widget de entrada de usuario user_input = st.text_input("Tu pregunta:", key="user_query", placeholder="Ej: ¿Cuál es la tendencia esperada para este indicador?") # Crear un contenedor para el historial de chat chat_container = st.container() # Procesar la entrada del usuario if user_input: # Preparar el contexto context = prepare_context() # Construir el prompt para el modelo prompt = f""" Eres un asistente especializado en datos del Banco Mundial y análisis económico. CONTEXTO: {context} PREGUNTA DEL USUARIO: {user_input} Responde de manera concisa y útil, basándote en el contexto proporcionado. """ # Mostrar un mensaje de espera personalizado con icono with st.spinner("🧠 Pensando..."): # Llamar a la API de Hugging Face payload = { "inputs": prompt, "parameters": { "max_new_tokens": 250, "temperature": 0.7, "top_p": 0.9, "do_sample": True } } response = query_huggingface(payload) if response: # Extraer la respuesta del modelo if isinstance(response, list) and len(response) > 0: bot_response = response[0].get("generated_text", "") # Intentar extraer solo la respuesta del asistente (después del prompt) try: bot_response = bot_response.split("Responde de manera concisa y útil")[-1] if "PREGUNTA DEL USUARIO:" in bot_response: bot_response = bot_response.split("PREGUNTA DEL USUARIO:")[-1] bot_response = bot_response.strip() except: # Si falla la extracción, usar la respuesta completa pass else: bot_response = str(response) # Agregar al historial de chat st.session_state.chat_history.append({"role": "user", "content": user_input}) st.session_state.chat_history.append({"role": "assistant", "content": bot_response}) # Mostrar el historial de chat with chat_container: for message in st.session_state.chat_history: if message["role"] == "user": st.markdown(f"**😀 Tú:** {message['content']}") else: st.markdown(f"**🤖 Asistente:** {message['content']}") # Información sobre el modelo st.markdown("---") st.caption("Asistente virtual potenciado por deepseek-ai/DeepSeek-R1-Distill-Qwen-32B a través de Hugging Face")