# jaker86's picture
# Update app.py
# 1ed57a6 verified
import pandas as pd
import numpy as np
import gradio as gr
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import confusion_matrix, classification_report, mean_squared_error, r2_score
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import f_classif
import matplotlib.pyplot as plt
import seaborn as sns
import io
from PIL import Image
# ---------------------------------------------------------------------------
# Tunable constants
# ---------------------------------------------------------------------------
# Seed passed as ``random_state`` to the train/test split, the random forests,
# KMeans and PCA so repeated runs on the same file give the same output.
RANDOM_STATE = 42
# Minimum number of rows that must remain after rows with missing values are
# dropped.
MIN_ROWS = 10
# Minimum number of columns in the uploaded table (one feature + the label).
MIN_COLS = 2
# How many entries the feature-importance and F-score charts display.
MAX_FEATURES_TO_SHOW = 10

# ---------------------------------------------------------------------------
# Shared state: written by analyze_file, read by predict_interactive.
# ---------------------------------------------------------------------------
global_data = {
    # Fitted random-forest estimator, or None until an analysis has run.
    'model': None,
    # StandardScaler fitted on the encoded feature matrix.
    'scaler': None,
    # Column labels of the one-hot-encoded feature matrix.
    'X_columns': None,
    # 'regression' or 'classification'.
    'y_type': None,
    # Class labels in factorize order (classification only).
    'uniques': None,
}
def update_dropdown(file):
    """Refresh the label dropdown with the column names of the uploaded file.

    Args:
        file: Value delivered by the ``gr.File`` component, or ``None`` when
            the upload was cleared. The path is taken from its ``name``
            attribute; a plain path string is accepted as well.

    Returns:
        A ``gr.update`` payload whose ``choices`` are the file's column names
        and whose ``value`` is ``None``. ``choices`` is empty when there is no
        file, the extension is not ``.csv``/``.xlsx``, or the file cannot be
        read.
    """
    if file is None:
        return gr.update(choices=[], value=None)

    path = str(getattr(file, "name", file))
    # Compare extensions case-insensitively so "DATA.CSV" is accepted too.
    lowered = path.lower()
    try:
        # Only the header is needed here, so ask pandas for zero data rows
        # instead of parsing the whole table.
        if lowered.endswith('.csv'):
            header = pd.read_csv(path, nrows=0)
        elif lowered.endswith('.xlsx'):
            header = pd.read_excel(path, nrows=0)
        else:
            return gr.update(choices=[], value=None)
        return gr.update(choices=list(header.columns), value=None)
    except Exception as e:
        # UI callback boundary: log and fall back to an empty dropdown rather
        # than surfacing a stack trace to the user.
        print(f"Error in update_dropdown: {e}")  # Debug logging
        return gr.update(choices=[], value=None)
def _analysis_failure(message):
    """Build the 6-tuple analyze_file returns when it cannot proceed."""
    return (message, None, None, None, None, None)


def _figure_to_image(fig):
    """Render *fig* to an in-memory PNG and return it as a PIL image."""
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight")
    buf.seek(0)
    return Image.open(buf)


def _plot_regression_diagnostics(model, X_test, y_test, y_pred):
    """Plot the top-3 features vs. prediction plus a true-vs-predicted panel."""
    importances = pd.Series(model.feature_importances_, index=X_test.columns)
    top_features = importances.sort_values(ascending=False).head(3).index
    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    try:
        axes = axes.flatten()
        for ax, feature in zip(axes, top_features):
            ax.scatter(X_test[feature], y_pred, alpha=0.5)
            ax.set_xlabel(feature)
            ax.set_ylabel('Predicted Value')
            ax.set_title(f'{feature} vs Predicted')
        # With fewer than three features some panels would stay blank; hide them.
        for ax in axes[len(top_features):3]:
            ax.set_visible(False)

        ax = axes[3]
        ax.scatter(y_test, y_pred, alpha=0.5)
        ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', label='Perfect Prediction')
        ax.set_xlabel('True Value')
        ax.set_ylabel('Predicted Value')
        ax.set_title('True vs Predicted')
        # Same limits on both axes so the diagonal really is y == x.
        min_val = min(y_test.min(), y_pred.min())
        max_val = max(y_test.max(), y_pred.max())
        ax.set_xlim(min_val, max_val)
        ax.set_ylim(min_val, max_val)
        ax.legend()
        fig.tight_layout()
        return _figure_to_image(fig)
    finally:
        # Close even when drawing fails so figures do not pile up.
        plt.close(fig)


