globalAnalytics / app.py
Migue1804's picture
Upload 2 files
617f328 verified
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import requests
from prophet import Prophet
from datetime import datetime
import numpy as np
# Configuración de la aplicación
st.set_page_config(page_title="Análisis de Datos del Banco Mundial", layout="wide")
st.cache_data.clear()
# Diccionario de indicadores por área de enfoque
INDICADORES = {
"Personas": [
{
"Indicador": "SP.POP.TOTL",
"Nombre": "Población Total"
},
{
"Indicador": "SP.POP.GROW",
"Nombre": "Crecimiento Poblacional (%)"
},
{
"Indicador": "SP.RUR.TOTL.ZS",
"Nombre": "Porcentaje de Población Rural (%)"
},
{
"Indicador": "SP.DYN.LE00.IN",
"Nombre": "Esperanza de Vida al Nacer (años)"
},
{
"Indicador": "SP.POP.1564.TO",
"Nombre": "Población en Edad de Trabajar"
}
],
"Prosperidad": [
{
"Indicador": "NY.GDP.PCAP.CD",
"Nombre": "PIB per cápita (US$)"
},
{
"Indicador": "NY.GDP.MKTP.CD",
"Nombre": "PIB Total (US$)"
},
{
"Indicador": "SL.UEM.TOTL.ZS",
"Nombre": "Tasa de Desempleo (%)"
},
{
"Indicador": "SI.POV.DDAY",
"Nombre": "Porcentaje de Población en Pobreza (%)"
},
{
"Indicador": "NE.EXP.GNFS.ZS",
"Nombre": "Exportaciones de Bienes y Servicios (% del PIB)"
}
],
"Planeta": [
{
"Indicador": "AG.LND.FRST.ZS",
"Nombre": "Tasa de deforestación (%) anual"
},
{
"Indicador": "EN.ATM.PM25.MC.M3",
"Nombre": "Concentración de partículas PM2.5 (µg/m³)"
},
{
"Indicador": "EG.USE.PCAP.KG.OE",
"Nombre": "Uso de Energía per Cápita (kg petróleo equiv.)"
},
{
"Indicador": "ER.LND.PTLD.ZS",
"Nombre": "Áreas Terrestres Protegidas (%)"
},
{
"Indicador": "AG.LND.TOTL.K2",
"Nombre": "Superficie Total de Tierra (km²)"
}
],
"Infraestructura": [
{
"Indicador": "EG.ELC.ACCS.ZS",
"Nombre": "Acceso a Electricidad (% de población)"
},
{
"Indicador": "IT.MFD.TOTL.ZS",
"Nombre": "Acceso a Tecnología Móvil (%)"
},
{
"Indicador": "IT.NET.USER.ZS",
"Nombre": "Acceso a Internet (%)"
},
{
"Indicador": "SL.TLF.TOTL.IN",
"Nombre": "Fuerza Laboral Total"
},
{
"Indicador": "EG.USE.PCAP.KG.OE",
"Nombre": "Uso de Energía per Cápita (kg petróleo equiv.)"
}
],
"Digital": [
{
"Indicador": "IT.NET.USER.ZS",
"Nombre": "Usuarios de Internet (%)"
},
{
"Indicador": "IT.CEL.SETS.P2",
"Nombre": "Suscripciones Móviles (por 100 personas)"
},
{
"Indicador": "SP.DYN.TFRT.IN", # Corregido el código del indicador
"Nombre": "Tasa de Fertilidad (nacimientos por mujer)"
},
{
"Indicador": "IT.MFD.TOTL.ZS",
"Nombre": "Acceso a Tecnología Móvil (%)"
},
{
"Indicador": "IT.NET.BBND.P2", # Corregido el código del indicador
"Nombre": "Acceso a Internet de Banda Ancha (%)"
}
]
}
# Lista de categorías no deseadas (agregaciones regionales, etc.)
CATEGORIAS_NO_DESEADAS = [
]
@st.cache_data(ttl=3600) # Cache con tiempo de vida de 1 hora
def obtener_datos(indicador):
"""Obtiene datos del Banco Mundial para todos los países."""
url = f"http://api.worldbank.org/v2/country/all/indicator/{indicador}?format=json&per_page=5000"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()[1]
if not data:
st.error(f"No se encontraron datos para el indicador {indicador}.")
return None
df = pd.json_normalize(data)
df['country.value'] = df['country.value'].str.strip().str.title()
df = df[~df['country.value'].isin([cat.strip().title() for cat in CATEGORIAS_NO_DESEADAS])]
if df.empty:
st.warning("No hay datos disponibles después de filtrar categorías no deseadas.")
return None
return df
except requests.Timeout:
st.error("Tiempo de espera agotado al conectar con el Banco Mundial.")
return None
except requests.RequestException as e:
st.error(f"Error al conectar con el Banco Mundial: {str(e)}")
return None
except (IndexError, KeyError, TypeError) as e:
st.error(f"Error al procesar los datos: {str(e)}")
return None
@st.cache_data(ttl=3600)
def obtener_datos_mundo(indicador):
"""Obtiene datos del Banco Mundial solo para el mundo."""
url = f"http://api.worldbank.org/v2/country/WLD/indicator/{indicador}?format=json&per_page=5000"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()[1]
if not data:
st.error(f"No se encontraron datos mundiales para el indicador {indicador}.")
return None
df = pd.json_normalize(data)
return df
except Exception as e:
st.error(f"Error al obtener datos mundiales: {str(e)}")
return None
def prepare_prophet_data(df):
"""Prepara los datos para Prophet."""
try:
df = df.rename(columns={'date': 'ds', 'value': 'y'})
df['ds'] = pd.to_datetime(df['ds'], format='%Y')
df = df[['ds', 'y']].sort_values('ds')
df = df.dropna()
return df
except Exception as e:
st.error(f"Error al preparar datos para Prophet: {str(e)}")
return None
def make_forecast(df, periods=60):
"""Realiza la predicción con Prophet."""
try:
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=False,
daily_seasonality=False,
seasonality_mode='multiplicative',
interval_width=0.95
)
model.fit(df)
future = model.make_future_dataframe(periods=periods, freq='Y')
forecast = model.predict(future)
return forecast
except Exception as e:
st.error(f"Error al realizar la predicción: {str(e)}")
return None
def plot_forecast_comparison(historical_data, forecast_data, title):
"""Crea un gráfico comparativo de datos históricos y predicción."""
try:
fig = make_subplots(
rows=1, cols=1,
#subplot_titles=('Datos Históricos y Predicción'),
vertical_spacing=0.15
)
# Datos históricos y predicción
fig.add_trace(
go.Scatter(
x=historical_data['ds'],
y=historical_data['y'],
name='Datos Históricos',
line=dict(color='blue')
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=forecast_data['ds'],
y=forecast_data['yhat'],
name='Predicción',
line=dict(color='red')
),
row=1, col=1
)
# Intervalos de confianza
fig.add_trace(
go.Scatter(
x=forecast_data['ds'],
y=forecast_data['yhat_upper'],
fill=None,
mode='lines',
line=dict(color='rgba(255,0,0,0.2)'),
name='Límite Superior'
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=forecast_data['ds'],
y=forecast_data['yhat_lower'],
fill='tonexty',
mode='lines',
line=dict(color='rgba(255,0,0,0.2)'),
name='Límite Inferior'
),
row=1, col=1
)
fig.update_layout(
height=600,
title_text=title,
showlegend=True
)
return fig
except Exception as e:
st.error(f"Error al crear el gráfico: {str(e)}")
return None
# Interfaz de usuario
st.title("📊 Análisis de Datos del Banco Mundial por Áreas de Enfoque")
# Selección de área de enfoque y indicador
area_seleccionada = st.selectbox("Selecciona un área de enfoque", list(INDICADORES.keys()))
indicador_seleccionado = st.selectbox(
"Selecciona un indicador",
[i["Nombre"] for i in INDICADORES[area_seleccionada]]
)
# Obtener el indicador correspondiente
indicador_info = next(
i for i in INDICADORES[area_seleccionada]
if i["Nombre"] == indicador_seleccionado
)
# Obtener datos mundiales y realizar predicción
df_mundo = obtener_datos_mundo(indicador_info["Indicador"])
if df_mundo is not None:
# Preparar datos para visualización histórica
df_mundo_hist = df_mundo[['date', 'value']].copy()
df_mundo_hist['date'] = pd.to_datetime(df_mundo_hist['date'], format='%Y')
df_mundo_hist = df_mundo_hist.sort_values(by='date', ascending=True)
# Preparar datos para Prophet y realizar predicción
df_prophet = prepare_prophet_data(df_mundo[['date', 'value']])
if df_prophet is not None:
forecast = make_forecast(df_prophet)
if forecast is not None:
# Mostrar gráficos en pestañas
tab1, tab2 = st.tabs(["📈 Datos Históricos", "🔮 Predicción"])
with tab1:
st.subheader("📅 Evolución del Indicador a lo Largo de los Años")
fig_hist = px.line(
df_mundo_hist,
x='date',
y='value',
title=f"Evolución de {indicador_info['Nombre']} (Mundial)",
labels={'date': 'Año', 'value': indicador_info['Nombre']}
)
st.plotly_chart(fig_hist, use_container_width=True)
with tab2:
st.subheader("🔮 Predicción para los Próximos 60 Años")
fig_forecast = plot_forecast_comparison(
df_prophet,
forecast,
f"Predicción de {indicador_info['Nombre']} - Mundial"
)
if fig_forecast is not None:
st.plotly_chart(fig_forecast, use_container_width=True)
# Métricas de predicción
st.subheader("📊 Métricas Clave de la Predicción")
col1, col2, col3 = st.columns(3)
with col1:
ultimo_valor = df_prophet['y'].iloc[-1]
st.metric("Último Valor Histórico", f"{ultimo_valor:.2f}")
with col2:
valor_predicho = forecast['yhat'].iloc[-1]
st.metric("Valor Predicho (60 años)", f"{valor_predicho:.2f}")
with col3:
cambio_porcentual = ((valor_predicho - ultimo_valor) / ultimo_valor) * 100
st.metric("Cambio Porcentual Esperado", f"{cambio_porcentual:.1f}%")
# Información sobre la predicción
st.info("""
📈 **Información sobre la Predicción**
- La predicción se realiza utilizando Facebook Prophet
- Se consideran tendencias anuales y patrones históricos
- El área sombreada representa el intervalo de confianza de la predicción
- Las tendencias se calculan utilizando medias móviles para datos históricos
""")
# Obtener y mostrar datos de países
df_paises = obtener_datos(indicador_info["Indicador"])
if df_paises is not None and not df_paises.empty:
# Filtrar los datos más recientes
df_paises = df_paises[df_paises['value'].notna()]
ultimo_anio = df_paises['date'].max()
df_paises = df_paises[df_paises['date'] == ultimo_anio]
# Ordenar de mayor a menor
df_paises = df_paises.sort_values(by='value', ascending=False)
# Mostrar gráfico de barras y tablas comparativas
st.subheader(f"🌍 {indicador_info['Nombre']} - Comparativa por Países ({ultimo_anio})")
col1, col2 = st.columns(2)
with col1:
st.write("📈 **Top 20 Valores Más Altos**")
top_20 = df_paises[['country.value', 'value']].head(20).rename(
columns={'country.value': 'País', 'value': 'Valor'}
)
st.dataframe(
top_20.style.format({'Valor': '{:.2f}'}),
hide_index=True,
use_container_width=True
)
with col2:
st.write("📉 **Top 20 Valores Más Bajos**")
bottom_20 = df_paises[['country.value', 'value']].tail(20).rename(
columns={'country.value': 'País', 'value': 'Valor'}
)
st.dataframe(
bottom_20.style.format({'Valor': '{:.2f}'}),
hide_index=True,
use_container_width=True
)
# Visualización interactiva de los top 20 países
fig_paises = px.bar(
df_paises.head(20),
x='country.value',
y='value',
title=f"{indicador_info['Nombre']} por País (Top 20)",
labels={
'country.value': 'País',
'value': indicador_info['Nombre']
},
text='value'
)
fig_paises.update_traces(
texttemplate='%{text:.2f}',
textposition='outside'
)
fig_paises.update_layout(
xaxis_tickangle=-45,
height=600,
showlegend=False
)
st.plotly_chart(fig_paises, use_container_width=True)
else:
st.warning(f"No hay datos disponibles de países para el indicador {indicador_info['Nombre']} en esta área de enfoque.")
# Agregar una sección de chatbot basado en el contexto del indicador seleccionado
st.markdown("---")
st.subheader("💬 Consulta a nuestro asistente virtual sobre este indicador")
# Configuración de la API de Hugging Face
API_URL = "https://api-inference.huggingface.co/models/deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
# Inicializar la sesión state para el historial de chat si no existe
if 'chat_history' not in st.session_state:
st.session_state.chat_history = []
# Función para obtener la API key de Hugging Face desde secrets
@st.cache_resource
def get_huggingface_api_key():
"""Obtener la API key de Hugging Face desde secrets"""
try:
# Intenta acceder al token usando la clave HF_TOKEN
return st.secrets["HF_TOKEN"]
except KeyError:
# Si no está disponible con esa clave, intenta el formato anterior
try:
return st.secrets["huggingface"]["api_key"]
except:
return None
# Obtener la API key
api_key = get_huggingface_api_key()
if not api_key:
api_key = st.text_input("Ingresa tu API key de Hugging Face:", type="password")
if not api_key:
st.warning("Por favor ingresa una API key de Hugging Face para usar el chatbot.")
st.stop()
# Función para enviar solicitudes a la API de Hugging Face
def query_huggingface(payload):
"""Envía una solicitud a la API de Hugging Face y retorna la respuesta"""
headers = {"Authorization": f"Bearer {api_key}"}
try:
response = requests.post(API_URL, headers=headers, json=payload, timeout=60)
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
st.error("La solicitud a la API de Hugging Face ha excedido el tiempo de espera.")
return None
except requests.exceptions.HTTPError as e:
st.error(f"Error HTTP: {e.response.status_code} - {e.response.text}")
return None
except Exception as e:
st.error(f"Error al comunicarse con la API de Hugging Face: {str(e)}")
return None
# Preparar el contexto basado en los datos seleccionados
def prepare_context():
"""Prepara el contexto para el chatbot basado en el indicador seleccionado"""
context = f"""
Información sobre el indicador '{indicador_seleccionado}' ({indicador_info['Indicador']}):
- Área de enfoque: {area_seleccionada}
"""
# Verificar si las variables existen en el contexto actual antes de usarlas
if 'ultimo_anio' in locals() or 'ultimo_anio' in globals():
context += f"- Último año con datos: {ultimo_anio}\n"
# Agregar información sobre valores mundiales si está disponible
if ('df_mundo_hist' in locals() or 'df_mundo_hist' in globals()) and 'df_mundo_hist' is not None and not df_mundo_hist.empty:
ultimo_valor_mundial = df_mundo_hist.iloc[-1]['value'] if not df_mundo_hist.empty else "No disponible"
context += f"- Último valor mundial registrado: {ultimo_valor_mundial}\n"
# Agregar información sobre predicción si está disponible
if ('forecast' in locals() or 'forecast' in globals()) and forecast is not None:
valor_predicho = forecast['yhat'].iloc[-1]
context += f"- Valor predicho para dentro de 60 años: {valor_predicho:.2f}\n"
if 'cambio_porcentual' in locals() or 'cambio_porcentual' in globals():
context += f"- Cambio porcentual esperado: {cambio_porcentual:.1f}%\n"
# Agregar información sobre países top si está disponible
if ('top_20' in locals() or 'top_20' in globals()) and top_20 is not None and not top_20.empty:
top_3_paises = top_20.head(3)
context += "- Top 3 países con valores más altos:\n"
for _, row in top_3_paises.iterrows():
context += f" * {row['País']}: {row['Valor']:.2f}\n"
return context
# Interfaz del chatbot
st.info("Puedes preguntar cualquier cosa sobre este indicador, su evolución histórica, predicciones futuras o comparar países.")
# Crear el widget de entrada de usuario
user_input = st.text_input("Tu pregunta:", key="user_query", placeholder="Ej: ¿Cuál es la tendencia esperada para este indicador?")
# Crear un contenedor para el historial de chat
chat_container = st.container()
# Procesar la entrada del usuario
if user_input:
# Preparar el contexto
context = prepare_context()
# Construir el prompt para el modelo
prompt = f"""
Eres un asistente especializado en datos del Banco Mundial y análisis económico.
CONTEXTO:
{context}
PREGUNTA DEL USUARIO:
{user_input}
Responde de manera concisa y útil, basándote en el contexto proporcionado.
"""
# Mostrar un mensaje de espera personalizado con icono
with st.spinner("🧠 Pensando..."):
# Llamar a la API de Hugging Face
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 250,
"temperature": 0.7,
"top_p": 0.9,
"do_sample": True
}
}
response = query_huggingface(payload)
if response:
# Extraer la respuesta del modelo
if isinstance(response, list) and len(response) > 0:
bot_response = response[0].get("generated_text", "")
# Intentar extraer solo la respuesta del asistente (después del prompt)
try:
bot_response = bot_response.split("Responde de manera concisa y útil")[-1]
if "PREGUNTA DEL USUARIO:" in bot_response:
bot_response = bot_response.split("PREGUNTA DEL USUARIO:")[-1]
bot_response = bot_response.strip()
except:
# Si falla la extracción, usar la respuesta completa
pass
else:
bot_response = str(response)
# Agregar al historial de chat
st.session_state.chat_history.append({"role": "user", "content": user_input})
st.session_state.chat_history.append({"role": "assistant", "content": bot_response})
# Mostrar el historial de chat
with chat_container:
for message in st.session_state.chat_history:
if message["role"] == "user":
st.markdown(f"**😀 Tú:** {message['content']}")
else:
st.markdown(f"**🤖 Asistente:** {message['content']}")
# Información sobre el modelo
st.markdown("---")
st.caption("Asistente virtual potenciado por deepseek-ai/DeepSeek-R1-Distill-Qwen-32B a través de Hugging Face")