data_analysis / src /streamlit_app.py
Starberry15's picture
Update src/streamlit_app.py
4335246 verified
raw
history blame
15.5 kB
# streamlit_data_analysis_app.py
# Streamlit Data Analysis App for Hugging Face Spaces
# Features:
# - Upload CSV / Excel
# - Automatic cleaning & standardization (column names, missing values, dtypes)
# - Preprocessing (imputation, encoding, scaling)
# - Quick visualizations (histogram, boxplot, scatter, correlation heatmap)
# - Preview cleaned dataset
# - LLM-powered dataset summary & insights using Hugging Face Inference API
# - Uses HF_TOKEN from Streamlit secrets (or environment variable)
import os
import io
import math
from typing import Optional, Tuple, List, Dict
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from huggingface_hub import InferenceClient
# ---------- Configuration ----------
st.set_page_config(page_title="Data Analysis App", layout="wide")
# Try to read HF token from Streamlit secrets then environment
HF_TOKEN = None
try:
HF_TOKEN = st.secrets.get("HF_TOKEN")
except Exception:
HF_TOKEN = None
if not HF_TOKEN:
HF_TOKEN = os.getenv("HF_TOKEN")
# Default open-source model choices (available on Hugging Face)
MODEL_OPTIONS = {
"bigscience/bloomz-7b1": "BloomZ 7B (instruction-tuned)",
"tiiuae/falcon-7b-instruct": "Falcon 7B Instruct",
"bigscience/bloom-3b": "Bloom 3B (lighter)"
}
# ---------- Utility functions ----------
def read_file(uploaded_file: st.uploaded_file_manager.UploadedFile) -> pd.DataFrame:
name = uploaded_file.name.lower()
if name.endswith(('.csv', '.txt')):
return pd.read_csv(uploaded_file)
elif name.endswith(('.xls', '.xlsx')):
return pd.read_excel(uploaded_file)
else:
raise ValueError("Unsupported file type. Please upload CSV or Excel.")
def clean_column_name(col: str) -> str:
# standardize: strip, lower, replace spaces and special chars with _
col = str(col).strip()
col = col.replace("\n", " ").replace("\t", " ")
col = col.lower()
col = "_".join(col.split())
# keep alphanumerics and _
col = ''.join(c for c in col if (c.isalnum() or c == '_'))
# collapse multiple _
while '__' in col:
col = col.replace('__', '_')
return col
def standardize_dataframe(df: pd.DataFrame, drop_all_nan_cols: bool = True) -> pd.DataFrame:
df = df.copy()
# strip whitespace from string columns
for c in df.select_dtypes(include=['object']).columns:
df[c] = df[c].apply(lambda x: x.strip() if isinstance(x, str) else x)
# standardize column names
df.columns = [clean_column_name(c) for c in df.columns]
# drop fully empty columns
if drop_all_nan_cols:
df.dropna(axis=1, how='all', inplace=True)
# try to parse datetime columns heuristically
for c in df.columns:
if df[c].dtype == object:
sample = df[c].dropna().astype(str).head(20)
if not sample.empty:
# quick heuristic: if majority parse as datetime
parsed = pd.to_datetime(sample, errors='coerce')
if parsed.notna().sum() / len(sample) > 0.6:
df[c] = pd.to_datetime(df[c], errors='coerce')
return df
def summarize_dataframe(df: pd.DataFrame, max_rows: int = 5) -> Dict:
summary = {}
summary['shape'] = df.shape
summary['columns'] = []
for c in df.columns:
col_info = {
'name': c,
'dtype': str(df[c].dtype),
'n_missing': int(df[c].isna().sum()),
'n_unique': int(df[c].nunique(dropna=True)) if df[c].dtype != 'object' else int(df[c].nunique(dropna=True)),
}
if pd.api.types.is_numeric_dtype(df[c]):
desc = df[c].describe().to_dict()
col_info['summary'] = {k: float(v) for k, v in desc.items()}
elif pd.api.types.is_datetime64_any_dtype(df[c]):
col_info['summary'] = {
'min': str(df[c].min()),
'max': str(df[c].max())
}
else:
col_info['top_values'] = df[c].dropna().astype(str).value_counts().head(5).to_dict()
summary['columns'].append(col_info)
summary['preview'] = df.head(max_rows).to_dict(orient='records')
return summary
def prepare_preprocessing_pipeline(df: pd.DataFrame, impute_strategy_num='median', scale_numeric=True, encode_categorical='onehot') -> Tuple[Pipeline, List[str]]:
numeric_cols = list(df.select_dtypes(include=[np.number]).columns)
cat_cols = list(df.select_dtypes(include=['object', 'category', 'bool']).columns)
datetime_cols = list(df.select_dtypes(include=['datetime64']).columns)
transformers = []
if numeric_cols:
num_pipeline = Pipeline(steps=[
('imputer', SimpleImputer(strategy=impute_strategy_num)),
])
if scale_numeric:
num_pipeline.steps.append(('scaler', StandardScaler()))
transformers.append(('num', num_pipeline, numeric_cols))
if cat_cols:
if encode_categorical == 'onehot':
cat_pipeline = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore', sparse=False))
])
else:
cat_pipeline = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('ord', OrdinalEncoder())
])
transformers.append(('cat', cat_pipeline, cat_cols))
preprocessor = ColumnTransformer(transformers=transformers, remainder='drop')
return preprocessor, numeric_cols + cat_cols + datetime_cols
def apply_preprocessing(df: pd.DataFrame, preprocessor: ColumnTransformer) -> pd.DataFrame:
# returns processed numpy array and rebuilt column names for easy display
X = preprocessor.fit_transform(df)
# build feature names
feature_names = []
for name, trans, columns in preprocessor.transformers_:
if name == 'num':
feature_names += columns
elif name == 'cat':
# try to extract categories from OneHotEncoder
try:
ohe = trans.named_steps.get('onehot')
cats = ohe.categories_
for col, catvals in zip(columns, cats):
for v in catvals:
feature_names.append(f"{col}__{v}")
except Exception:
# fallback
feature_names += columns
else:
feature_names += columns
proc_df = pd.DataFrame(X, columns=feature_names)
return proc_df
# ---------- LLM helper ----------
def build_dataset_prompt(summary: Dict, user_question: Optional[str] = None) -> str:
# Build a robust prompt summarizing the dataset for the LLM to give insights
s = []
s.append("You are a helpful data analyst assistant. I will give you a dataset summary and ask for insights and next steps.")
s.append(f"Dataset shape: {summary['shape'][0]} rows, {summary['shape'][1]} columns.")
s.append("Columns:")
for col in summary['columns']:
s.append(f"- {col['name']} (dtype: {col['dtype']}; missing: {col['n_missing']}; unique: {col['n_unique']})")
if 'summary' in col:
s.append(f" summary: {col['summary']}")
if 'top_values' in col:
s.append(f" top values: {col['top_values']}")
s.append("Preview of top rows:")
for r in summary['preview']:
s.append(str(r))
if user_question:
s.append("User question: " + user_question)
else:
s.append("Please provide: 1) quick dataset quality assessment, 2) columns of interest, 3) suggested cleaning steps, 4) recommended visualizations and quick findings, 5) suggested next steps for modeling or analysis.")
prompt = "\n".join(s)
return prompt
def call_llm(prompt: str, model: str = 'bigscience/bloomz-7b1', max_tokens: int = 512) -> str:
if not HF_TOKEN:
return "ERROR: HF_TOKEN not found. Put your Hugging Face token in Streamlit secrets under 'HF_TOKEN' or set the HF_TOKEN environment variable."
client = InferenceClient(token=HF_TOKEN)
# Use the text generation endpoint
try:
response = client.text_generation(model=model, inputs=prompt, max_new_tokens=max_tokens)
# The returned object structure depends on HF inference client; try to be robust
if isinstance(response, list):
return response[0].get('generated_text', str(response))
elif isinstance(response, dict):
return response.get('generated_text', str(response))
else:
return str(response)
except Exception as e:
return f"LLM call failed: {e}"
# ---------- Streamlit UI ----------
st.title("Data Analysis & Cleaning App β€” Streamlit (Deployable to Hugging Face Spaces)")
st.markdown("Upload a CSV or Excel file, clean it, preprocess, preview cleaned data, visualize quickly, and ask an LLM for insights.")
with st.sidebar:
st.header("Options")
model_choice = st.selectbox("LLM model (Inference API)", options=list(MODEL_OPTIONS.keys()), format_func=lambda k: MODEL_OPTIONS[k])
max_tokens = st.slider("LLM max tokens", min_value=128, max_value=1024, value=512, step=64)
impute_strategy_num = st.selectbox("Numeric imputation", ['mean', 'median', 'most_frequent'])
encode_categorical = st.selectbox("Categorical encoding", ['onehot', 'ordinal'])
scale_numeric = st.checkbox("Scale numeric features", value=True)
show_raw_preview = st.checkbox("Show raw preview (before cleaning)", value=True)
uploaded_file = st.file_uploader("Upload CSV or Excel file", type=['csv', 'xls', 'xlsx', 'txt'])
if uploaded_file:
try:
with st.spinner("Reading file..."):
raw_df = read_file(uploaded_file)
except Exception as e:
st.error(f"Failed to read file: {e}")
st.stop()
if show_raw_preview:
st.subheader("Raw data preview")
st.dataframe(raw_df.head(10))
st.subheader("Cleaning & Standardization")
drop_all_nan_cols = st.checkbox("Drop columns with all missing values", value=True)
cleaned_df = standardize_dataframe(raw_df, drop_all_nan_cols=drop_all_nan_cols)
st.write(f"Data after standardization β€” shape: {cleaned_df.shape}")
st.dataframe(cleaned_df.head(10))
st.subheader("Quick data summary")
summary = summarize_dataframe(cleaned_df, max_rows=5)
col1, col2 = st.columns([2,1])
with col1:
st.write(f"**Shape:** {summary['shape']}")
st.write("**Columns:**")
for c in summary['columns']:
st.markdown(f"- **{c['name']}** β€” dtype: {c['dtype']} β€” missing: {c['n_missing']} β€” unique: {c['n_unique']}")
with col2:
st.write("**Preview (head)**")
st.table(pd.DataFrame(summary['preview']))
st.subheader("Preprocessing")
if st.button("Generate preprocessing pipeline and preview processed data"):
preprocessor, kept_cols = prepare_preprocessing_pipeline(cleaned_df, impute_strategy_num=impute_strategy_num, scale_numeric=scale_numeric, encode_categorical=encode_categorical)
try:
proc_df = apply_preprocessing(cleaned_df, preprocessor)
st.success("Preprocessing applied β€” showing preview")
st.dataframe(proc_df.head(10))
st.markdown(f"Processed feature count: **{proc_df.shape[1]}**")
csv = proc_df.to_csv(index=False)
st.download_button("Download processed CSV", data=csv, file_name="processed_data.csv")
except Exception as e:
st.error(f"Failed to process dataset: {e}")
st.subheader("Quick visualizations")
viz_col = st.selectbox("Select column for visualization (numeric or categorical)", options=list(cleaned_df.columns))
viz_type = st.selectbox("Chart type", ['Histogram', 'Boxplot', 'Bar (categorical)', 'Scatter (choose second column)', 'Correlation heatmap'])
if viz_type == 'Scatter (choose second column)':
second_col = st.selectbox("Second column for scatter", options=[c for c in cleaned_df.columns if c != viz_col])
if st.button("Show visualization"):
fig = plt.figure(figsize=(8,5))
try:
if viz_type == 'Histogram':
series = pd.to_numeric(cleaned_df[viz_col], errors='coerce')
series.dropna(inplace=True)
plt.hist(series, bins='auto')
plt.title(f'Histogram β€” {viz_col}')
elif viz_type == 'Boxplot':
series = pd.to_numeric(cleaned_df[viz_col], errors='coerce')
sns.boxplot(x=series)
plt.title(f'Boxplot β€” {viz_col}')
elif viz_type == 'Bar (categorical)':
counts = cleaned_df[viz_col].astype(str).value_counts().head(30)
sns.barplot(x=counts.values, y=counts.index)
plt.title(f'Bar chart β€” {viz_col}')
elif viz_type == 'Scatter (choose second column)':
x = pd.to_numeric(cleaned_df[viz_col], errors='coerce')
y = pd.to_numeric(cleaned_df[second_col], errors='coerce')
mask = x.notna() & y.notna()
plt.scatter(x[mask], y[mask], alpha=0.6)
plt.xlabel(viz_col)
plt.ylabel(second_col)
plt.title(f'Scatter β€” {viz_col} vs {second_col}')
elif viz_type == 'Correlation heatmap':
numeric = cleaned_df.select_dtypes(include=[np.number])
corr = numeric.corr()
sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm')
plt.title('Correlation heatmap (numeric features)')
st.pyplot(fig)
except Exception as e:
st.error(f"Failed to create visualization: {e}")
st.subheader("Ask the LLM for insights (optional)")
user_question = st.text_area("Specific question for the LLM (if empty, a general assessment will be produced)")
if st.button("Get LLM insights"):
with st.spinner("Preparing prompt and calling LLM..."):
prompt = build_dataset_prompt(summary, user_question=user_question if user_question else None)
llm_answer = call_llm(prompt, model=model_choice, max_tokens=max_tokens)
st.subheader("LLM response")
st.write(llm_answer)
st.subheader("Duplicate & Missing-value helpers")
if st.button("Show duplicate rows (if any)"):
dup = cleaned_df[cleaned_df.duplicated(keep=False)]
if dup.empty:
st.write("No duplicates found")
else:
st.dataframe(dup)
if st.button("Show columns with > 20% missing values"):
thresh = 0.2
miss = (cleaned_df.isna().mean() > thresh)
cols = list(miss[miss].index)
if not cols:
st.write("No columns have more than 20% missing values")
else:
st.write(cols)
st.markdown("---")
st.markdown("**Deployment notes**: This app is ready to be deployed to Hugging Face Spaces. Add your Hugging Face token to the Space secrets as `HF_TOKEN`. Use a GPU-enabled Space if you want to run large models locally; otherwise the Inference API will run models hosted by Hugging Face via your token.")
else:
st.info("Upload a CSV or Excel file to get started.")
# End of app