def _plot_confusion_matrix(cm, class_names):
    """Draw *cm* as an annotated heatmap labelled with *class_names*."""
    fig, ax = plt.subplots(figsize=(8, 6))
    try:
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names, ax=ax)
        ax.set_xlabel('Predicted')
        ax.set_ylabel('True')
        ax.set_title('Confusion Matrix')
        return _figure_to_image(fig)
    finally:
        plt.close(fig)


def _plot_feature_importances(model, feature_names):
    """Bar chart of the model's largest feature importances."""
    fi = pd.Series(model.feature_importances_, index=feature_names)
    fi = fi.sort_values(ascending=False).head(MAX_FEATURES_TO_SHOW)
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        sns.barplot(x=fi.values, y=fi.index, ax=ax)
        ax.set_title(f"Top {MAX_FEATURES_TO_SHOW} Feature Importances")
        ax.set_xlabel("Importance")
        ax.set_ylabel("Feature")
        return _figure_to_image(fig)
    finally:
        plt.close(fig)


def _plot_clusters(points_2d, cluster_labels, n_clusters, cmap, title):
    """Scatter the 2-D projection *points_2d* coloured by *cluster_labels*."""
    fig, ax = plt.subplots(figsize=(8, 6))
    try:
        scatter = ax.scatter(points_2d[:, 0], points_2d[:, 1], c=cluster_labels, cmap=cmap, alpha=0.7)
        ax.set_xlabel("PCA 1")
        ax.set_ylabel("PCA 2")
        ax.set_title(title)
        fig.colorbar(scatter, ax=ax, ticks=range(n_clusters))
        return _figure_to_image(fig)
    finally:
        plt.close(fig)


def _plot_cluster_f_scores(f_series):
    """Bar chart of per-feature ANOVA F-scores (features on x, scores on y)."""
    scores = pd.DataFrame({"Feature": f_series.index, "F-score": f_series.values})
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        # hue + legend=False gives per-bar colours without a palette warning.
        sns.barplot(data=scores, x="Feature", y="F-score", hue="Feature", legend=False, ax=ax)
        ax.set_title(f"Top {MAX_FEATURES_TO_SHOW} Differentiating Features (ANOVA F-scores)")
        # Features are on the x axis and scores on the y axis; label them so.
        ax.set_xlabel("Feature")
        ax.set_ylabel("F-score")
        ax.tick_params(axis="x", labelrotation=45)
        return _figure_to_image(fig)
    finally:
        plt.close(fig)


