# SimpleClean — interactive Streamlit app for CSV data cleaning.
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler, MinMaxScaler
from sklearn.compose import ColumnTransformer
import plotly.express as px
import io

# --- App metadata ---
AUTHOR = "Eduardo Nacimiento García"
EMAIL = "enacimie@ull.edu.es"
LICENSE = "Apache 2.0"

# --- Streamlit page configuration (must run before any other st.* call) ---
st.set_page_config(
    page_title="SimpleClean",
    page_icon="🧹",
    layout="wide",
    initial_sidebar_state="expanded",
)

# --- Header ---
st.title("🧹 SimpleClean")
st.markdown(f"**Author:** {AUTHOR} | **Email:** {EMAIL} | **License:** {LICENSE}")
st.write("""
Upload a CSV or use the demo dataset to interactively clean your data: handle missing values, encode categories, scale features, and more.
""")
# === GENERATE DEMO DATASET ===
def create_demo_data():
    """Build a 300-row synthetic DataFrame with deliberate missing values.

    Returns a pandas DataFrame with numeric (Age, Income, Satisfaction),
    categorical (City, Gender) and mixed (Has_Children) columns, each
    containing some nulls so every cleaning feature has work to do.
    """
    np.random.seed(42)  # fixed seed keeps the demo reproducible
    n_rows = 300
    # NOTE: the draws below run in a fixed order so the seeded RNG stream
    # always yields the same dataset.
    ages = np.random.normal(35, 12, n_rows).astype(int)
    incomes = np.random.normal(45000, 15000, n_rows)
    cities = np.random.choice(
        ["Madrid", "Barcelona", "Valencia", "Seville", None],
        n_rows,
        p=[0.25, 0.25, 0.25, 0.2, 0.05],
    )
    genders = np.random.choice(["M", "F", None], n_rows, p=[0.45, 0.45, 0.10])
    children = np.random.choice([0, 1, None], n_rows, p=[0.4, 0.4, 0.2])
    satisfaction = np.random.randint(1, 11, n_rows)
    demo = pd.DataFrame({
        "Age": ages,
        "Income": incomes,
        "City": cities,
        "Gender": genders,
        "Has_Children": children,
        "Satisfaction": satisfaction,
    })
    # Punch extra holes into the numeric columns (may hit duplicate rows,
    # so the final null count per column is at most 15 / 20 / 10).
    demo.loc[np.random.choice(demo.index, 15), "Age"] = np.nan
    demo.loc[np.random.choice(demo.index, 20), "Income"] = np.nan
    demo.loc[np.random.choice(demo.index, 10), "Satisfaction"] = np.nan
    return demo
# === LOAD DATA ===
# Demo dataset: replaces whatever is currently loaded with fresh synthetic data.
if st.button("🧪 Load Demo Dataset"):
    st.session_state['df_original'] = create_demo_data()
    st.session_state['df_clean'] = st.session_state['df_original'].copy()
    st.success("✅ Demo dataset loaded!")
uploaded_file = st.file_uploader("📂 Upload your CSV file", type=["csv"])
if uploaded_file is not None:
    # BUG FIX: the uploader keeps returning the same file object on every
    # Streamlit rerun, so unconditionally re-reading it here reset
    # 'df_clean' to the raw CSV each time ANY button was pressed — wiping
    # out every cleaning action. Only (re)load when a different file is
    # uploaded, identified by its (name, size) signature.
    file_sig = (uploaded_file.name, uploaded_file.size)
    if st.session_state.get('_uploaded_sig') != file_sig:
        df = pd.read_csv(uploaded_file)
        st.session_state['df_original'] = df
        st.session_state['df_clean'] = df.copy()
        st.session_state['_uploaded_sig'] = file_sig
        st.success("✅ File uploaded successfully.")
# Nothing loaded yet: show a hint and halt the script here.
if 'df_clean' not in st.session_state:
    st.info("👆 Upload a CSV or click 'Load Demo Dataset' to begin.")
    st.stop()
df_original = st.session_state['df_original']
df_clean = st.session_state['df_clean']
# --- Data preview: original vs. current working copy ---
st.subheader("🔍 Data Preview")
with st.expander("Original Data (first 10 rows)"):
    st.dataframe(df_original.head(10))
