# SimpleClean — src/streamlit_app.py
# Author: enacimie (Eduardo Nacimiento García) — commit 67cadf7 (verified)
# Interactive Streamlit app for basic dataset cleaning.
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler, MinMaxScaler
from sklearn.compose import ColumnTransformer
import plotly.express as px
import io
# Metadata
# Contact/licensing constants; rendered in the header line and footer caption.
AUTHOR = "Eduardo Nacimiento García"
EMAIL = "enacimie@ull.edu.es"
LICENSE = "Apache 2.0"
# Page config — issued before any other st.* call below; wide layout gives
# the data tables more horizontal room.
st.set_page_config(
    page_title="SimpleClean",
    page_icon="🧹",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Title and short usage blurb shown at the top of the app.
st.title("🧹 SimpleClean")
st.markdown(f"**Author:** {AUTHOR} | **Email:** {EMAIL} | **License:** {LICENSE}")
st.write("""
Upload a CSV or use the demo dataset to interactively clean your data: handle missing values, encode categories, scale features, and more.
""")
# === GENERATE DEMO DATASET ===
@st.cache_data
def create_demo_data():
    """Build a reproducible 300-row demo DataFrame with injected missing values.

    The columns mix numeric (Age, Income, Satisfaction), categorical
    (City, Gender) and binary (Has_Children) data so that every cleaning
    tab in the app has something to operate on.

    Returns:
        pd.DataFrame: demo data containing NaN/None entries in several columns.
    """
    np.random.seed(42)  # fixed seed -> identical demo data on every rerun
    n = 300
    data = {
        # Ages drawn from N(35, 12), truncated to whole years.
        "Age": np.random.normal(35, 12, n).astype(int),
        "Income": np.random.normal(45000, 15000, n),
        # None entries provide "missing" categorical values up front.
        "City": np.random.choice(["Madrid", "Barcelona", "Valencia", "Seville", None], n, p=[0.25, 0.25, 0.25, 0.2, 0.05]),
        "Gender": np.random.choice(["M", "F", None], n, p=[0.45, 0.45, 0.10]),
        "Has_Children": np.random.choice([0, 1, None], n, p=[0.4, 0.4, 0.2]),
        "Satisfaction": np.random.randint(1, 11, n),
    }
    df = pd.DataFrame(data)
    # Cast the int columns to float BEFORE injecting NaN: assigning NaN into
    # an int64 column triggers pandas' incompatible-dtype FutureWarning and
    # is slated to raise in pandas 3.x. Values are unchanged by the cast.
    df["Age"] = df["Age"].astype(float)
    df["Satisfaction"] = df["Satisfaction"].astype(float)
    # Introduce some nulls. replace=False guarantees exactly 15/20/10
    # distinct rows receive a NaN (sampling with replacement could pick
    # the same row twice and silently inject fewer nulls).
    df.loc[np.random.choice(df.index, 15, replace=False), "Age"] = np.nan
    df.loc[np.random.choice(df.index, 20, replace=False), "Income"] = np.nan
    df.loc[np.random.choice(df.index, 10, replace=False), "Satisfaction"] = np.nan
    return df
# === LOAD DATA ===
# Both entry paths store two session copies: 'df_original' (pristine
# reference used by the Reset button) and 'df_clean' (working copy that
# the cleaning tabs mutate).
if st.button("🧪 Load Demo Dataset"):
    st.session_state['df_original'] = create_demo_data()
    st.session_state['df_clean'] = st.session_state['df_original'].copy()
    st.success("✅ Demo dataset loaded!")
uploaded_file = st.file_uploader("📂 Upload your CSV file", type=["csv"])
if uploaded_file:
    # NOTE(review): the uploader keeps its file attached across reruns, so
    # this branch re-reads the CSV on every rerun and will overwrite a
    # freshly loaded demo dataset while a file is attached — confirm intended.
    df = pd.read_csv(uploaded_file)
    st.session_state['df_original'] = df
    st.session_state['df_clean'] = df.copy()
    st.success("✅ File uploaded successfully.")
if 'df_clean' not in st.session_state:
    # Nothing loaded yet: show a hint and halt script execution here.
    st.info("👆 Upload a CSV or click 'Load Demo Dataset' to begin.")
    st.stop()
# Past this point both frames are guaranteed to exist in session state.
df_original = st.session_state['df_original']
df_clean = st.session_state['df_clean']
# Previews of the untouched upload and the current working copy.
st.subheader("🔍 Data Preview")
with st.expander("Original Data (first 10 rows)"):
    st.dataframe(df_original.head(10))
with st.expander("Current Cleaned Data (first 10 rows)"):
    st.dataframe(df_clean.head(10))

# === DATA QUALITY REPORT ===
# Four headline numbers, rendered one metric per column.
st.header("📊 Data Quality Report")
n_rows, n_cols = df_clean.shape
summary_metrics = [
    ("Rows", n_rows),
    ("Columns", n_cols),
    ("Missing Cells", df_clean.isnull().sum().sum()),
    ("Duplicate Rows", df_clean.duplicated().sum()),
]
for metric_col, (label, value) in zip(st.columns(4), summary_metrics):
    metric_col.metric(label, value)

# Per-column null counts as a bar chart (bar colour also encodes the count).
st.subheader("🕳️ Missing Values by Column")
null_counts = df_clean.isnull().sum()
null_fig = px.bar(
    null_counts,
    title="Missing Values per Column",
    labels={'value': 'Count', 'index': 'Column'},
    color=null_counts,
)
st.plotly_chart(null_fig, use_container_width=True)

# Two-column table mapping each column name to its pandas dtype.
st.subheader("🔤 Column Data Types")
schema_df = df_clean.dtypes.reset_index()
schema_df.columns = ['Column', 'Data Type']
st.dataframe(schema_df)
# === CLEANING OPTIONS ===
st.header("🧼 Cleaning Actions")
# One tab per cleaning action; each tab body below reads and writes
# st.session_state['df_clean'] independently of the others.
tab1, tab2, tab3, tab4 = st.tabs([
    "🧹 Remove Duplicates",
    "🩹 Handle Missing Values",
    "🔠 Encode Categorical Variables",
    "📏 Scale Numeric Variables"
])
# Tab 1: Remove Duplicates
with tab1:
    st.subheader("Remove Duplicate Rows")
    if st.button("Remove All Duplicates"):
        # Compare row counts before/after to report how many were dropped.
        rows_before = len(df_clean)
        df_clean = df_clean.drop_duplicates().reset_index(drop=True)
        st.session_state['df_clean'] = df_clean
        st.success(f"✅ Removed {rows_before - len(df_clean)} duplicate rows.")
# Tab 2: Handle Missing Values
with tab2:
    st.subheader("Impute Missing Values")
    # Offer only columns that actually contain at least one null.
    cols_with_missing = df_clean.columns[df_clean.isnull().any()].tolist()
    if not cols_with_missing:
        st.success("✅ No missing values to impute.")
    else:
        col_to_impute = st.selectbox("Select column to impute:", cols_with_missing)
        # Robust dtype detection: is_numeric_dtype covers every numeric width
        # (int32, float32, ...). The previous check compared the dtype against
        # the literals 'float64'/'int64' only, so KNN was wrongly rejected for
        # other numeric dtypes.
        col_dtype = df_clean[col_to_impute].dtype
        is_numeric = pd.api.types.is_numeric_dtype(col_dtype)
        if not is_numeric:
            # Categorical-style column: only frequency/constant fills apply.
            strategy = st.selectbox(
                f"Imputation strategy for {col_to_impute} (categorical):",
                ["Most Frequent", "Constant"]
            )
            fill_value = st.text_input("Constant value:", value="Unknown") if strategy == "Constant" else None
        else:
            strategy = st.selectbox(
                f"Imputation strategy for {col_to_impute} (numeric):",
                ["Mean", "Median", "Most Frequent", "Constant", "KNN Imputer"]
            )
            fill_value = st.number_input("Constant value:", value=0.0) if strategy == "Constant" else None
        if st.button(f"Apply Imputation to '{col_to_impute}'"):
            try:
                # Build the imputer matching the chosen strategy.
                if strategy == "Mean":
                    imputer = SimpleImputer(strategy='mean')
                elif strategy == "Median":
                    imputer = SimpleImputer(strategy='median')
                elif strategy == "Most Frequent":
                    imputer = SimpleImputer(strategy='most_frequent')
                elif strategy == "Constant":
                    imputer = SimpleImputer(strategy='constant', fill_value=fill_value)
                elif strategy == "KNN Imputer" and is_numeric:
                    # KNN only makes sense on numeric data.
                    imputer = KNNImputer(n_neighbors=5)
                else:
                    st.error("Invalid strategy for this column type.")
                    st.stop()
                # Apply imputation
                if strategy == "KNN Imputer":
                    # KNN infers values from neighbouring rows, so it is
                    # fitted on (and rewrites) ALL numeric columns, not just
                    # the selected one.
                    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
                    df_clean[numeric_cols] = imputer.fit_transform(df_clean[numeric_cols])
                else:
                    # SimpleImputer expects 2-D input; ravel() flattens the
                    # single-column result back to a Series-shaped array.
                    df_clean[col_to_impute] = imputer.fit_transform(df_clean[[col_to_impute]]).ravel()
                st.session_state['df_clean'] = df_clean
                st.success(f"✅ Missing values in '{col_to_impute}' imputed using '{strategy}'.")
            except Exception as e:
                # Surface sklearn errors (e.g. incompatible dtype) in the UI.
                st.error(f"❌ Error during imputation: {e}")
# Tab 3: Encode Categorical Variables
with tab3:
    st.subheader("Encode Categorical Variables")
    # Only object/category columns are offered for encoding.
    categorical_cols = df_clean.select_dtypes(include=['object', 'category']).columns.tolist()
    if not categorical_cols:
        st.info("ℹ️ No categorical columns to encode.")
    else:
        col_to_encode = st.selectbox("Select categorical column to encode:", categorical_cols)
        encoding_method = st.radio(
            "Encoding method:",
            ["Label Encoding", "One-Hot Encoding"]
        )
        if st.button(f"Apply {encoding_method} to '{col_to_encode}'"):
            try:
                if encoding_method == "Label Encoding":
                    le = LabelEncoder()
                    # astype(str) lets LabelEncoder cope with mixed types,
                    # but it also turns NaN into the literal string "nan",
                    # which then gets its own encoded class instead of
                    # remaining missing.
                    df_clean[col_to_encode] = le.fit_transform(df_clean[col_to_encode].astype(str))
                    st.session_state['df_clean'] = df_clean
                    st.success(f"✅ '{col_to_encode}' label encoded.")
                else:  # One-Hot Encoding
                    # Replace the source column with its dummy columns
                    # (NaN rows become all-zero since dummy_na defaults to False).
                    df_encoded = pd.get_dummies(df_clean[col_to_encode], prefix=col_to_encode)
                    df_clean = df_clean.drop(columns=[col_to_encode])
                    df_clean = pd.concat([df_clean, df_encoded], axis=1)
                    st.session_state['df_clean'] = df_clean
                    st.success(f"✅ '{col_to_encode}' one-hot encoded. {df_encoded.shape[1]} new columns added.")
            except Exception as e:
                st.error(f"❌ Error during encoding: {e}")
# Tab 4: Scale Numeric Variables
with tab4:
    st.subheader("Scale Numeric Variables")
    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns.tolist()
    if not numeric_cols:
        st.info("ℹ️ No numeric columns to scale.")
    else:
        # numeric_cols[:2] degrades gracefully for 0/1-column frames, so no
        # explicit length check is needed for the default selection.
        cols_to_scale = st.multiselect("Select numeric columns to scale:", numeric_cols, default=numeric_cols[:2])
        scaling_method = st.radio("Scaling method:", ["StandardScaler (Z-score)", "MinMaxScaler (0-1)"])
        if st.button("Apply Scaling"):
            if not cols_to_scale:
                # Guard: fitting a scaler on zero columns raises a ValueError;
                # an explicit warning is clearer than the raw exception text.
                st.warning("⚠️ Select at least one column to scale.")
            else:
                try:
                    if scaling_method == "StandardScaler (Z-score)":
                        scaler = StandardScaler()
                    else:
                        scaler = MinMaxScaler()
                    # Fit and overwrite the selected columns in place.
                    df_clean[cols_to_scale] = scaler.fit_transform(df_clean[cols_to_scale])
                    st.session_state['df_clean'] = df_clean
                    st.success(f"✅ Columns {cols_to_scale} scaled using {scaling_method}.")
                except Exception as e:
                    st.error(f"❌ Error during scaling: {e}")
# === DOWNLOAD CLEANED DATA ===
st.header("📥 Download Cleaned Data")
final_df = st.session_state['df_clean']
# Preview of what the exported CSV will contain.
with st.expander("Final Cleaned Data Preview"):
    st.dataframe(final_df.head(10))
# Serialize once to UTF-8 bytes for the download widget.
csv_bytes = final_df.to_csv(index=False).encode('utf-8')
st.download_button(
    label="💾 Download Cleaned CSV",
    data=csv_bytes,
    file_name="cleaned_data.csv",
    mime="text/csv",
)
# Discard all cleaning steps by restoring the pristine copy.
if st.button("🔄 Reset to Original Data"):
    st.session_state['df_clean'] = st.session_state['df_original'].copy()
    st.success("✅ Data reset to original state.")
# Footer
st.markdown("---")
st.caption(f"© {AUTHOR} | License {LICENSE} | Contact: {EMAIL}")