def analyze_file(file, label_col, n_clusters):
    """Train a random forest on the uploaded table and cluster its features.

    The label column decides the task: a numeric (non-boolean) label is
    treated as regression, anything else as classification. Rows containing
    any missing value are dropped, features are one-hot encoded, and 30% of
    the rows are held out for evaluation. KMeans and agglomerative clustering
    run on the standardised features and are drawn on a 2-component PCA
    projection.

    On success the fitted model, scaler, encoded column labels, raw feature
    column names, task type and class labels are stored in ``global_data``.

    Args:
        file: Value delivered by the ``gr.File`` component (path taken from
            its ``name`` attribute; a plain path string also works), or
            ``None``.
        label_col: Name of the column to predict.
        n_clusters: Number of clusters for both clustering algorithms.

    Returns:
        A 6-tuple ``(results_text, model_img, fi_img, kmeans_img, agg_img,
        diff_img)``. ``results_text`` holds metrics and any warnings; each
        image is a PIL image or ``None`` when that step was skipped or failed.
        Validation failures return a message followed by five ``None`` values.
    """
    if file is None:
        return _analysis_failure("Please upload a file.")

    path = str(getattr(file, "name", file))
    lowered = path.lower()  # case-insensitive extension match
    try:
        if lowered.endswith('.csv'):
            df = pd.read_csv(path)
        elif lowered.endswith('.xlsx'):
            df = pd.read_excel(path)
        else:
            return _analysis_failure("Unsupported file type. Please upload a CSV or XLSX file.")
    except Exception as e:
        print(f"Error reading file: {e}")  # Debug logging
        return _analysis_failure(f"Error reading file: {e}")

    if df.empty:
        return _analysis_failure("File is empty.")
    if label_col not in df.columns:
        return _analysis_failure(f"Label column '{label_col}' not found.")

    df = df.dropna()
    if df.shape[0] < MIN_ROWS:
        return _analysis_failure(f"Not enough data rows (less than {MIN_ROWS}) after removing missing values.")
    if df.shape[1] < MIN_COLS:
        return _analysis_failure("Need at least one feature and one label column.")

    # The slider can deliver a float; KMeans and range() both need an int.
    try:
        n_clusters = int(n_clusters)
    except (TypeError, ValueError):
        return _analysis_failure("Number of clusters must be a whole number.")

    y = df[label_col]
    X = df.drop(columns=[label_col])
    X_processed = pd.get_dummies(X)
    if X_processed.shape[1] == 0:
        return _analysis_failure("No valid features after preprocessing.")

    # Boolean labels count as numeric dtypes in pandas but are categories.
    is_regression = pd.api.types.is_numeric_dtype(y) and not pd.api.types.is_bool_dtype(y)
    if not is_regression and len(y.unique()) < 2:
        return _analysis_failure("Label must have at least 2 unique values.")

    scaler = StandardScaler()
    try:
        X_scaled = scaler.fit_transform(X_processed)
    except Exception as e:
        # e.g. a column type get_dummies leaves untouched and that cannot be
        # converted to float.
        return _analysis_failure(f"Could not scale features: {e}")

    results_text = ""
    model_img = None
    fi_img = None
    kmeans_img = None
    agg_img = None
    diff_img = None

    # --- Supervised model -------------------------------------------------
    fitted_model = None
    try:
        if is_regression:
            X_train, X_test, y_train, y_test = train_test_split(
                X_processed, y, test_size=0.3, random_state=RANDOM_STATE)
            model = RandomForestRegressor(random_state=RANDOM_STATE)
            model.fit(X_train, y_train)
            fitted_model = model
            y_pred = model.predict(X_test)
            mse = mean_squared_error(y_test, y_pred)
            r2 = r2_score(y_test, y_pred)
            results_text += (
                "Regression Results:\n"
                f"- MSE: {mse:.3f}\n"
                f"- R²: {r2:.3f}\n"
                "\nCheck the 'Feature Importances' tab to see the top features impacting predictions.\n"
            )
            # Publish the model before plotting so a plotting error does not
            # discard a successfully trained model.
            global_data.update({
                'model': model, 'scaler': scaler, 'X_columns': X_processed.columns,
                'raw_columns': list(X.columns), 'y_type': 'regression', 'uniques': None,
            })
            model_img = _plot_regression_diagnostics(model, X_test, y_test, y_pred)
        else:
            y_encoded, uniques = pd.factorize(y)
            X_train, X_test, y_train, y_test = train_test_split(
                X_processed, y_encoded, test_size=0.3, random_state=RANDOM_STATE)
            model = RandomForestClassifier(random_state=RANDOM_STATE)
            model.fit(X_train, y_train)
            fitted_model = model
            y_pred = model.predict(X_test)
            class_names = [str(u) for u in uniques]
            # Pin the label set: a class absent from the test split would
            # otherwise make the report/matrix disagree with class_names.
            class_ids = np.arange(len(uniques))
            cr = classification_report(y_test, y_pred, labels=class_ids,
                                       target_names=class_names, zero_division=0)
            results_text += "Classification Results:\n" + cr + "\n"
            global_data.update({
                'model': model, 'scaler': scaler, 'X_columns': X_processed.columns,
                'raw_columns': list(X.columns), 'y_type': 'classification', 'uniques': uniques,
            })
            cm = confusion_matrix(y_test, y_pred, labels=class_ids)
            model_img = _plot_confusion_matrix(cm, class_names)
    except Exception as e:
        results_text += f"\nError during model training: {e}"

    # --- Feature importances ----------------------------------------------
    if fitted_model is None:
        results_text += "\nWarning: Could not compute feature importance: no fitted model available."
    else:
        try:
            fi_img = _plot_feature_importances(fitted_model, X_processed.columns)
        except Exception as e:
            results_text += f"\nWarning: Could not compute feature importance: {e}"

    # --- 2-D projection shared by both cluster plots ----------------------
    X_pca = None
    explained_var = None
    try:
        pca = PCA(n_components=2, random_state=RANDOM_STATE)
        X_pca = pca.fit_transform(X_scaled)
        explained_var = sum(pca.explained_variance_ratio_)
    except Exception as e:
        results_text += f"\nWarning: PCA projection failed: {e}"

    # --- KMeans -------------------------------------------------------------
    clusters_kmeans = None
    try:
        kmeans = KMeans(n_clusters=n_clusters, random_state=RANDOM_STATE)
        clusters_kmeans = kmeans.fit_predict(X_scaled)
        if X_pca is not None:
            kmeans_img = _plot_clusters(
                X_pca, clusters_kmeans, n_clusters, "viridis",
                f"KMeans Clustering (PCA, {explained_var:.2%} variance explained)")
    except Exception as e:
        results_text += f"\nWarning: KMeans clustering failed: {e}"

    # --- Agglomerative ------------------------------------------------------
    try:
        agg = AgglomerativeClustering(n_clusters=n_clusters)
        clusters_agg = agg.fit_predict(X_scaled)
        if X_pca is not None:
            agg_img = _plot_clusters(
                X_pca, clusters_agg, n_clusters, "plasma",
                f"Agglomerative Clustering (PCA, {explained_var:.2%} variance explained)")
    except Exception as e:
        results_text += f"\nWarning: Agglomerative clustering failed: {e}"

    # --- Features that separate the KMeans clusters -----------------------
    if clusters_kmeans is None:
        results_text += "\nWarning: Could not compute differentiating features: KMeans labels are unavailable."
    else:
        try:
            f_scores, _ = f_classif(X_processed, clusters_kmeans)
            # Constant features yield NaN/inf scores; treat them as 0.
            f_scores = np.nan_to_num(f_scores, nan=0.0, posinf=0.0)
            f_series = pd.Series(f_scores, index=X_processed.columns)
            f_series = f_series.sort_values(ascending=False).head(MAX_FEATURES_TO_SHOW)
            diff_img = _plot_cluster_f_scores(f_series)
        except Exception as e:
            results_text += f"\nWarning: Could not compute differentiating features: {e}"

    return results_text, model_img, fi_img, kmeans_img, agg_img, diff_img
