# streamlit_data_analysis_app.py
# Streamlit Data Analysis App for Hugging Face Spaces
# Features:
# - Upload CSV / Excel
# - Automatic cleaning & standardization (column names, missing values, dtypes)
# - Preprocessing (imputation, encoding, scaling)
# - Quick visualizations (histogram, boxplot, scatter, correlation heatmap)
# - Preview cleaned dataset
# - LLM-powered dataset summary & insights using the Hugging Face Inference API
# - Uses HF_TOKEN from Streamlit secrets (or environment variable)
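#
# A requirements.txt along these lines should cover the Space (exact pins are an
# assumption; openpyxl is what pandas.read_excel uses for .xlsx, xlrd for legacy .xls):
#   streamlit
#   pandas
#   numpy
#   matplotlib
#   seaborn
#   scikit-learn>=1.2
#   huggingface_hub
#   openpyxl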
import os
from typing import Optional, Tuple, List, Dict

import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from huggingface_hub import InferenceClient
# ---------- Configuration ----------
st.set_page_config(page_title="Data Analysis App", layout="wide")

# Try to read the HF token from Streamlit secrets, then the environment
HF_TOKEN = None
try:
    HF_TOKEN = st.secrets.get("HF_TOKEN")
except Exception:
    HF_TOKEN = None
if not HF_TOKEN:
    HF_TOKEN = os.getenv("HF_TOKEN")

# Default open-source model choices (hosted on Hugging Face)
MODEL_OPTIONS = {
    "bigscience/bloomz-7b1": "BloomZ 7B (instruction-tuned)",
    "tiiuae/falcon-7b-instruct": "Falcon 7B Instruct",
    "bigscience/bloom-3b": "Bloom 3B (lighter)",
}
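# Note: the set of models served by the serverless Inference API changes over time,
# so any publicly hosted text-generation model id can be swapped in here.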
# ---------- Utility functions ----------
def read_file(uploaded_file) -> pd.DataFrame:
    # uploaded_file is the UploadedFile object returned by st.file_uploader
    name = uploaded_file.name.lower()
    if name.endswith(('.csv', '.txt')):
        return pd.read_csv(uploaded_file)
    elif name.endswith(('.xls', '.xlsx')):
        return pd.read_excel(uploaded_file)
    else:
        raise ValueError("Unsupported file type. Please upload CSV or Excel.")
def clean_column_name(col: str) -> str:
    # standardize: strip, lower, replace spaces and special chars with _
    col = str(col).strip()
    col = col.replace("\n", " ").replace("\t", " ")
    col = col.lower()
    col = "_".join(col.split())
    # keep alphanumerics and _
    col = ''.join(c for c in col if (c.isalnum() or c == '_'))
    # collapse multiple _
    while '__' in col:
        col = col.replace('__', '_')
    return col
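# Illustrative example of the normalization above:
#   clean_column_name(" First  Name\t(USD) ")  ->  "first_name_usd"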
def standardize_dataframe(df: pd.DataFrame, drop_all_nan_cols: bool = True) -> pd.DataFrame:
    df = df.copy()
    # strip whitespace from string columns
    for c in df.select_dtypes(include=['object']).columns:
        df[c] = df[c].apply(lambda x: x.strip() if isinstance(x, str) else x)
    # standardize column names
    df.columns = [clean_column_name(c) for c in df.columns]
    # drop fully empty columns
    if drop_all_nan_cols:
        df.dropna(axis=1, how='all', inplace=True)
    # try to parse datetime columns heuristically
    for c in df.columns:
        if df[c].dtype == object:
            sample = df[c].dropna().astype(str).head(20)
            if not sample.empty:
                # quick heuristic: convert only if the majority of a sample parses as datetime
                parsed = pd.to_datetime(sample, errors='coerce')
                if parsed.notna().sum() / len(sample) > 0.6:
                    df[c] = pd.to_datetime(df[c], errors='coerce')
    return df
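# Illustrative example of the datetime heuristic: for a column sampled as
# ["2021-01-05", "2021-02-10", "n/a"], 2 of 3 values parse (0.67 > 0.6), so the
# whole column is coerced to datetime and "n/a" becomes NaT.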
def summarize_dataframe(df: pd.DataFrame, max_rows: int = 5) -> Dict:
    summary = {}
    summary['shape'] = df.shape
    summary['columns'] = []
    for c in df.columns:
        col_info = {
            'name': c,
            'dtype': str(df[c].dtype),
            'n_missing': int(df[c].isna().sum()),
            'n_unique': int(df[c].nunique(dropna=True)),
        }
        if pd.api.types.is_numeric_dtype(df[c]):
            desc = df[c].describe().to_dict()
            col_info['summary'] = {k: float(v) for k, v in desc.items()}
        elif pd.api.types.is_datetime64_any_dtype(df[c]):
            col_info['summary'] = {
                'min': str(df[c].min()),
                'max': str(df[c].max())
            }
        else:
            col_info['top_values'] = df[c].dropna().astype(str).value_counts().head(5).to_dict()
        summary['columns'].append(col_info)
    summary['preview'] = df.head(max_rows).to_dict(orient='records')
    return summary
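# The resulting dict looks roughly like this (values are illustrative):
# {'shape': (100, 2),
#  'columns': [{'name': 'age', 'dtype': 'int64', 'n_missing': 0, 'n_unique': 42,
#               'summary': {'count': 100.0, 'mean': 37.2, ...}},
#              {'name': 'city', 'dtype': 'object', 'n_missing': 3, 'n_unique': 12,
#               'top_values': {'london': 20, 'paris': 18, ...}}],
#  'preview': [{'age': 31, 'city': 'london'}, ...]}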
def prepare_preprocessing_pipeline(df: pd.DataFrame, impute_strategy_num='median', scale_numeric=True, encode_categorical='onehot') -> Tuple[ColumnTransformer, List[str]]:
    numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
    cat_cols = list(df.select_dtypes(include=['object', 'category', 'bool']).columns)
    transformers = []
    if numeric_cols:
        num_steps = [('imputer', SimpleImputer(strategy=impute_strategy_num))]
        if scale_numeric:
            num_steps.append(('scaler', StandardScaler()))
        transformers.append(('num', Pipeline(steps=num_steps), numeric_cols))
    if cat_cols:
        if encode_categorical == 'onehot':
            cat_pipeline = Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                # 'sparse_output' requires scikit-learn >= 1.2 (older releases used 'sparse')
                ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
            ])
        else:
            cat_pipeline = Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                ('ord', OrdinalEncoder())
            ])
        transformers.append(('cat', cat_pipeline, cat_cols))
    # datetime and any other remaining columns are dropped by remainder='drop'
    preprocessor = ColumnTransformer(transformers=transformers, remainder='drop')
    return preprocessor, numeric_cols + cat_cols
def apply_preprocessing(df: pd.DataFrame, preprocessor: ColumnTransformer) -> pd.DataFrame:
    # fits the transformer and returns a DataFrame with rebuilt column names for easy display
    X = preprocessor.fit_transform(df)
    # build feature names
    feature_names = []
    for name, trans, columns in preprocessor.transformers_:
        if name == 'remainder':
            # remainder columns are dropped by the ColumnTransformer; skip them
            continue
        if name == 'num':
            feature_names += columns
        elif name == 'cat':
            # try to extract categories from OneHotEncoder
            try:
                ohe = trans.named_steps['onehot']
                for col, catvals in zip(columns, ohe.categories_):
                    feature_names += [f"{col}__{v}" for v in catvals]
            except (KeyError, AttributeError):
                # ordinal encoding: one output column per input column
                feature_names += columns
        else:
            feature_names += columns
    return pd.DataFrame(X, columns=feature_names)
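# Usage sketch:
#   preprocessor, kept_cols = prepare_preprocessing_pipeline(df)
#   processed_df = apply_preprocessing(df, preprocessor)
# One-hot encoded features come out as "<column>__<category>", e.g. "city__london".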
# ---------- LLM helper ----------
def build_dataset_prompt(summary: Dict, user_question: Optional[str] = None) -> str:
    # Build a robust prompt summarizing the dataset for the LLM to give insights
    s = []
    s.append("You are a helpful data analyst assistant. I will give you a dataset summary and ask for insights and next steps.")
    s.append(f"Dataset shape: {summary['shape'][0]} rows, {summary['shape'][1]} columns.")
    s.append("Columns:")
    for col in summary['columns']:
        s.append(f"- {col['name']} (dtype: {col['dtype']}; missing: {col['n_missing']}; unique: {col['n_unique']})")
        if 'summary' in col:
            s.append(f" summary: {col['summary']}")
        if 'top_values' in col:
            s.append(f" top values: {col['top_values']}")
    s.append("Preview of top rows:")
    for r in summary['preview']:
        s.append(str(r))
    if user_question:
        s.append("User question: " + user_question)
    else:
        s.append("Please provide: 1) quick dataset quality assessment, 2) columns of interest, 3) suggested cleaning steps, 4) recommended visualizations and quick findings, 5) suggested next steps for modeling or analysis.")
    return "\n".join(s)
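# The generated prompt therefore starts along these lines (illustrative):
#   You are a helpful data analyst assistant. I will give you a dataset summary ...
#   Dataset shape: 100 rows, 2 columns.
#   Columns:
#   - age (dtype: int64; missing: 0; unique: 42)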
def call_llm(prompt: str, model: str = 'bigscience/bloomz-7b1', max_tokens: int = 512) -> str:
    if not HF_TOKEN:
        return "ERROR: HF_TOKEN not found. Put your Hugging Face token in Streamlit secrets under 'HF_TOKEN' or set the HF_TOKEN environment variable."
    client = InferenceClient(token=HF_TOKEN)
    # Use the text-generation endpoint; the prompt is the first positional argument
    try:
        response = client.text_generation(prompt, model=model, max_new_tokens=max_tokens)
        # With the default details=False / stream=False, text_generation returns the
        # generated text as a plain string; coerce defensively just in case.
        return response if isinstance(response, str) else str(response)
    except Exception as e:
        return f"LLM call failed: {e}"
# ---------- Streamlit UI ----------
st.title("Data Analysis & Cleaning App – Streamlit (Deployable to Hugging Face Spaces)")
st.markdown("Upload a CSV or Excel file, clean it, preprocess, preview cleaned data, visualize quickly, and ask an LLM for insights.")
with st.sidebar:
    st.header("Options")
    model_choice = st.selectbox("LLM model (Inference API)", options=list(MODEL_OPTIONS.keys()), format_func=lambda k: MODEL_OPTIONS[k])
    max_tokens = st.slider("LLM max tokens", min_value=128, max_value=1024, value=512, step=64)
    impute_strategy_num = st.selectbox("Numeric imputation", ['mean', 'median', 'most_frequent'])
    encode_categorical = st.selectbox("Categorical encoding", ['onehot', 'ordinal'])
    scale_numeric = st.checkbox("Scale numeric features", value=True)
    show_raw_preview = st.checkbox("Show raw preview (before cleaning)", value=True)

uploaded_file = st.file_uploader("Upload CSV or Excel file", type=['csv', 'xls', 'xlsx', 'txt'])
if uploaded_file:
    try:
        with st.spinner("Reading file..."):
            raw_df = read_file(uploaded_file)
    except Exception as e:
        st.error(f"Failed to read file: {e}")
        st.stop()

    if show_raw_preview:
        st.subheader("Raw data preview")
        st.dataframe(raw_df.head(10))

    st.subheader("Cleaning & Standardization")
    drop_all_nan_cols = st.checkbox("Drop columns with all missing values", value=True)
    cleaned_df = standardize_dataframe(raw_df, drop_all_nan_cols=drop_all_nan_cols)
    st.write(f"Data after standardization – shape: {cleaned_df.shape}")
    st.dataframe(cleaned_df.head(10))

    st.subheader("Quick data summary")
    summary = summarize_dataframe(cleaned_df, max_rows=5)
    col1, col2 = st.columns([2, 1])
    with col1:
        st.write(f"**Shape:** {summary['shape']}")
        st.write("**Columns:**")
        for c in summary['columns']:
            st.markdown(f"- **{c['name']}** – dtype: {c['dtype']} – missing: {c['n_missing']} – unique: {c['n_unique']}")
    with col2:
        st.write("**Preview (head)**")
        st.table(pd.DataFrame(summary['preview']))
| st.subheader("Preprocessing") | |
| if st.button("Generate preprocessing pipeline and preview processed data"): | |
| preprocessor, kept_cols = prepare_preprocessing_pipeline(cleaned_df, impute_strategy_num=impute_strategy_num, scale_numeric=scale_numeric, encode_categorical=encode_categorical) | |
| try: | |
| proc_df = apply_preprocessing(cleaned_df, preprocessor) | |
| st.success("Preprocessing applied β showing preview") | |
| st.dataframe(proc_df.head(10)) | |
| st.markdown(f"Processed feature count: **{proc_df.shape[1]}**") | |
| csv = proc_df.to_csv(index=False) | |
| st.download_button("Download processed CSV", data=csv, file_name="processed_data.csv") | |
| except Exception as e: | |
| st.error(f"Failed to process dataset: {e}") | |
| st.subheader("Quick visualizations") | |
| viz_col = st.selectbox("Select column for visualization (numeric or categorical)", options=list(cleaned_df.columns)) | |
| viz_type = st.selectbox("Chart type", ['Histogram', 'Boxplot', 'Bar (categorical)', 'Scatter (choose second column)', 'Correlation heatmap']) | |
| if viz_type == 'Scatter (choose second column)': | |
| second_col = st.selectbox("Second column for scatter", options=[c for c in cleaned_df.columns if c != viz_col]) | |
| if st.button("Show visualization"): | |
| fig = plt.figure(figsize=(8,5)) | |
| try: | |
| if viz_type == 'Histogram': | |
| series = pd.to_numeric(cleaned_df[viz_col], errors='coerce') | |
| series.dropna(inplace=True) | |
| plt.hist(series, bins='auto') | |
| plt.title(f'Histogram β {viz_col}') | |
| elif viz_type == 'Boxplot': | |
| series = pd.to_numeric(cleaned_df[viz_col], errors='coerce') | |
| sns.boxplot(x=series) | |
| plt.title(f'Boxplot β {viz_col}') | |
| elif viz_type == 'Bar (categorical)': | |
| counts = cleaned_df[viz_col].astype(str).value_counts().head(30) | |
| sns.barplot(x=counts.values, y=counts.index) | |
| plt.title(f'Bar chart β {viz_col}') | |
| elif viz_type == 'Scatter (choose second column)': | |
| x = pd.to_numeric(cleaned_df[viz_col], errors='coerce') | |
| y = pd.to_numeric(cleaned_df[second_col], errors='coerce') | |
| mask = x.notna() & y.notna() | |
| plt.scatter(x[mask], y[mask], alpha=0.6) | |
| plt.xlabel(viz_col) | |
| plt.ylabel(second_col) | |
| plt.title(f'Scatter β {viz_col} vs {second_col}') | |
| elif viz_type == 'Correlation heatmap': | |
| numeric = cleaned_df.select_dtypes(include=[np.number]) | |
| corr = numeric.corr() | |
| sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm') | |
| plt.title('Correlation heatmap (numeric features)') | |
| st.pyplot(fig) | |
| except Exception as e: | |
| st.error(f"Failed to create visualization: {e}") | |
| st.subheader("Ask the LLM for insights (optional)") | |
| user_question = st.text_area("Specific question for the LLM (if empty, a general assessment will be produced)") | |
| if st.button("Get LLM insights"): | |
| with st.spinner("Preparing prompt and calling LLM..."): | |
| prompt = build_dataset_prompt(summary, user_question=user_question if user_question else None) | |
| llm_answer = call_llm(prompt, model=model_choice, max_tokens=max_tokens) | |
| st.subheader("LLM response") | |
| st.write(llm_answer) | |
| st.subheader("Duplicate & Missing-value helpers") | |
| if st.button("Show duplicate rows (if any)"): | |
| dup = cleaned_df[cleaned_df.duplicated(keep=False)] | |
| if dup.empty: | |
| st.write("No duplicates found") | |
| else: | |
| st.dataframe(dup) | |
| if st.button("Show columns with > 20% missing values"): | |
| thresh = 0.2 | |
| miss = (cleaned_df.isna().mean() > thresh) | |
| cols = list(miss[miss].index) | |
| if not cols: | |
| st.write("No columns have more than 20% missing values") | |
| else: | |
| st.write(cols) | |
| st.markdown("---") | |
| st.markdown("**Deployment notes**: This app is ready to be deployed to Hugging Face Spaces. Add your Hugging Face token to the Space secrets as `HF_TOKEN`. Use a GPU-enabled Space if you want to run large models locally; otherwise the Inference API will run models hosted by Hugging Face via your token.") | |
| else: | |
| st.info("Upload a CSV or Excel file to get started.") | |
| # End of app | |