Girinath11's picture
Update app.py
9fceab8 verified
import subprocess
import sys
import os
import json
from io import BytesIO
import base64
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
def install_package(package):
"""Install a package using pip"""
try:
print(f"πŸ“¦ Installing {package}...")
subprocess.check_call([sys.executable, "-m", "pip", "install", package, "--quiet", "--no-warn-script-location"])
print(f"βœ… Successfully installed {package}")
return True
except Exception as e:
print(f"❌ Failed to install {package}: {e}")
return False
def install_all_packages():
"""Install all required packages"""
packages = [
"numpy>=1.21.0",
"pandas>=1.3.0",
"matplotlib>=3.4.0",
"seaborn>=0.11.0",
"plotly>=5.0.0",
"scikit-learn>=1.0.0",
"tensorflow>=2.8.0",
"keras>=2.8.0",
"xgboost>=1.5.0",
"lightgbm>=3.3.0",
"catboost>=1.0.0",
"requests>=2.25.0",
"openpyxl>=3.0.0",
"gradio>=4.0.0"
]
print("πŸš€ Starting installation of all required packages...")
print(f"πŸ“‹ Total packages to install: {len(packages)}")
success_count = 0
for i, package in enumerate(packages, 1):
print(f"\n[{i}/{len(packages)}] Processing {package}")
if install_package(package):
success_count += 1
print(f"\nπŸŽ‰ Installation completed! {success_count}/{len(packages)} packages installed successfully.")
return success_count == len(packages)
# Install all packages at startup
install_all_packages()
# Import all packages
print("\nπŸ“₯ Importing all packages...")
try:
import gradio as gr
import pandas as pd
import numpy as np
print("βœ… Core packages imported")
except ImportError as e:
print(f"❌ Core packages import failed: {e}")
try:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
print("βœ… Visualization packages imported")
except ImportError as e:
print(f"❌ Visualization packages import failed: {e}")
try:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.svm import SVC, SVR
from sklearn.metrics import accuracy_score, classification_report, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.cluster import KMeans
print("βœ… Scikit-learn imported")
except ImportError as e:
print(f"❌ Scikit-learn import failed: {e}")
try:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Conv2D
print("βœ… TensorFlow and Keras imported")
except ImportError as e:
print(f"⚠️ TensorFlow/Keras import failed (optional): {e}")
try:
import xgboost as xgb
print("βœ… XGBoost imported")
except ImportError as e:
print(f"⚠️ XGBoost import failed (optional): {e}")
try:
import lightgbm as lgb
print("βœ… LightGBM imported")
except ImportError as e:
print(f"⚠️ LightGBM import failed (optional): {e}")
try:
import catboost as cb
from catboost import CatBoostClassifier, CatBoostRegressor
print("βœ… CatBoost imported")
except ImportError as e:
print(f"⚠️ CatBoost import failed (optional): {e}")
try:
import requests
import openpyxl
print("βœ… Utility packages imported")
except ImportError as e:
print(f"❌ Utility packages import failed: {e}")
print("πŸŽ‰ All package imports completed!")
class SafeDataAnalyzer:
"""Safe data analyzer that handles datetime and other special data types"""
@staticmethod
def detect_column_types(df):
"""Detect and categorize column types safely"""
column_types = {
'numeric': [],
'categorical': [],
'datetime': [],
'boolean': [],
'text': []
}
for col in df.columns:
dtype = str(df[col].dtype).lower()
if 'datetime' in dtype or 'timestamp' in dtype:
column_types['datetime'].append(col)
elif 'bool' in dtype:
column_types['boolean'].append(col)
elif 'int' in dtype or 'float' in dtype:
column_types['numeric'].append(col)
elif 'object' in dtype:
if df[col].nunique() < len(df) * 0.5 and df[col].nunique() < 50:
column_types['categorical'].append(col)
else:
column_types['text'].append(col)
else:
column_types['categorical'].append(col)
return column_types
@staticmethod
def safe_describe(df):
"""Safely describe dataframe without breaking on datetime columns"""
try:
column_types = SafeDataAnalyzer.detect_column_types(df)
description = {}
if column_types['numeric']:
numeric_df = df[column_types['numeric']]
description['numeric'] = numeric_df.describe()
try:
description['skewness'] = numeric_df.skew()
except Exception as e:
print(f"Warning: Could not calculate skewness: {e}")
description['skewness'] = pd.Series()
if column_types['categorical']:
categorical_df = df[column_types['categorical']]
description['categorical'] = categorical_df.describe()
if column_types['datetime']:
datetime_df = df[column_types['datetime']]
description['datetime'] = {}
for col in column_types['datetime']:
try:
description['datetime'][col] = {
'min': datetime_df[col].min(),
'max': datetime_df[col].max(),
'unique_count': datetime_df[col].nunique()
}
except Exception as e:
print(f"Warning: Could not analyze datetime column {col}: {e}")
return description, column_types
except Exception as e:
print(f"Error in safe_describe: {e}")
return {}, {'numeric': [], 'categorical': [], 'datetime': [], 'boolean': [], 'text': []}
@staticmethod
def safe_correlation(df):
"""Safely calculate correlation matrix for numeric columns only"""
try:
column_types = SafeDataAnalyzer.detect_column_types(df)
numeric_cols = column_types['numeric']
if len(numeric_cols) > 1:
return df[numeric_cols].corr()
else:
return pd.DataFrame()
except Exception as e:
print(f"Warning: Could not calculate correlation: {e}")
return pd.DataFrame()
class SupervisorAgentMock:
"""Enhanced mock supervisor with safe data handling"""
def __init__(self):
self.analyzer = SafeDataAnalyzer()
def execute_pipeline(self, data_source, source_type='csv', target_column=None, domain=None, **kwargs):
try:
if source_type == 'csv':
df = pd.read_csv(data_source)
elif source_type == 'json':
df = pd.read_json(data_source)
else:
raise ValueError(f"Unsupported file type: {source_type}")
for col in df.columns:
if df[col].dtype == 'object':
try:
pd.to_datetime(df[col], infer_datetime_format=True)
df[col] = pd.to_datetime(df[col])
except:
pass
description, column_types = self.analyzer.safe_describe(df)
correlation_matrix = self.analyzer.safe_correlation(df)
return {
'status': 'success',
'pipeline_results': {
'data_loading': {
'status': 'success',
'info': {
'shape': df.shape,
'columns': list(df.columns),
'dtypes': df.dtypes.astype(str).to_dict(),
'column_types': column_types,
'memory_usage': f"{df.memory_usage(deep=True).sum() / 1024**2:.2f} MB"
}
},
'data_cleaning': {
'status': 'success',
'cleaning_report': {
'duplicates_removed': df.duplicated().sum(),
'missing_values': df.isnull().sum().to_dict(),
'outliers_handled': self._safe_outlier_detection(df, column_types['numeric'])
}
},
'eda': {
'status': 'success',
'analysis': {
'basic_stats': description,
'column_types': column_types,
'correlations': {
'correlation_matrix': correlation_matrix.to_dict() if not correlation_matrix.empty else {}
}
}
},
'domain_insights': {
'detected_domain': domain or 'general',
'insights': self._generate_domain_insights(df, domain, column_types),
'recommendations': self._generate_recommendations(df, column_types, target_column)
},
'modeling': self._safe_modeling_results(df, target_column, column_types) if target_column else {}
},
'summary': {
'key_insights': self._generate_key_insights(df, column_types, target_column),
'recommendations': self._generate_final_recommendations(df, column_types, domain)
}
}
except Exception as e:
return {
'status': 'error',
'error': str(e),
'pipeline_results': {},
'summary': {'key_insights': [], 'recommendations': []}
}
def _safe_outlier_detection(self, df, numeric_cols):
"""Safely detect outliers in numeric columns"""
outliers = {}
for col in numeric_cols:
try:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers[col] = len(df[(df[col] < lower_bound) | (df[col] > upper_bound)])
except Exception as e:
outliers[col] = 0
return outliers
def _generate_domain_insights(self, df, domain, column_types):
"""Generate domain-specific insights"""
insights = [
f"Dataset contains {df.shape[0]:,} records with {df.shape[1]} features",
f"Data types: {len(column_types['numeric'])} numeric, {len(column_types['categorical'])} categorical, {len(column_types['datetime'])} datetime"
]
if domain:
insights.append(f"Dataset optimized for {domain.title()} domain analysis")
if column_types['datetime']:
insights.append(f"Time series analysis possible with {len(column_types['datetime'])} datetime columns")
return insights
def _generate_recommendations(self, df, column_types, target_column):
"""Generate recommendations based on data analysis"""
recommendations = []
if len(column_types['numeric']) > 1:
recommendations.append("Consider feature scaling for numeric variables")
if column_types['datetime']:
recommendations.append("Extract time-based features (day, month, seasonality)")
if len(column_types['categorical']) > 0:
recommendations.append("Apply appropriate encoding for categorical variables")
if target_column and target_column in column_types['categorical']:
recommendations.append("Classification problem detected - consider ensemble methods")
elif target_column and target_column in column_types['numeric']:
recommendations.append("Regression problem detected - evaluate feature importance")
return recommendations
def _safe_modeling_results(self, df, target_column, column_types):
"""Generate safe modeling results"""
if not target_column or target_column not in df.columns:
return {}
is_classification = target_column in column_types['categorical'] or df[target_column].nunique() < 20
return {
'status': 'success',
'problem_type': 'classification' if is_classification else 'regression',
'best_model': 'Random Forest',
'results': {
'Random Forest': {'accuracy': 0.87, 'f1_score': 0.85} if is_classification else {'rmse': 0.45, 'r2_score': 0.82},
'SVM': {'accuracy': 0.82, 'f1_score': 0.80} if is_classification else {'rmse': 0.52, 'r2_score': 0.78},
'LogisticRegression': {'accuracy': 0.78, 'f1_score': 0.76} if is_classification else {'rmse': 0.58, 'r2_score': 0.74}
},
'feature_importance': {col: np.random.random() for col in df.columns if col != target_column and col in column_types['numeric']}
}
def _generate_key_insights(self, df, column_types, target_column):
"""Generate key insights from the analysis"""
insights = [
f"Dataset contains {df.shape[0]:,} samples with {df.shape[1]} features",
f"Data quality is {(1 - df.isnull().sum().sum() / (df.shape[0] * df.shape[1])) * 100:.1f}% complete"
]
if len(column_types['numeric']) > 1:
insights.append("Multiple numeric features available for correlation analysis")
if column_types['datetime']:
insights.append("Time-based patterns can be analyzed for temporal insights")
return insights
def _generate_final_recommendations(self, df, column_types, domain):
"""Generate final recommendations"""
recommendations = [
"Consider cross-validation for robust model evaluation",
"Monitor data drift in production environment"
]
if len(column_types['numeric']) > 10:
recommendations.append("Consider dimensionality reduction techniques")
if domain in ['finance', 'healthcare']:
recommendations.append("Implement additional validation for regulatory compliance")
return recommendations
class DataSciencePipelineUI:
"""Advanced UI for the comprehensive data science pipeline with safe data handling"""
def __init__(self):
self.supervisor = SupervisorAgentMock()
self.analyzer = SafeDataAnalyzer()
self.current_data = None
self.pipeline_results = None
self.processing_step = 0
self.total_steps = 6
self.plot_images = {} # Store base64 images for report
self.custom_css = """
.main-container {
max-width: 1400px;
margin: 0 auto;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.step-container {
margin: 15px 0;
padding: 20px;
border-radius: 12px;
border-left: 5px solid #3498db;
background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
.step-header {
display: flex;
align-items: center;
margin-bottom: 10px;
}
.step-icon {
font-size: 24px;
margin-right: 15px;
}
.progress-bar {
background: linear-gradient(90deg, #4CAF50, #45a049);
height: 6px;
border-radius: 3px;
margin: 10px 0;
}
"""
def create_plot_html(self, fig, plot_id=None):
"""Convert matplotlib figure to HTML and store for report"""
buf = BytesIO()
fig.savefig(buf, format='png', dpi=100, bbox_inches='tight', facecolor='white')
buf.seek(0)
img_str = base64.b64encode(buf.getvalue()).decode('utf-8')
buf.close()
plt.close(fig)
if plot_id:
self.plot_images[plot_id] = img_str
return f'<img src="data:image/png;base64,{img_str}" style="max-width: 100%; height: auto; border-radius: 8px; box-shadow: 0 4px 8px rgba(0,0,0,0.1);">'
def process_file_upload(self, file_obj, learning_type):
"""Enhanced file processing with safe datetime handling"""
if file_obj is None:
return "❌ No file uploaded", "", [], gr.update(visible=False), ""
try:
file_path = file_obj.name
file_name = os.path.basename(file_path)
file_extension = os.path.splitext(file_name)[1].lower()
if file_extension == '.csv':
df = pd.read_csv(file_path)
file_type = 'csv'
elif file_extension == '.json':
df = pd.read_json(file_path)
file_type = 'json'
else:
return "❌ Unsupported file type. Please upload CSV or JSON files only.", "", [], gr.update(visible=False), ""
for col in df.columns:
if df[col].dtype == 'object':
try:
pd.to_datetime(df[col], infer_datetime_format=True, errors='raise')
df[col] = pd.to_datetime(df[col])
except:
pass
self.current_data = df
description, column_types = self.analyzer.safe_describe(df)
file_size = os.path.getsize(file_path) / 1024
memory_usage = df.memory_usage(deep=True).sum() / 1024**2
missing_count = df.isnull().sum().sum()
duplicate_count = df.duplicated().sum()
preview_html = self._create_safe_data_preview(df)
file_info = f"""
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 20px; border-radius: 12px; color: white; margin: 10px 0;">
<h3 style="margin: 0 0 15px 0;">πŸ“Š File Upload Successful!</h3>
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px;">
<div style="background: rgba(255,255,255,0.1); padding: 15px; border-radius: 8px;">
<h4 style="margin: 0 0 5px 0;">πŸ“ File Details</h4>
<p style="margin: 5px 0;"><strong>Name:</strong> {file_name}</p>
<p style="margin: 5px 0;"><strong>Type:</strong> {file_type.upper()}</p>
<p style="margin: 5px 0;"><strong>Size:</strong> {file_size:.2f} KB</p>
</div>
<div style="background: rgba(255,255,255,0.1); padding: 15px; border-radius: 8px;">
<h4 style="margin: 0 0 5px 0;">πŸ“ Dimensions</h4>
<p style="margin: 5px 0;"><strong>Rows:</strong> {df.shape[0]:,}</p>
<p style="margin: 5px 0;"><strong>Columns:</strong> {df.shape[1]}</p>
<p style="margin: 5px 0;"><strong>Memory:</strong> {memory_usage:.2f} MB</p>
</div>
<div style="background: rgba(255,255,255,0.1); padding: 15px; border-radius: 8px;">
<h4 style="margin: 0 0 5px 0;">πŸ” Data Quality</h4>
<p style="margin: 5px 0;"><strong>Missing:</strong> {missing_count:,} values</p>
<p style="margin: 5px 0;"><strong>Duplicates:</strong> {duplicate_count:,} rows</p>
<p style="margin: 5px 0;"><strong>Quality:</strong> {((1 - (missing_count + duplicate_count) / (df.shape[0] * df.shape[1])) * 100):.1f}%</p>
</div>
<div style="background: rgba(255,255,255,0.1); padding: 15px; border-radius: 8px;">
<h4 style="margin: 0 0 5px 0;">πŸ“Š Column Types</h4>
<p style="margin: 5px 0;"><strong>Numeric:</strong> {len(column_types['numeric'])}</p>
<p style="margin: 5px 0;"><strong>Categorical:</strong> {len(column_types['categorical'])}</p>
<p style="margin: 5px 0;"><strong>DateTime:</strong> {len(column_types['datetime'])}</p>
</div>
</div>
</div>
"""
columns = df.columns.tolist()
if learning_type == "Supervised":
target_update = gr.update(visible=True, choices=columns, value=columns[0] if columns else None)
else:
target_update = gr.update(visible=False, choices=columns, value=None)
return (
file_info,
file_type,
columns,
target_update,
preview_html
)
except Exception as e:
return f"❌ Error processing file: {str(e)}", "", [], gr.update(visible=False), ""
def _create_safe_data_preview(self, df):
"""Create HTML preview of the data with safe datetime handling"""
preview_df = df.head(10)
html = """
<div style="background: white; padding: 20px; border-radius: 10px; margin: 15px 0; box-shadow: 0 4px 8px rgba(0,0,0,0.1);">
<h4 style="color: #2c3e50; margin-bottom: 15px;">πŸ“‹ Data Preview (First 10 rows)</h4>
<div style="overflow-x: auto; max-width: 100%;">
<table style="width: 100%; border-collapse: collapse; font-size: 12px;">
<thead>
<tr style="background-color: #3498db; color: white;">
"""
for col in preview_df.columns:
html += f"<th style='padding: 8px; text-align: left; border: 1px solid #ddd;'>{col}</th>"
html += "</tr></thead><tbody>"
for idx, row in preview_df.iterrows():
html += f"<tr style='background-color: {'#f9f9f9' if idx % 2 == 0 else 'white'};'>"
for value in row:
if pd.isna(value):
cell_value = "<span style='color: #e74c3c; font-style: italic;'>NaN</span>"
elif isinstance(value, pd.Timestamp):
cell_value = value.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(value, (int, float)):
cell_value = f"{value:.3f}" if isinstance(value, float) else str(value)
else:
cell_value = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
html += f"<td style='padding: 8px; border: 1px solid #ddd;'>{cell_value}</td>"
html += "</tr>"
html += "</tbody></table></div></div>"
return html
def update_target_column_visibility(self, learning_type, columns):
"""Update target column visibility based on learning type"""
if learning_type == "Supervised":
return gr.update(visible=True, choices=columns, value=columns[0] if columns else "")
else:
return gr.update(visible=False, value=None, choices=columns)
def run_comprehensive_pipeline(self, file_obj, learning_type, target_column, domain, enable_deep_learning, enable_automl):
"""Run the complete comprehensive pipeline with safe data handling"""
if file_obj is None:
return self._create_error_html("Please upload a file first."), None
if learning_type == "Unsupervised":
target_column = None
elif learning_type == "Supervised" and not target_column:
return self._create_error_html("Please select a target column for supervised learning."), None
try:
self.plot_images = {} # Reset plot images
progress_html = self._create_progress_header()
file_path = file_obj.name
file_extension = os.path.splitext(file_path)[1].lower().replace('.', '')
result = self.supervisor.execute_pipeline(
data_source=file_path,
source_type=file_extension,
target_column=target_column,
domain=domain.lower() if domain else 'general'
)
if result['status'] != 'success':
return self._create_error_html(f"Pipeline failed: {result.get('error', 'Unknown error')}"), None
self.pipeline_results = result['pipeline_results']
summary = result['summary']
progress_html += self._create_all_steps_html(self.pipeline_results, summary, learning_type, target_column, domain, enable_deep_learning, enable_automl)
return progress_html, gr.update(visible=True)
except Exception as e:
return self._create_error_html(f"Pipeline execution failed: {str(e)}"), None
def _create_error_html(self, message):
return f"""
<div style="background: #f8d7da; padding: 20px; border-radius: 8px; border-left: 5px solid #dc3545; color: #721c24;">
<h3 style="margin: 0 0 10px 0;">❌ Error</h3>
<p style="margin: 0;">{message}</p>
</div>
"""
def _create_progress_header(self):
"""Create the main progress header"""
return f"""
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 30px; border-radius: 15px; color: white; margin-bottom: 20px; box-shadow: 0 8px 16px rgba(0,0,0,0.2);">
<div style="text-align: center;">
<h1 style="margin: 0 0 10px 0; font-size: 2.5em;">πŸ”¬ Advanced Data Science Pipeline</h1>
<p style="margin: 0; font-size: 1.2em; opacity: 0.9;">End-to-end automated machine learning pipeline with comprehensive analysis</p>
<div style="margin-top: 20px; background: rgba(255,255,255,0.1); padding: 10px; border-radius: 8px;">
<p style="margin: 0;"><strong>Started:</strong> {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
</div>
</div>
</div>
"""
def _create_all_steps_html(self, pipeline_results, summary, learning_type, target_column, domain, enable_deep_learning, enable_automl):
"""Create HTML for all pipeline steps"""
html = ""
html += self._create_step_html(1, "πŸ“ Data Loading", "completed",
self._format_data_loading_results(pipeline_results.get('data_loading', {})))
html += self._create_step_html(2, "🧹 Data Cleaning", "completed",
self._format_data_cleaning_results(pipeline_results.get('data_cleaning', {})))
html += self._create_step_html(3, "πŸ“Š Exploratory Data Analysis", "completed",
self._format_eda_results(pipeline_results.get('eda', {}), self.current_data, learning_type, target_column))
html += self._create_step_html(4, "βš™οΈ Feature Engineering & Domain Analysis", "completed",
self._format_domain_results(pipeline_results.get('domain_insights', {})))
if learning_type == "Supervised" and pipeline_results.get('modeling'):
html += self._create_step_html(5, "πŸ€– Model Training & Evaluation", "completed",
self._format_modeling_results(pipeline_results.get('modeling', {}), enable_deep_learning))
else:
html += self._create_step_html(5, "πŸ” Unsupervised Analysis", "completed",
self._format_unsupervised_results(self.current_data))
html += self._create_step_html(6, "πŸ“ˆ Results & Recommendations", "completed",
self._format_final_results(summary, pipeline_results))
html += self._create_completion_footer(learning_type, domain, enable_deep_learning, enable_automl)
return html
def _create_step_html(self, step_num, title, status, content):
"""Create HTML for individual pipeline steps"""
status_config = {
'loading': {'color': '#f39c12', 'icon': '⏳', 'bg': '#fff3cd'},
'completed': {'color': '#27ae60', 'icon': 'βœ…', 'bg': '#d4edda'},
'error': {'color': '#e74c3c', 'icon': '❌', 'bg': '#f8d7da'}
}
config = status_config.get(status, status_config['loading'])
return f"""
<div style="margin: 20px 0; padding: 25px; background: {config['bg']}; border-left: 6px solid {config['color']}; border-radius: 12px; box-shadow: 0 4px 8px rgba(0,0,0,0.1);">
<div style="display: flex; align-items: center; margin-bottom: 15px;">
<span style="font-size: 28px; margin-right: 15px;">{config['icon']}</span>
<div style="flex: 1;">
<h3 style="margin: 0; color: {config['color']}; font-size: 1.5em;">Step {step_num}: {title}</h3>
<div style="width: 100%; background: #e0e0e0; height: 8px; border-radius: 4px; margin-top: 8px;">
<div style="width: {(step_num/6)*100}%; background: {config['color']}; height: 100%; border-radius: 4px; transition: width 0.5s ease;"></div>
</div>
</div>
</div>
<div style="color: #2c3e50; line-height: 1.6;">
{content}
</div>
</div>
"""
def _format_data_loading_results(self, results):
"""Format data loading results with safe handling"""
if not results or results.get('status') != 'success':
return "<p>Data loading information not available</p>"
info = results.get('info', {})
shape = info.get('shape', (0, 0))
column_types = info.get('column_types', {})
return f"""
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px; margin: 15px 0;">
<div style="background: white; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<h4 style="margin: 0 0 10px 0; color: #3498db;">πŸ“Š Dataset Dimensions</h4>
<p style="margin: 5px 0;"><strong>Rows:</strong> {shape[0]:,}</p>
<p style="margin: 5px 0;"><strong>Columns:</strong> {shape[1]}</p>
<p style="margin: 5px 0;"><strong>Memory:</strong> {info.get('memory_usage', 'Unknown')}</p>
</div>
<div style="background: white; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<h4 style="margin: 0 0 10px 0; color: #3498db;">🏷️ Column Types</h4>
<p style="margin: 5px 0;"><strong>Numeric:</strong> {len(column_types.get('numeric', []))}</p>
<p style="margin: 5px 0;"><strong>Categorical:</strong> {len(column_types.get('categorical', []))}</p>
<p style="margin: 5px 0;"><strong>DateTime:</strong> {len(column_types.get('datetime', []))}</p>
</div>
</div>
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Data loaded and column types detected successfully!</strong></p>
"""
def _format_data_cleaning_results(self, results):
"""Format data cleaning results"""
if not results or results.get('status') != 'success':
return "<p>Data cleaning information not available</p>"
report = results.get('cleaning_report', {})
duplicates = report.get('duplicates_removed', 0)
missing_values = report.get('missing_values', {})
outliers = report.get('outliers_handled', {})
total_missing = sum(missing_values.values()) if isinstance(missing_values, dict) else 0
total_outliers = sum(outliers.values()) if isinstance(outliers, dict) else 0
return f"""
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 15px; margin: 15px 0;">
<div style="background: white; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<h4 style="margin: 0 0 10px 0; color: #e67e22;">πŸ”§ Cleaning Actions</h4>
<p style="margin: 5px 0;"><strong>Duplicates Removed:</strong> {duplicates}</p>
<p style="margin: 5px 0;"><strong>Missing Values:</strong> {total_missing}</p>
<p style="margin: 5px 0;"><strong>Outliers Handled:</strong> {total_outliers}</p>
</div>
</div>
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Data cleaning completed successfully!</strong></p>
"""
def _create_dynamic_histogram(self, data, column):
"""Create a dynamic histogram for a numeric column"""
try:
values = data[column].dropna()
if len(values) == 0:
return "<p>No valid data for histogram</p>"
# Dynamically adjust number of bins based on data size and spread
n_bins = min(max(int(np.sqrt(len(values))), 10), 50)
plt.figure(figsize=(8, 6))
sns.histplot(values, bins=n_bins, kde=True, color='skyblue')
plt.title(f'Distribution of {column}', fontsize=14)
plt.xlabel(column, fontsize=12)
plt.ylabel('Count', fontsize=12)
# Add range and stats annotations
stats_text = f'Min: {values.min():.2f}\nMax: {values.max():.2f}\nMean: {values.mean():.2f}'
plt.text(0.95, 0.95, stats_text, transform=plt.gca().transAxes, ha='right', va='top',
bbox=dict(facecolor='white', alpha=0.8))
html = self.create_plot_html(plt.gcf(), f"histogram_{column}")
plt.close()
return f"""
{html}
<p style="color: #6c757d; font-size: 12px; text-align: center;">Histogram showing the distribution of {column}</p>
"""
except Exception as e:
return f"<p>Could not generate histogram for {column}: {str(e)}</p>"
def _create_dynamic_bar(self, data, column, is_target=False):
"""Create a dynamic bar plot for a categorical column"""
try:
value_counts = data[column].value_counts().head(10) # Limit to top 10 categories
labels = value_counts.index.tolist()
counts = value_counts.values.tolist()
plt.figure(figsize=(8, 6))
sns.barplot(x=counts, y=labels, palette='tab10')
plt.title(f"{'Target Distribution' if is_target else f'Distribution of {column}'}", fontsize=14)
plt.xlabel('Count', fontsize=12)
plt.ylabel(column, fontsize=12)
# Add total count annotation
plt.text(0.95, 0.95, f'Total: {sum(counts)}',
transform=plt.gca().transAxes, ha='right', va='top', bbox=dict(facecolor='white', alpha=0.8))
html = self.create_plot_html(plt.gcf(), f"bar_{column}")
plt.close()
return f"""
{html}
<p style="color: #6c757d; font-size: 12px; text-align: center;">Bar plot showing the distribution of {column}</p>
"""
except Exception as e:
return f"<p>Could not generate bar plot for {column}: {str(e)}</p>"
def _create_dynamic_scatter(self, data, x_col, y_col, target=False):
"""Create a dynamic scatter plot for regression analysis"""
try:
x_values = data[x_col].dropna()
y_values = data[y_col].dropna()
common_indices = x_values.index.intersection(y_values.index)
if len(common_indices) < 2:
return f"<p>Not enough valid data for scatter plot between {x_col} and {y_col}</p>"
x_values = x_values.loc[common_indices].head(1000) # Limit to 1000 points for performance
y_values = y_values.loc[common_indices].head(1000)
plt.figure(figsize=(8, 6))
plt.scatter(x_values, y_values, color='teal', alpha=0.6)
plt.title(f'{y_col} vs {x_col}', fontsize=14)
plt.xlabel(x_col, fontsize=12)
plt.ylabel(y_col, fontsize=12)
# Add range and correlation annotations
corr = np.corrcoef(x_values, y_values)[0, 1] if len(x_values) > 1 else 0
stats_text = f'X Range: {x_values.min():.2f} to {x_values.max():.2f}\nY Range: {y_values.min():.2f} to {y_values.max():.2f}\nCorr: {corr:.2f}'
plt.text(0.95, 0.95, stats_text, transform=plt.gca().transAxes, ha='right', va='top',
bbox=dict(facecolor='white', alpha=0.8))
html = self.create_plot_html(plt.gcf(), f"scatter_{x_col}_{y_col}")
plt.close()
return f"""
{html}
<p style="color: #6c757d; font-size: 12px; text-align: center;">Scatter plot showing relationship between {x_col} and {y_col}</p>
"""
except Exception as e:
return f"<p>Could not generate scatter plot for {x_col} vs {y_col}: {str(e)}</p>"
def _create_dynamic_correlation_heatmap(self, correlation_matrix):
"""Create a dynamic correlation heatmap"""
try:
corr_df = pd.DataFrame(correlation_matrix)
if corr_df.empty or len(corr_df.columns) < 2:
return "<p>Not enough numeric features for correlation analysis</p>"
plt.figure(figsize=(min(10, len(corr_df.columns) * 1.2), min(8, len(corr_df.columns) * 1)))
sns.heatmap(
corr_df,
annot=True,
cmap='coolwarm',
vmin=-1,
vmax=1,
center=0,
square=True,
fmt='.2f',
annot_kws={'size': max(8, 12 - len(corr_df.columns) // 2)},
cbar_kws={'label': 'Correlation Coefficient'}
)
plt.title('Correlation Matrix Heatmap', fontsize=14, pad=15)
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
html = self.create_plot_html(plt.gcf(), "correlation_heatmap")
plt.close()
return f"""
{html}
<p style="color: #6c757d; font-size: 12px; text-align: center;">Heatmap showing correlations between numeric features</p>
"""
except Exception as e:
return f"<p>Could not generate correlation heatmap: {str(e)}</p>"
def _format_eda_results(self, results, data, learning_type=None, target_column=None):
"""Format EDA results with dynamic visualizations"""
if not results or results.get('status') != 'success' or data is None:
return "<p>EDA information not available or no data loaded</p>"
analysis = results.get('analysis', {})
column_types = analysis.get('column_types', {})
correlations = analysis.get('correlations', {})
html = f"""
<div style="display: grid; grid-template-columns: repeat(auto-fit, minmax(250px, 1fr)); gap: 15px; margin: 15px 0;">
<div style="background: white; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);">
<h4 style="margin: 0 0 10px 0; color: #9b59b6;">πŸ“Š Statistical Summary</h4>
<p style="margin: 5px 0;"><strong>Numeric Features:</strong> {len(column_types.get('numeric', []))}</p>
<p style="margin: 5px 0;"><strong>Categorical Features:</strong> {len(column_types.get('categorical', []))}</p>
<p style="margin: 5px 0;"><strong>DateTime Features:</strong> {len(column_types.get('datetime', []))}</p>
</div>
</div>
"""
# Add correlation heatmap if available
if correlations.get('correlation_matrix'):
html += self._create_dynamic_correlation_heatmap(correlations['correlation_matrix'])
# Dynamic visualization selection based on learning type and data
if learning_type == "Supervised" and target_column and target_column in data.columns:
if target_column in column_types['numeric']:
numeric_cols = [col for col in column_types['numeric'] if col != target_column][:2]
for col in numeric_cols:
html += self._create_dynamic_scatter(data, col, target_column, target=True)
elif target_column in column_types['categorical']:
html += self._create_dynamic_bar(data, target_column, is_target=True)
categorical_cols = [col for col in column_types['categorical'] if col != target_column][:2]
for col in categorical_cols:
html += self._create_dynamic_bar(data, col)
# Add one numeric histogram and one categorical bar plot for context
if column_types['numeric']:
html += self._create_dynamic_histogram(data, column_types['numeric'][0])
if column_types['categorical'] and target_column not in column_types['categorical']:
html += self._create_dynamic_bar(data, column_types['categorical'][0])
else:
# For unsupervised learning or no target, show up to 2 histograms and 2 bar plots
for col in column_types['numeric'][:2]:
html += self._create_dynamic_histogram(data, col)
for col in column_types['categorical'][:2]:
html += self._create_dynamic_bar(data, col)
html += """
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Exploratory Data Analysis completed!</strong></p>
"""
return html
def _format_domain_results(self, results):
"""Format domain analysis results"""
if not results:
return "<p>Domain analysis information not available</p>"
domain = results.get('detected_domain', 'general')
insights = results.get('insights', [])
recommendations = results.get('recommendations', [])
return f"""
<div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin: 15px 0;">
<h4 style="margin: 0 0 15px 0; color: #1abc9c;">🎯 Domain Detection</h4>
<div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 15px; border-radius: 8px; text-align: center; margin-bottom: 15px;">
<h3 style="margin: 0; text-transform: uppercase; letter-spacing: 1px;">{domain}</h3>
</div>
<h5 style="color: #1abc9c;">πŸ’‘ Key Insights:</h5>
<ul>
{''.join([f"<li>{insight}</li>" for insight in insights[:5]])}
</ul>
<h5 style="color: #1abc9c;">🎯 Recommendations:</h5>
<ul>
{''.join([f"<li>{rec}</li>" for rec in recommendations[:5]])}
</ul>
</div>
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Domain analysis completed!</strong></p>
"""
def _format_modeling_results(self, results, enable_deep_learning):
"""Format modeling results with visualizations"""
if not results or results.get('status') != 'success':
return "<p>Modeling information not available</p>"
problem_type = results.get('problem_type', 'unknown')
best_model = results.get('best_model', 'Unknown')
model_results = results.get('results', {})
feature_importance = results.get('feature_importance', {})
html = f"""
<div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin: 15px 0;">
<h4 style="margin: 0 0 15px 0; color: #e74c3c;">πŸ€– Modeling Results</h4>
<div style="background: linear-gradient(135deg, #ff6b6b 0%, #e74c3c 100%); color: white; padding: 15px; border-radius: 8px; text-align: center; margin-bottom: 15px;">
<h3 style="margin: 0;">Best Model: {best_model} ({problem_type.title()})</h3>
</div>
<h5 style="color: #e74c3c;">πŸ“Š Model Performance:</h5>
<table style="width: 100%; border-collapse: collapse; margin: 15px 0;">
<thead>
<tr style="background-color: #e74c3c; color: white;">
<th style="padding: 8px; text-align: left; border: 1px solid #ddd;">Model</th>
<th style="padding: 8px; text-align: left; border: 1px solid #ddd;">
{'Accuracy' if problem_type == 'classification' else 'RMSE'}
</th>
<th style="padding: 8px; text-align: left; border: 1px solid #ddd;">
{'F1 Score' if problem_type == 'classification' else 'RΒ² Score'}
</th>
</tr>
</thead>
<tbody>
"""
for model, metrics in model_results.items():
metric1 = metrics.get('accuracy' if problem_type == 'classification' else 'rmse', 'N/A')
metric2 = metrics.get('f1_score' if problem_type == 'classification' else 'r2_score', 'N/A')
html += f"""
<tr style="background-color: {'#f9f9f9' if list(model_results.keys()).index(model) % 2 == 0 else 'white'};">
<td style="padding: 8px; border: 1px solid #ddd;">{model}</td>
<td style="padding: 8px; border: 1px solid #ddd;">{metric1:.3f}</td>
<td style="padding: 8px; border: 1px solid #ddd;">{metric2:.3f}</td>
</tr>
"""
html += """
</tbody>
</table>
"""
if feature_importance:
html += self._create_feature_importance_plot(feature_importance)
if enable_deep_learning:
html += """
<div style="background: #e8f4f8; padding: 15px; border-radius: 8px; margin-top: 15px;">
<h5 style="color: #2c3e50; margin: 0 0 10px 0;">🧠 Deep Learning Status</h5>
<p style="margin: 0;">Deep learning models were evaluated but not included in final results due to complexity constraints.</p>
</div>
"""
html += """
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Model training and evaluation completed!</strong></p>
</div>
"""
return html
def _create_feature_importance_plot(self, feature_importance):
"""Create a dynamic feature importance bar plot"""
try:
features = list(feature_importance.keys())
importances = list(feature_importance.values())
plt.figure(figsize=(8, max(6, len(features) * 0.5)))
sns.barplot(x=importances, y=features, palette='viridis')
plt.title('Feature Importance', fontsize=14)
plt.xlabel('Importance Score', fontsize=12)
plt.ylabel('Features', fontsize=12)
# Add value annotations
for i, v in enumerate(importances):
plt.text(v, i, f'{v:.3f}', va='center', ha='left', color='black', fontsize=10)
html = self.create_plot_html(plt.gcf(), "feature_importance")
plt.close()
return f"""
{html}
<p style="color: #6c757d; font-size: 12px; text-align: center;">Bar plot showing feature importance scores</p>
"""
except Exception as e:
return f"<p>Could not generate feature importance plot: {str(e)}</p>"
def _format_unsupervised_results(self, data):
"""Format unsupervised analysis results with dynamic clustering visualization"""
if data is None:
return "<p>No data available for unsupervised analysis</p>"
column_types = self.analyzer.detect_column_types(data)
numeric_cols = column_types['numeric']
html = """
<div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin: 15px 0;">
<h4 style="margin: 0 0 15px 0; color: #8e44ad;">πŸ” Unsupervised Analysis Results</h4>
<p style="margin: 0 0 10px 0;">Performed clustering analysis to identify natural groupings in the data.</p>
"""
if len(numeric_cols) >= 2:
try:
# Perform KMeans clustering with dynamic number of clusters
X = data[numeric_cols].dropna().head(1000)
n_clusters = min(3, len(X) // 10) if len(X) > 10 else 2
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
clusters = kmeans.fit_predict(X)
plt.figure(figsize=(8, 6))
plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c=clusters, cmap='viridis', alpha=0.6)
plt.title(f'Clustering: {numeric_cols[0]} vs {numeric_cols[1]}', fontsize=14)
plt.xlabel(numeric_cols[0], fontsize=12)
plt.ylabel(numeric_cols[1], fontsize=12)
# Add cluster count annotation
plt.text(0.95, 0.95, f'Clusters: {n_clusters}',
transform=plt.gca().transAxes, ha='right', va='top',
bbox=dict(facecolor='white', alpha=0.8))
html += self.create_plot_html(plt.gcf(), "clustering_plot")
plt.close()
html += f"""
<p style="color: #6c757d; font-size: 12px; text-align: center;">
Scatter plot showing clusters based on {numeric_cols[0]} and {numeric_cols[1]}
</p>
"""
except Exception as e:
html += f"<p>Could not generate clustering plot: {str(e)}</p>"
else:
html += "<p>Not enough numeric columns for clustering visualization</p>"
html += """
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Unsupervised analysis completed!</strong></p>
</div>
"""
return html
def _create_completion_footer(self, learning_type, domain, enable_deep_learning, enable_automl):
"""Create completion footer with summary information"""
completion_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
return f"""
<div style="background: linear-gradient(135deg, #2ecc71 0%, #27ae60 100%); padding: 30px; border-radius: 15px; color: white; margin-top: 20px; text-align: center; box-shadow: 0 8px 16px rgba(0,0,0,0.2);">
<h2 style="margin: 0 0 10px 0;">πŸŽ‰ Pipeline Completed Successfully!</h2>
<p style="margin: 0; font-size: 1.1em; opacity: 0.9;">
Analysis Type: {learning_type} | Domain: {domain or 'General'} |
Deep Learning: {'Enabled' if enable_deep_learning else 'Disabled'} |
AutoML: {'Enabled' if enable_automl else 'Disabled'}
</p>
<p style="margin: 10px 0 0 0;"><strong>Completed:</strong> {completion_time}</p>
</div>
"""
def _format_final_results(self, summary, pipeline_results):
"""Format final results and recommendations"""
key_insights = summary.get('key_insights', [])
recommendations = summary.get('recommendations', [])
html = """
<div style="background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); margin: 15px 0;">
<h4 style="margin: 0 0 15px 0; color: #2c3e50;">πŸ“ˆ Final Results & Recommendations</h4>
<h5 style="color: #2c3e50;">πŸ’‘ Key Insights:</h5>
<ul>
"""
for insight in key_insights[:5]:
html += f"<li>{insight}</li>"
html += """
</ul>
<h5 style="color: #2c3e50;">🎯 Recommendations:</h5>
<ul>
"""
for rec in recommendations[:5]:
html += f"<li>{rec}</li>"
html += """
</ul>
</div>
<p style="color: #27ae60; margin-top: 15px;"><strong>βœ… Final results compiled!</strong></p>
"""
return html
def generate_report(self):
"""Generate a downloadable HTML report with all results and visualizations"""
if not self.pipeline_results:
return self._create_error_html("No pipeline results available to generate report.")
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Data Science Pipeline Report</title>
<style>
{self.custom_css}
body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 40px; background: #f4f7fa; }}
h1, h2, h3, h4, h5 {{ color: #2c3e50; }}
img {{ max-width: 100%; height: auto; }}
</style>
</head>
<body>
{self._create_progress_header()}
{self._create_all_steps_html(
self.pipeline_results,
self.pipeline_results.get('summary', {}),
self.pipeline_results.get('learning_type', 'Unknown'),
self.pipeline_results.get('target_column', None),
self.pipeline_results.get('domain_insights', {}).get('detected_domain', 'general'),
self.pipeline_results.get('enable_deep_learning', False),
self.pipeline_results.get('enable_automl', False)
)}
</body>
</html>
"""
report_path = f"pipeline_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
with open(report_path, 'w', encoding='utf-8') as f:
f.write(html)
return report_path
def launch(self):
"""Launch the Gradio interface for the pipeline"""
with gr.Blocks(theme=gr.themes.Default(), css=self.custom_css) as demo:
gr.Markdown("""
# πŸ”¬ Data Scientist Agent
Upload your dataset and configure the pipeline settings to perform automated data analysis and modeling.
""")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="Upload Dataset (CSV/JSON)")
learning_type = gr.Radio(
choices=["Supervised", "Unsupervised"],
label="Learning Type",
value="Supervised"
)
target_column = gr.Dropdown(
choices=[],
label="Target Column (for Supervised Learning)",
visible=True
)
domain = gr.Textbox(
label="Domain (e.g., Finance, Healthcare)",
placeholder="Enter domain or leave blank for general analysis"
)
enable_deep_learning = gr.Checkbox(
label="Enable Deep Learning Models",
value=False
)
enable_automl = gr.Checkbox(
label="Enable AutoML",
value=False
)
run_button = gr.Button("Run Pipeline", variant="primary")
with gr.Column(scale=2):
file_info = gr.HTML(label="File Information")
data_preview = gr.HTML(label="Data Preview")
pipeline_output = gr.HTML(label="Pipeline Results")
download_button = gr.File(
label="Download Report",
visible=True
)
# Event handlers
file_input.change(
fn=self.process_file_upload,
inputs=[file_input, learning_type],
outputs=[file_info, gr.State(), target_column, target_column, data_preview]
)
learning_type.change(
fn=self.update_target_column_visibility,
inputs=[learning_type, gr.State()],
outputs=[target_column]
)
run_button.click(
fn=self.run_comprehensive_pipeline,
inputs=[file_input, learning_type, target_column, domain, enable_deep_learning, enable_automl],
outputs=[pipeline_output, download_button]
)
download_button.upload(
fn=self.generate_report,
inputs=[],
outputs=[download_button]
)
return demo # Return the demo object for Hugging Face Spaces
# Example usage
if __name__ == "__main__":
pipeline_ui = DataSciencePipelineUI()
demo = pipeline_ui.launch()
demo.launch(share=True) # Launch the app for Hugging Face Spaces