import streamlit as st import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import io import base64 import chardet from pandas.api.types import is_numeric_dtype st.set_page_config(page_title="EDA y Limpieza de Datos", layout="wide") # Función para generar enlace de descarga def get_download_link(df, filename, text): csv = df.to_csv(index=False) b64 = base64.b64encode(csv.encode()).decode() href = f'{text}' return href # Función para crear un resumen detallado de los datos def generate_data_summary(df): # Información básica st.header("📊 Información General del Dataset") col1, col2, col3 = st.columns(3) with col1: st.metric("Filas", df.shape[0]) with col2: st.metric("Columnas", df.shape[1]) with col3: st.metric("Valores nulos totales", df.isna().sum().sum()) # Primeras filas st.subheader("Vista previa de los datos") st.dataframe(df.head()) # Tipos de datos st.subheader("Tipos de datos") dtypes_df = pd.DataFrame(df.dtypes, columns=['Tipo de dato']) dtypes_df.index.name = 'Columna' dtypes_df = dtypes_df.reset_index() st.dataframe(dtypes_df) # Resumen estadístico para columnas numéricas st.subheader("Resumen estadístico") st.dataframe(df.describe()) # Análisis de valores nulos st.subheader("Análisis de valores nulos") null_counts = df.isnull().sum() null_percentages = (null_counts / len(df) * 100).round(2) nulls_df = pd.DataFrame({ 'Valores nulos': null_counts, 'Porcentaje (%)': null_percentages }) nulls_df = nulls_df[nulls_df['Valores nulos'] > 0].sort_values('Valores nulos', ascending=False) if not nulls_df.empty: st.dataframe(nulls_df) # Visualización de valores nulos st.subheader("Visualización de valores nulos") fig, ax = plt.subplots(figsize=(10, 6)) sns.heatmap(df.isnull(), yticklabels=False, cbar=False, cmap='viridis', ax=ax) st.pyplot(fig) else: st.success("¡No hay valores nulos en el dataset!") # Función para visualizar distribuciones def visualize_distributions(df): st.header("📈 Visualización de Distribuciones") numeric_cols = df.select_dtypes(include='number').columns.tolist() categorical_cols = df.select_dtypes(exclude='number').columns.tolist() if numeric_cols: st.subheader("Columnas numéricas") selected_num_col = st.selectbox("Selecciona una columna numérica", numeric_cols) col1, col2 = st.columns(2) with col1: fig, ax = plt.subplots(figsize=(10, 6)) sns.histplot(df[selected_num_col].dropna(), kde=True, ax=ax) plt.title(f'Distribución de {selected_num_col}') plt.xlabel(selected_num_col) plt.ylabel('Frecuencia') st.pyplot(fig) with col2: fig, ax = plt.subplots(figsize=(10, 6)) sns.boxplot(y=df[selected_num_col].dropna(), ax=ax) plt.title(f'Boxplot de {selected_num_col}') st.pyplot(fig) if categorical_cols: st.subheader("Columnas categóricas") selected_cat_col = st.selectbox("Selecciona una columna categórica", categorical_cols) fig, ax = plt.subplots(figsize=(10, 6)) value_counts = df[selected_cat_col].value_counts().sort_values(ascending=False) # Limitar el número de categorías mostradas para mayor claridad if len(value_counts) > 15: other_count = value_counts[15:].sum() value_counts = value_counts[:15] value_counts['Otros'] = other_count sns.barplot(x=value_counts.index, y=value_counts.values, ax=ax) plt.title(f'Distribución de {selected_cat_col}') plt.xticks(rotation=45, ha='right') plt.tight_layout() st.pyplot(fig) # Función para correlaciones def visualize_correlations(df): st.header("🔄 Análisis de Correlaciones") numeric_cols = df.select_dtypes(include='number').columns.tolist() if len(numeric_cols) >= 2: # Matriz de correlación st.subheader("Matriz de correlación") corr_matrix = df[numeric_cols].corr() fig, ax = plt.subplots(figsize=(10, 8)) sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt=".2f", linewidths=0.5, ax=ax) plt.tight_layout() st.pyplot(fig) # Correlación entre dos variables específicas st.subheader("Correlación entre dos variables") col1, col2 = st.columns(2) with col1: x_var = st.selectbox("Variable X", numeric_cols) with col2: y_var = st.selectbox("Variable Y", numeric_cols, index=min(1, len(numeric_cols)-1)) fig, ax = plt.subplots(figsize=(10, 6)) sns.scatterplot(data=df, x=x_var, y=y_var, ax=ax) plt.title(f'Correlación entre {x_var} y {y_var}') st.pyplot(fig) else: st.info("Se necesitan al menos dos columnas numéricas para analizar correlaciones.") # Función para limpiar datos def clean_data(df): st.header("🧹 Limpieza de Datos") cleaned_df = df.copy() # 1. Manejo de valores nulos st.subheader("Manejo de valores nulos") null_columns = df.columns[df.isnull().any()].tolist() if null_columns: for column in null_columns: st.markdown(f"**Columna: {column}**") col_type = 'numérica' if is_numeric_dtype(df[column]) else 'categórica' method = st.radio( f"¿Cómo quieres manejar los valores nulos en '{column}' (columna {col_type})?", options=[ "Eliminar filas con valores nulos", f"Reemplazar con la media (para columnas numéricas)" if is_numeric_dtype(df[column]) else "Reemplazar con la moda (para columnas categóricas)", "Reemplazar con cero (para columnas numéricas)" if is_numeric_dtype(df[column]) else "Reemplazar con un valor específico", "No hacer nada" ], key=f"null_{column}" ) if method == "Eliminar filas con valores nulos": cleaned_df = cleaned_df.dropna(subset=[column]) st.info(f"Se eliminarán {df[column].isna().sum()} filas con valores nulos en '{column}'") elif method == "Reemplazar con la media (para columnas numéricas)": mean_value = df[column].mean() cleaned_df[column] = cleaned_df[column].fillna(mean_value) st.info(f"Los valores nulos en '{column}' serán reemplazados con la media: {mean_value:.2f}") elif method == "Reemplazar con la moda (para columnas categóricas)": mode_value = df[column].mode()[0] cleaned_df[column] = cleaned_df[column].fillna(mode_value) st.info(f"Los valores nulos en '{column}' serán reemplazados con la moda: {mode_value}") elif method == "Reemplazar con cero (para columnas numéricas)": cleaned_df[column] = cleaned_df[column].fillna(0) st.info(f"Los valores nulos en '{column}' serán reemplazados con cero") elif method == "Reemplazar con un valor específico": custom_value = st.text_input(f"Valor de reemplazo para '{column}':", key=f"custom_{column}") if custom_value: cleaned_df[column] = cleaned_df[column].fillna(custom_value) else: st.success("¡No hay valores nulos que tratar!") # 2. Manejo de duplicados st.subheader("Manejo de duplicados") duplicates = df.duplicated().sum() if duplicates > 0: st.warning(f"Se encontraron {duplicates} filas duplicadas en el dataset.") remove_duplicates = st.checkbox("Eliminar filas duplicadas") if remove_duplicates: cleaned_df = cleaned_df.drop_duplicates() st.info(f"Se eliminarán {duplicates} filas duplicadas.") else: st.success("¡No hay filas duplicadas en el dataset!") # 3. Manejo de valores atípicos (outliers) st.subheader("Manejo de valores atípicos (outliers)") numeric_cols = df.select_dtypes(include=['number']).columns.tolist() if numeric_cols: outlier_handling = st.checkbox("¿Quieres tratar los valores atípicos?") if outlier_handling: selected_col = st.selectbox("Selecciona una columna numérica para analizar outliers", numeric_cols) # Visualizar la distribución con posibles outliers fig, ax = plt.subplots(figsize=(10, 6)) sns.boxplot(y=df[selected_col], ax=ax) plt.title(f'Boxplot de {selected_col} - Identificación de outliers') st.pyplot(fig) # Calcular límites para outliers usando el método IQR Q1 = df[selected_col].quantile(0.25) Q3 = df[selected_col].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR outliers = df[(df[selected_col] < lower_bound) | (df[selected_col] > upper_bound)][selected_col] if not outliers.empty: st.warning(f"Se encontraron {len(outliers)} valores atípicos en '{selected_col}'.") outlier_method = st.radio( f"¿Cómo quieres manejar los outliers en '{selected_col}'?", options=[ "Recortar (capping)", "Eliminar filas con outliers", "No hacer nada" ], key=f"outlier_{selected_col}" ) if outlier_method == "Recortar (capping)": cleaned_df[selected_col] = cleaned_df[selected_col].clip(lower_bound, upper_bound) st.info(f"Los valores atípicos en '{selected_col}' serán recortados a [{lower_bound:.2f}, {upper_bound:.2f}]") elif outlier_method == "Eliminar filas con outliers": mask = (cleaned_df[selected_col] >= lower_bound) & (cleaned_df[selected_col] <= upper_bound) cleaned_df = cleaned_df[mask] st.info(f"Se eliminarán {len(outliers)} filas con valores atípicos en '{selected_col}'") else: st.success(f"¡No se encontraron valores atípicos en '{selected_col}'!") # 4. Transformación de tipos de datos st.subheader("Transformación de tipos de datos") type_conversion = st.checkbox("¿Quieres convertir el tipo de alguna columna?") if type_conversion: col1, col2 = st.columns(2) with col1: column_to_convert = st.selectbox("Selecciona una columna", df.columns) with col2: new_type = st.selectbox("Nuevo tipo de dato", options=['int', 'float', 'string', 'datetime', 'category']) try: if new_type == 'int': cleaned_df[column_to_convert] = cleaned_df[column_to_convert].astype(int) elif new_type == 'float': cleaned_df[column_to_convert] = cleaned_df[column_to_convert].astype(float) elif new_type == 'string': cleaned_df[column_to_convert] = cleaned_df[column_to_convert].astype(str) elif new_type == 'datetime': cleaned_df[column_to_convert] = pd.to_datetime(cleaned_df[column_to_convert]) elif new_type == 'category': cleaned_df[column_to_convert] = cleaned_df[column_to_convert].astype('category') st.success(f"La columna '{column_to_convert}' ha sido convertida a tipo {new_type}") except Exception as e: st.error(f"Error al convertir el tipo de dato: {str(e)}") return cleaned_df # Aplicación principal def main(): st.title("📊 Análisis Exploratorio de Datos (EDA) y Limpieza") st.markdown(""" Esta aplicación te permite realizar un análisis exploratorio completo de tus datos, visualizar su distribución y realizar operaciones de limpieza paso a paso. """) # Subir archivo st.header("📁 Carga tu archivo") uploaded_file = st.file_uploader("Selecciona un archivo CSV o Excel", type=['csv', 'xlsx', 'xls']) # Opciones avanzadas de importación with st.expander("Opciones avanzadas de importación"): custom_encoding = st.text_input("Especificar codificación personalizada (opcional)", placeholder="Ejemplo: latin1, utf-8-sig, cp1252") csv_separator = st.text_input("Separador CSV personalizado (opcional)", placeholder="Por defecto: ," ) skip_rows = st.number_input("Saltar filas iniciales", min_value=0, value=0) detect_encoding = st.checkbox("Detectar automáticamente la codificación (puede ser lento para archivos grandes)") decimal_separator = st.radio("Separador decimal", options=[".", ","], index=0) if uploaded_file is not None: try: # Determinar tipo de archivo y leerlo if uploaded_file.name.endswith('.csv'): try: # Preparar opciones para read_csv csv_options = { 'skiprows': skip_rows, 'decimal': decimal_separator } # Agregar separador personalizado si se proporciona if csv_separator: csv_options['sep'] = csv_separator # Detectar codificación si está marcada la opción if detect_encoding: uploaded_file.seek(0) result = chardet.detect(uploaded_file.read()) detected_encoding = result['encoding'] confidence = result['confidence'] uploaded_file.seek(0) st.info(f"Codificación detectada: {detected_encoding} (confianza: {confidence:.2f})") csv_options['encoding'] = detected_encoding # Usar codificación personalizada si se proporciona elif custom_encoding: csv_options['encoding'] = custom_encoding else: # Intentar con diferentes codificaciones encodings = ['utf-8', 'latin1', 'ISO-8859-1', 'cp1252'] for encoding in encodings: try: # Reiniciar la posición del archivo para cada intento uploaded_file.seek(0) df = pd.read_csv(uploaded_file, encoding=encoding, **csv_options) st.success(f"Archivo leído correctamente usando codificación: {encoding}") break except UnicodeDecodeError: continue else: # Este bloque se ejecuta si el bucle termina sin un break raise Exception("No se pudo decodificar el archivo con ninguna de las codificaciones intentadas.") # Si llegamos aquí con una codificación personalizada o detectada if custom_encoding or detect_encoding: uploaded_file.seek(0) df = pd.read_csv(uploaded_file, **csv_options) except Exception as e: # Si todas las opciones fallan, intentar con reemplazo de caracteres uploaded_file.seek(0) # Agregamos low_memory=False para evitar problemas con archivos grandes df = pd.read_csv(uploaded_file, encoding_errors='replace', low_memory=False, **csv_options) st.warning(f"Se usó reemplazo de caracteres desconocidos. Algunos caracteres pueden no verse correctamente.") else: # Opciones para archivos Excel excel_options = {'skiprows': skip_rows} df = pd.read_excel(uploaded_file, **excel_options) # Crear pestañas para organizar el análisis tab1, tab2, tab3, tab4 = st.tabs(["📊 Resumen de datos", "📈 Visualizaciones", "🔄 Correlaciones", "🧹 Limpieza"]) with tab1: generate_data_summary(df) with tab2: visualize_distributions(df) with tab3: visualize_correlations(df) with tab4: cleaned_df = clean_data(df) if st.button("Aplicar cambios y descargar datos limpios"): st.success("¡Limpieza de datos completada!") # Mostrar comparación st.subheader("Comparación: Datos originales vs. Datos limpios") col1, col2 = st.columns(2) with col1: st.write("Datos originales") st.metric("Filas", df.shape[0]) st.metric("Valores nulos", df.isna().sum().sum()) with col2: st.write("Datos limpios") st.metric("Filas", cleaned_df.shape[0]) st.metric("Valores nulos", cleaned_df.isna().sum().sum()) # Generar enlace de descarga st.markdown(get_download_link(cleaned_df, "datos_limpios", "📥 Descargar datos limpios (CSV)"), unsafe_allow_html=True) except Exception as e: st.error(f"Error al procesar el archivo: {str(e)}") if __name__ == "__main__": main()