def predict_interactive(*args):
    """Predict the label for one row of user-entered feature values.

    Args:
        *args: One value per input widget, in the order the widgets were
            created (one per raw feature column of the uploaded table).

    Returns:
        A message string: the predicted class or value, a prompt to run the
        analysis first when no model is stored in ``global_data``, or an
        error description when prediction fails.
    """
    model = global_data['model']
    if model is None:
        return "Please analyze a file first to train the model."
    try:
        feature_columns = global_data['X_columns']
        # The widgets correspond to the raw (pre-encoding) columns. Use their
        # names when analyze_file recorded them; otherwise fall back to the
        # encoded column labels, which only matches when no column needed
        # one-hot encoding.
        input_columns = global_data.get('raw_columns')
        if input_columns is None:
            input_columns = feature_columns if feature_columns is not None else []

        # zip stops at the shorter sequence, so extra or missing inputs are
        # tolerated; NOTE: values left blank end up as 0 after encoding.
        row = dict(zip(input_columns, args))
        input_data = pd.DataFrame([row])

        # One-hot encode categorical inputs, then align to the training
        # layout: unseen columns are dropped, absent dummy columns become 0.
        encoded = pd.get_dummies(input_data)
        encoded = encoded.reindex(columns=feature_columns, fill_value=0)

        # The forest was fitted on the unscaled encoded features (the scaler
        # is only used for clustering), so it must be queried unscaled too.
        prediction = model.predict(encoded)

        if global_data['y_type'] == 'classification':
            pred_value = global_data['uniques'][int(prediction[0])]
            return f"Predicted class: {pred_value}"
        return f"Predicted value: {prediction[0]:.3f}"
    except Exception as e:
        return f"Error in prediction: {str(e)}. Please ensure all inputs are valid numbers or categories."