with st.expander("Current Cleaned Data (first 10 rows)"):
    st.dataframe(df_clean.head(10))

# === DATA QUALITY REPORT ===
st.header("📊 Data Quality Report")
# Four headline metrics, laid out side by side.
for holder, label, value in zip(
    st.columns(4),
    ["Rows", "Columns", "Missing Cells", "Duplicate Rows"],
    [
        df_clean.shape[0],
        df_clean.shape[1],
        df_clean.isnull().sum().sum(),
        df_clean.duplicated().sum(),
    ],
):
    holder.metric(label, value)

# Per-column null counts as a bar chart.
st.subheader("🕳️ Missing Values by Column")
null_counts = df_clean.isnull().sum()
fig_missing = px.bar(
    null_counts,
    title="Missing Values per Column",
    labels={'value': 'Count', 'index': 'Column'},
    color=null_counts,
)
st.plotly_chart(fig_missing, use_container_width=True)

# Dtype table for quick schema inspection.
st.subheader("🔤 Column Data Types")
dtypes_df = pd.DataFrame(df_clean.dtypes, columns=['Data Type']).reset_index()
dtypes_df.columns = ['Column', 'Data Type']
st.dataframe(dtypes_df)
# === CLEANING OPTIONS ===
st.header("🧼 Cleaning Actions")
tab1, tab2, tab3, tab4 = st.tabs([
    "🧹 Remove Duplicates",
    "🩹 Handle Missing Values",
    "🔠 Encode Categorical Variables",
    "📏 Scale Numeric Variables"
])

# Tab 1: drop exact duplicate rows and report how many were removed.
with tab1:
    st.subheader("Remove Duplicate Rows")
    if st.button("Remove All Duplicates"):
        rows_before = len(df_clean)
        df_clean = df_clean.drop_duplicates().reset_index(drop=True)
        st.session_state['df_clean'] = df_clean
        st.success(f"✅ Removed {rows_before - len(df_clean)} duplicate rows.")
# Tab 2: Handle Missing Values
with tab2:
    st.subheader("Impute Missing Values")
    # Columns that currently contain at least one null.
    cols_with_missing = df_clean.columns[df_clean.isnull().any()].tolist()
    if not cols_with_missing:
        st.success("✅ No missing values to impute.")
    else:
        col_to_impute = st.selectbox("Select column to impute:", cols_with_missing)
        col_dtype = df_clean[col_to_impute].dtype
        # BUG FIX: the old check `col_dtype in [np.number, 'float64', 'int64']`
        # never matched np.number (an abstract scalar class, not a dtype) and
        # missed other numeric dtypes such as int32/float32. Use pandas'
        # canonical predicate instead.
        is_numeric = pd.api.types.is_numeric_dtype(col_dtype)
        if not is_numeric:
            strategy = st.selectbox(
                f"Imputation strategy for {col_to_impute} (categorical):",
                ["Most Frequent", "Constant"]
            )
            if strategy == "Constant":
                fill_value = st.text_input("Constant value:", value="Unknown")
            else:
                fill_value = None
        else:
            strategy = st.selectbox(
                f"Imputation strategy for {col_to_impute} (numeric):",
                ["Mean", "Median", "Most Frequent", "Constant", "KNN Imputer"]
            )
            if strategy == "Constant":
                fill_value = st.number_input("Constant value:", value=0.0)
            else:
                fill_value = None
        if st.button(f"Apply Imputation to '{col_to_impute}'"):
            try:
                if strategy == "Mean":
                    imputer = SimpleImputer(strategy='mean')
                elif strategy == "Median":
                    imputer = SimpleImputer(strategy='median')
                elif strategy == "Most Frequent":
                    imputer = SimpleImputer(strategy='most_frequent')
                elif strategy == "Constant":
                    imputer = SimpleImputer(strategy='constant', fill_value=fill_value)
                elif strategy == "KNN Imputer" and is_numeric:
                    # KNN imputation is only valid on numeric data.
                    imputer = KNNImputer(n_neighbors=5)
                else:
                    st.error("Invalid strategy for this column type.")
                    st.stop()
                # Apply imputation
                if strategy == "KNN Imputer":
                    # KNN uses the full numeric feature space, not just one column.
                    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
                    df_clean[numeric_cols] = imputer.fit_transform(df_clean[numeric_cols])
                else:
                    df_clean[col_to_impute] = imputer.fit_transform(df_clean[[col_to_impute]]).ravel()
                st.session_state['df_clean'] = df_clean
                st.success(f"✅ Missing values in '{col_to_impute}' imputed using '{strategy}'.")
            except Exception as e:
                st.error(f"❌ Error during imputation: {e}")