def create_interactive_inputs(file, label_col):
    """Create one input widget per feature column of the uploaded table.

    Numeric columns get a ``gr.Number``; all other columns get a
    ``gr.Dropdown`` whose choices are the column's distinct non-missing
    values. Each label shows up to three randomly sampled example values.

    Args:
        file: Value delivered by the ``gr.File`` component (path taken from
            its ``name`` attribute; a plain path string also works), or
            ``None``.
        label_col: Name of the column being predicted; it gets no widget.

    Returns:
        A list of Gradio components in feature-column order, or an empty
        list when there is no file/label, the file type is unsupported, the
        table is empty, the label is not a column, or reading fails.
    """
    if file is None or label_col is None:
        print("No file or label column provided")  # Debug logging
        return []
    try:
        path = str(getattr(file, "name", file))
        lowered = path.lower()  # case-insensitive extension match
        if lowered.endswith('.csv'):
            df = pd.read_csv(path)
        elif lowered.endswith('.xlsx'):
            df = pd.read_excel(path)
        else:
            print("Unsupported file type")  # Debug logging
            return []

        if df.empty or label_col not in df.columns:
            print(f"Empty DataFrame or invalid label column: {label_col}")  # Debug logging
            return []

        X = df.drop(columns=[label_col])
        if X.empty:
            print("No features available after dropping label column")  # Debug logging
            return []

        components = []
        for col in X.columns:
            # Drop missing values once; reused for examples and choices.
            observed = X[col].dropna()
            examples = observed.sample(min(3, len(observed))).tolist()
            label = f"{col} (e.g., {', '.join(map(str, examples))})"
            if pd.api.types.is_numeric_dtype(X[col]):
                components.append(gr.Number(label=label, value=None))
            else:
                components.append(gr.Dropdown(label=label, choices=observed.unique().tolist(), value=None))

        print(f"Generated {len(components)} input components")  # Debug logging
        return components
    except Exception as e:
        # UI callback boundary: log and show no inputs rather than crash.
        print(f"Error in create_interactive_inputs: {e}")  # Debug logging
        return []