# Tab 3: Encode Categorical Variables
with tab3:
    st.subheader("Encode Categorical Variables")
    categorical_cols = df_clean.select_dtypes(include=['object', 'category']).columns.tolist()
    if not categorical_cols:
        st.info("ℹ️ No categorical columns to encode.")
    else:
        col_to_encode = st.selectbox("Select categorical column to encode:", categorical_cols)
        encoding_method = st.radio(
            "Encoding method:",
            ["Label Encoding", "One-Hot Encoding"]
        )
        if st.button(f"Apply {encoding_method} to '{col_to_encode}'"):
            try:
                if encoding_method == "Label Encoding":
                    # Cast to str first so NaN becomes the literal 'nan' category.
                    encoder = LabelEncoder()
                    df_clean[col_to_encode] = encoder.fit_transform(df_clean[col_to_encode].astype(str))
                    st.session_state['df_clean'] = df_clean
                    st.success(f"✅ '{col_to_encode}' label encoded.")
                else:  # One-Hot Encoding
                    # Replace the source column with one indicator column per category.
                    dummies = pd.get_dummies(df_clean[col_to_encode], prefix=col_to_encode)
                    df_clean = pd.concat([df_clean.drop(columns=[col_to_encode]), dummies], axis=1)
                    st.session_state['df_clean'] = df_clean
                    st.success(f"✅ '{col_to_encode}' one-hot encoded. {dummies.shape[1]} new columns added.")
            except Exception as e:
                st.error(f"❌ Error during encoding: {e}")
# Tab 4: Scale Numeric Variables
with tab4:
    st.subheader("Scale Numeric Variables")
    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns.tolist()
    if not numeric_cols:
        st.info("ℹ️ No numeric columns to scale.")
    else:
        # Pre-select the first two numeric columns as a sensible default.
        default_selection = numeric_cols[:2] if len(numeric_cols) >= 2 else numeric_cols
        cols_to_scale = st.multiselect("Select numeric columns to scale:", numeric_cols, default=default_selection)
        scaling_method = st.radio("Scaling method:", ["StandardScaler (Z-score)", "MinMaxScaler (0-1)"])
        if st.button("Apply Scaling"):
            try:
                if scaling_method == "StandardScaler (Z-score)":
                    scaler = StandardScaler()
                else:
                    scaler = MinMaxScaler()
                df_clean[cols_to_scale] = scaler.fit_transform(df_clean[cols_to_scale])
                st.session_state['df_clean'] = df_clean
                st.success(f"✅ Columns {cols_to_scale} scaled using {scaling_method}.")
            except Exception as e:
                st.error(f"❌ Error during scaling: {e}")
# === DOWNLOAD CLEANED DATA ===
st.header("📥 Download Cleaned Data")
df_clean_final = st.session_state['df_clean']

# One last look before exporting.
with st.expander("Final Cleaned Data Preview"):
    st.dataframe(df_clean_final.head(10))

# Serve the cleaned frame as a UTF-8 encoded CSV.
csv = df_clean_final.to_csv(index=False).encode('utf-8')
st.download_button(
    label="💾 Download Cleaned CSV",
    data=csv,
    file_name="cleaned_data.csv",
    mime="text/csv",
)

# Discard every cleaning action and restore the originally-loaded data.
if st.button("🔄 Reset to Original Data"):
    st.session_state['df_clean'] = st.session_state['df_original'].copy()
    st.success("✅ Data reset to original state.")

# Footer
st.markdown("---")
st.caption(f"© {AUTHOR} | License {LICENSE} | Contact: {EMAIL}")