# ---------------------------------------------------------------------------
# User interface
# ---------------------------------------------------------------------------
# Layout: upload + label + cluster count on top, an "Analyze" button, then one
# tab per output of analyze_file and a final tab for interactive prediction.
with gr.Blocks() as demo:
    gr.Markdown("## Data Analysis Explorer")
    gr.Markdown("Upload a CSV or XLSX file to explore classification, regression, and clustering. Select a column to predict and the number of clusters!")
    with gr.Row():
        file_input = gr.File(label="Upload CSV or XLSX", file_types=[".csv", ".xlsx"])
        label_dropdown = gr.Dropdown(label="Select Column to Predict", choices=[], interactive=True)
        clusters_slider = gr.Slider(minimum=2, maximum=10, step=1, value=3, label="Number of Clusters")
    # Re-populate the label choices whenever a new file is uploaded.
    file_input.change(fn=update_dropdown, inputs=file_input, outputs=label_dropdown)
    analyze_btn = gr.Button("Analyze")
    with gr.Tabs():
        with gr.TabItem("Prediction Results"):
            gr.Markdown("### Classification or Regression")
            gr.Markdown("""
- **Regression**: Predicts numbers (e.g., sales). Uses Random Forest.
- **Classification**: Predicts categories (e.g., yes/no). Uses Random Forest.
- Rows with missing values are removed. 70% of data trains the model; 30% tests it.
""")
            results_textbox = gr.Textbox(label="Performance Metrics", lines=10)
        with gr.TabItem("Prediction Plot"):
            gr.Markdown("### Prediction Visualization")
            gr.Markdown("For regression: scatter plots of top 3 features vs. predicted values and true vs. predicted. For classification: confusion matrix.")
            model_img_output = gr.Image(label="Prediction Output")
        with gr.TabItem("Feature Importances"):
            gr.Markdown("### Top 10 Key Features")
            gr.Markdown("Shows the most important features for predictions. Higher bars mean bigger impact.")
            fi_output = gr.Image(label="Feature Importances")
        with gr.TabItem("KMeans Clustering"):
            gr.Markdown("### KMeans Clustering")
            gr.Markdown("Groups similar data points without using the selected column. Colors show clusters in 2D (PCA projection).")
            kmeans_output = gr.Image(label="KMeans Clusters")
        with gr.TabItem("Agglomerative Clustering"):
            gr.Markdown("### Agglomerative Clustering")
            gr.Markdown("Another way to group data hierarchically. Compare with KMeans to see differences!")
            agg_output = gr.Image(label="Agglomerative Clusters")
        with gr.TabItem("Cluster Differences"):
            gr.Markdown("### Top 10 Cluster-Differentiating Features")
            gr.Markdown("Shows features that vary most between clusters, helping explain the groupings.")
            diff_output = gr.Image(label="Differentiating Features")
        with gr.TabItem("Interactive"):
            gr.Markdown("### Interactive Prediction")
            gr.Markdown("Enter values for each feature to get a prediction based on the trained model.")
            with gr.Column():
                # The set of feature widgets depends on the uploaded file and
                # the chosen label, so it is rebuilt with gr.render whenever
                # either changes (requires a Gradio release that provides
                # gr.render). The widgets appear here, above the button.
                @gr.render(inputs=[file_input, label_dropdown])
                def render_feature_inputs(file, label_col):
                    print(f"Updating inputs with file: {file}, label_col: {label_col}")  # Debug logging
                    # Components constructed inside a render function are
                    # placed in the layout automatically.
                    components = create_interactive_inputs(file, label_col)
                    # Listeners that use render-created components must be
                    # attached inside the render function; Gradio replaces
                    # them on every re-render.
                    predict_btn.click(
                        fn=predict_interactive,
                        inputs=components,
                        outputs=prediction_output
                    )

                predict_btn = gr.Button("Predict")
                prediction_output = gr.Textbox(label="Prediction Result")
    analyze_btn.click(fn=analyze_file, inputs=[file_input, label_dropdown, clusters_slider],
                      outputs=[results_textbox, model_img_output, fi_output, kmeans_output, agg_output, diff_output])
demo.launch(debug=True)  # Enable debug mode for more detailed error logging