# data-analysis-platform / analyzer.py
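"""Streamlit-based data analysis platform.

Defines a five-stage analysis workflow (overview, exploration, cleaning,
advanced analysis, summary/export) plus an optional AI assistant that can
summarise the collected insights via OpenAI or Google Gemini.
"""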
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from typing import Dict, List, Any, Optional
import os
from dotenv import load_dotenv
from data_handler import *
from io import BytesIO
# Load environment variables
load_dotenv()
# Optional AI integration: the OpenAI path later uses the openai>=1.x client
# interface (openai.OpenAI), and the Gemini path needs the google-generativeai
# package; both SDKs are optional at runtime.
try:
    import openai
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
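
# The assistant below reads OPENAI_API_KEY and/or GOOGLE_API_KEY from the
# environment (typically supplied through a local .env file picked up by
# load_dotenv above); without them the AI features stay disabled.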

class AIAssistant:
    """AI-powered analysis assistant"""

    def __init__(self):
        self.openai_key = os.getenv('OPENAI_API_KEY')
        self.gemini_key = os.getenv('GOOGLE_API_KEY')
        if self.gemini_key and GEMINI_AVAILABLE:
            genai.configure(api_key=self.gemini_key)
            self.gemini_model = genai.GenerativeModel('gemini-1.5-flash')

    def get_available_models(self) -> List[str]:
        """Get list of available AI models"""
        models = []
        if self.openai_key and OPENAI_AVAILABLE:
            models.append("OpenAI GPT")
        if self.gemini_key and GEMINI_AVAILABLE:
            models.append("Google Gemini")
        return models

    def analyze_insights(self, df: pd.DataFrame, insights: List[Dict], model: str = "Google Gemini") -> str:
        """Get AI analysis of insights"""
        # Prepare data summary
        summary = f"""
Dataset Summary:
- Shape: {df.shape}
- Columns: {list(df.columns)}
- Data types: {df.dtypes.value_counts().to_dict()}
Key Insights Found:
"""
        for insight in insights:
            summary += f"\n- {insight['insight']}"

        prompt = f"""
As a senior data scientist, analyze this dataset and provide:
1. Business implications of the findings
2. Potential opportunities or risks
3. Recommendations for decision-making
4. Suggestions for further analysis
{summary}
Provide actionable insights in a professional format.
"""
        try:
            if model == "Google Gemini" and hasattr(self, 'gemini_model'):
                response = self.gemini_model.generate_content(prompt)
                return response.text
            elif model == "OpenAI GPT" and self.openai_key:
                client = openai.OpenAI(api_key=self.openai_key)
                response = client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
            else:
                return "AI analysis not available. Please configure API keys."
        except Exception as e:
            return f"AI Analysis Error: {str(e)}"

class DataAnalysisWorkflow:
    """Optimized data analysis workflow with caching and pagination"""

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.stats = calculate_basic_stats(df)
        self.column_types = get_column_types(df)
        self.insights = []
        self.cleaning_history = []  # populated by stage_3_cleaning, reused by generate_python_code
        self.page_size = 1000  # For pagination

    def add_insight(self, insight: str, stage: int):
        """Add insight to analysis report"""
        self.insights.append({
            'stage': stage,
            'insight': insight,
            'timestamp': pd.Timestamp.now()
        })

    def get_paginated_data(self, page: int = 0) -> pd.DataFrame:
        """Get paginated data for display"""
        start_idx = page * self.page_size
        end_idx = start_idx + self.page_size
        return self.df.iloc[start_idx:end_idx]

    def stage_1_overview(self):
        """Stage 1: Data Overview with caching"""
        st.subheader("📊 Data Overview")

        # Data Quality Score
        quality_metrics = calculate_data_quality_score(self.df)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Rows", f"{self.stats['shape'][0]:,}")
        with col2:
            st.metric("Columns", f"{self.stats['shape'][1]:,}")
        with col3:
            st.metric("Quality Score", f"{quality_metrics['score']:.1f}/100")
        with col4:
            st.metric("Grade", quality_metrics['grade'])

        if quality_metrics['issues']:
            st.warning("Quality Issues Found:")
            for issue in quality_metrics['issues']:
                st.write(f"• {issue}")

        # Memory Usage and Optimization
        st.subheader("Memory Analysis")
        memory_opt = calculate_memory_optimization(self.df)
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Current Memory", f"{memory_opt['current_memory_mb']:.1f} MB")
        with col2:
            if memory_opt['potential_savings_mb'] > 0:
                st.metric("Potential Savings",
                          f"{memory_opt['potential_savings_mb']:.1f} MB",
                          f"{memory_opt['potential_savings_pct']:.1f}%")
        if st.button("Show Optimization Details"):
            st.dataframe(pd.DataFrame(memory_opt['suggestions']))

        # Column Cardinality Analysis
        st.subheader("Column Cardinality Analysis")
        cardinality_df = calculate_column_cardinality(self.df)
        # Filter options
        col_types = cardinality_df['Type'].unique()
        selected_types = st.multiselect("Filter by Column Type",
                                        col_types,
                                        default=col_types)
        filtered_df = cardinality_df[cardinality_df['Type'].isin(selected_types)]
        st.dataframe(filtered_df, use_container_width=True)

        # Highlight important findings
        id_cols = filtered_df[filtered_df['Type'] == 'Unique Identifier']['Column'].tolist()
        if id_cols:
            st.info(f"📌 Potential ID columns found: {', '.join(id_cols)}")
        const_cols = filtered_df[filtered_df['Type'] == 'Constant']['Column'].tolist()
        if const_cols:
            st.warning(f"⚠️ Constant columns found: {', '.join(const_cols)}")

        # Data types visualization
        if self.stats['dtypes']:
            st.subheader("Data Types Distribution")
            fig = px.pie(values=list(self.stats['dtypes'].values()),
                         names=list(self.stats['dtypes'].keys()),
                         title="Data Types")
            st.plotly_chart(fig, use_container_width=True)

        # Sample data with pagination
        st.subheader("Sample Data")
        total_pages = (len(self.df) - 1) // self.page_size + 1
        if total_pages > 1:
            page = st.slider("Page", 0, total_pages - 1, 0)
            sample_data = self.get_paginated_data(page)
            st.write(f"Showing rows {page * self.page_size + 1} to {min((page + 1) * self.page_size, len(self.df))}")
        else:
            sample_data = self.df.head(10)
        st.dataframe(sample_data, use_container_width=True)

        # Missing values analysis
        missing_df = calculate_missing_data(self.df)
        if not missing_df.empty:
            st.subheader("Missing Values Analysis")
            st.dataframe(missing_df, use_container_width=True)
            worst_column = missing_df.iloc[0]['Column']
            worst_percentage = missing_df.iloc[0]['Missing %']
            self.add_insight(f"Column '{worst_column}' has highest missing data: {worst_percentage:.1f}%", 1)
        else:
            st.success("✅ No missing values found!")
            self.add_insight("Dataset has no missing values - excellent data quality", 1)

        # Add insights about data quality and cardinality
        if quality_metrics['score'] < 80:
            self.add_insight(f"Data quality needs improvement (Score: {quality_metrics['score']:.1f}/100)", 1)
        if memory_opt['potential_savings_pct'] > 20:
            self.add_insight(f"Potential memory optimization of {memory_opt['potential_savings_pct']:.1f}% identified", 1)
        if id_cols:
            self.add_insight(f"Found {len(id_cols)} potential ID columns", 1)

    def stage_2_exploration(self):
        """Stage 2: Exploratory Data Analysis with caching"""
        st.subheader("🔍 Exploratory Data Analysis")
        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # Numeric analysis
        if numeric_cols:
            st.subheader("Numeric Variables")
            selected_numeric = st.selectbox("Select numeric column:", numeric_cols)
            col1, col2 = st.columns(2)
            with col1:
                fig = px.histogram(self.df, x=selected_numeric,
                                   title=f"Distribution of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)
            with col2:
                fig = px.box(self.df, y=selected_numeric,
                             title=f"Box Plot of {selected_numeric}")
                st.plotly_chart(fig, use_container_width=True)

            # Statistical summary
            st.subheader("Statistical Summary")
            summary_stats = self.df[numeric_cols].describe()
            st.dataframe(summary_stats, use_container_width=True)

            # Correlation analysis
            if len(numeric_cols) > 1:
                st.subheader("Correlation Analysis")
                corr_matrix = calculate_correlation_matrix(self.df)
                if not corr_matrix.empty:
                    fig = px.imshow(corr_matrix, text_auto=True, aspect="auto",
                                    title="Correlation Matrix")
                    st.plotly_chart(fig, use_container_width=True)
                    # Find highest correlation
                    corr_values = []
                    for i in range(len(corr_matrix.columns)):
                        for j in range(i + 1, len(corr_matrix.columns)):
                            corr_values.append(abs(corr_matrix.iloc[i, j]))
                    if corr_values:
                        max_corr = max(corr_values)
                        self.add_insight(f"Maximum correlation coefficient: {max_corr:.3f}", 2)

        # Categorical analysis
        if categorical_cols:
            st.subheader("Categorical Variables")
            selected_categorical = st.selectbox("Select categorical column:", categorical_cols)
            value_counts = get_value_counts(self.df, selected_categorical)
            fig = px.bar(x=value_counts.index, y=value_counts.values,
                         title=f"Top 10 {selected_categorical} Values")
            st.plotly_chart(fig, use_container_width=True)
            total_categories = self.df[selected_categorical].nunique()
            self.add_insight(f"Column '{selected_categorical}' has {total_categories} unique categories", 2)

    def stage_3_cleaning(self):
        """Stage 3: Data Quality Assessment"""
        st.subheader("🧹 Data Quality Assessment")
        cleaning_actions = []  # reserved for issues that are flagged but not fixed here

        # Missing values handling
        if self.stats['missing_values'] > 0:
            st.subheader("Missing Values Treatment")
            missing_df = calculate_missing_data(self.df)
            st.dataframe(missing_df, use_container_width=True)
            col1, col2 = st.columns(2)
            with col1:
                selected_col = st.selectbox("Select column to handle missing values:",
                                            missing_df['Column'].tolist())
            with col2:
                fill_method = st.selectbox("Choose fill method:",
                                           ["Drop rows", "Mean", "Median", "Mode", "Custom value"])
            # Collect the custom value before the button so it is already set
            # on the rerun in which the button is pressed
            custom_value = None
            if fill_method == "Custom value":
                custom_value = st.number_input("Enter custom value:", value=0.0)
            if st.button("Apply Missing Value Treatment"):
                try:
                    if fill_method == "Drop rows":
                        self.df = self.df.dropna(subset=[selected_col])
                        self.cleaning_history.append(f"Dropped rows with missing values in {selected_col}")
                    else:
                        if fill_method == "Mean":
                            fill_value = self.df[selected_col].mean()
                        elif fill_method == "Median":
                            fill_value = self.df[selected_col].median()
                        elif fill_method == "Mode":
                            fill_value = self.df[selected_col].mode()[0]
                        else:  # Custom value
                            fill_value = custom_value
                        self.df[selected_col] = self.df[selected_col].fillna(fill_value)
                        self.cleaning_history.append(f"Filled missing values in {selected_col} with {fill_method}")
                    st.success("✅ Missing values handled successfully!")
                except Exception as e:
                    st.error(f"Error handling missing values: {str(e)}")

        # Duplicates handling
        if self.stats['duplicates'] > 0:
            st.subheader("Duplicate Rows")
            st.warning(f"Found {self.stats['duplicates']} duplicate rows")
            if st.button("Remove Duplicate Rows"):
                original_len = len(self.df)
                self.df = self.df.drop_duplicates()
                removed = original_len - len(self.df)
                self.cleaning_history.append(f"Removed {removed} duplicate rows")
                st.success(f"✅ Removed {removed} duplicate rows")
        else:
            st.success("✅ No duplicate rows found")

        # Mixed type detection and handling
        mixed_types = detect_mixed_types(self.df)
        if mixed_types:
            st.subheader("Mixed Data Types")
            mixed_df = pd.DataFrame(mixed_types)
            st.dataframe(mixed_df, use_container_width=True)
            selected_col = st.selectbox("Select column to fix data type:",
                                        [item['column'] for item in mixed_types])
            fix_method = st.selectbox("Choose fix method:",
                                      ["Convert to numeric", "Convert to string"])
            if st.button("Fix Data Type"):
                try:
                    if fix_method == "Convert to numeric":
                        self.df[selected_col] = pd.to_numeric(self.df[selected_col], errors='coerce')
                    else:
                        self.df[selected_col] = self.df[selected_col].astype(str)
                    self.cleaning_history.append(f"Fixed data type for {selected_col} to {fix_method}")
                    st.success("✅ Data type fixed successfully!")
                except Exception as e:
                    st.error(f"Error fixing data type: {str(e)}")

        # Outlier detection and handling
        numeric_cols = self.column_types['numeric']
        if numeric_cols:
            st.subheader("Outlier Detection")
            selected_col = st.selectbox("Select column for outlier detection:", numeric_cols)
            outliers = calculate_outliers(self.df, selected_col)
            outlier_count = len(outliers)
            if outlier_count > 0:
                st.warning(f"Found {outlier_count} potential outliers in '{selected_col}'")
                st.dataframe(outliers[[selected_col]].head(100), use_container_width=True)
                treatment_method = st.selectbox("Choose outlier treatment method:",
                                                ["None", "Remove", "Cap at IQR bounds"])
                if treatment_method != "None" and st.button("Apply Outlier Treatment"):
                    try:
                        if treatment_method == "Remove":
                            self.df = self.df[~self.df.index.isin(outliers.index)]
                            self.cleaning_history.append(f"Removed {outlier_count} outliers from {selected_col}")
                        else:  # Cap at the 1.5 * IQR fences
                            Q1 = self.df[selected_col].quantile(0.25)
                            Q3 = self.df[selected_col].quantile(0.75)
                            IQR = Q3 - Q1
                            lower_bound = Q1 - 1.5 * IQR
                            upper_bound = Q3 + 1.5 * IQR
                            self.df[selected_col] = self.df[selected_col].clip(lower_bound, upper_bound)
                            self.cleaning_history.append(f"Capped outliers in {selected_col} at IQR bounds")
                        st.success("✅ Outliers handled successfully!")
                    except Exception as e:
                        st.error(f"Error handling outliers: {str(e)}")
            else:
                st.success(f"✅ No outliers detected in '{selected_col}'")

        # Cleaning History
        if self.cleaning_history:
            st.subheader("Cleaning Operations History")
            for i, operation in enumerate(self.cleaning_history, 1):
                st.write(f"{i}. {operation}")
            self.add_insight(f"Performed {len(self.cleaning_history)} data cleaning operations", 3)

        # Summary
        if cleaning_actions:
            st.subheader("Remaining Action Items")
            for i, action in enumerate(cleaning_actions, 1):
                st.write(f"{i}. {action}")
            self.add_insight(f"Identified {len(cleaning_actions)} data quality issues", 3)
        else:
            st.success("✅ Data quality is excellent!")
            self.add_insight("No major data quality issues found", 3)

    def stage_4_analysis(self):
        """Stage 4: Advanced Analysis"""
        st.subheader("🔬 Advanced Analysis")
        numeric_cols = self.column_types['numeric']
        categorical_cols = self.column_types['categorical']

        # Relationship analysis
        if len(numeric_cols) >= 2:
            st.subheader("Variable Relationships")
            col1, col2 = st.columns(2)
            with col1:
                x_var = st.selectbox("X Variable:", numeric_cols)
            with col2:
                y_var = st.selectbox("Y Variable:",
                                     [col for col in numeric_cols if col != x_var])
            # Sample data for performance if dataset is large
            sample_size = min(5000, len(self.df))
            sample_df = self.df.sample(n=sample_size) if len(self.df) > sample_size else self.df
            fig = px.scatter(sample_df, x=x_var, y=y_var,
                             title=f"Relationship: {x_var} vs {y_var}")
            st.plotly_chart(fig, use_container_width=True)

            correlation = self.df[x_var].corr(self.df[y_var])
            st.metric("Correlation", f"{correlation:.3f}")
            if abs(correlation) > 0.7:
                strength = "Strong"
            elif abs(correlation) > 0.3:
                strength = "Moderate"
            else:
                strength = "Weak"
            direction = "positive" if correlation > 0 else "negative"
            st.write(f"**Result:** {strength} {direction} correlation")
            self.add_insight(f"{strength} correlation ({correlation:.3f}) between {x_var} and {y_var}", 4)

        # Group analysis
        if categorical_cols and numeric_cols:
            st.subheader("Group Analysis")
            col1, col2 = st.columns(2)
            with col1:
                group_var = st.selectbox("Group by:", categorical_cols)
            with col2:
                metric_var = st.selectbox("Analyze:", numeric_cols)
            group_stats = calculate_group_stats(self.df, group_var, metric_var)
            st.dataframe(group_stats, use_container_width=True)

            # Only draw the box plot when the number of groups is manageable
            unique_groups = self.df[group_var].nunique()
            if unique_groups <= 20:
                fig = px.box(self.df, x=group_var, y=metric_var,
                             title=f"{metric_var} by {group_var}")
                st.plotly_chart(fig, use_container_width=True)
            else:
                st.info(f"Too many groups ({unique_groups}) for visualization. Showing statistics only.")

            best_group = group_stats['mean'].idxmax()
            best_value = group_stats.loc[best_group, 'mean']
            self.add_insight(f"'{best_group}' has highest average {metric_var}: {best_value:.2f}", 4)

    def stage_5_summary(self):
        """Stage 5: Summary and Export"""
        st.subheader("📈 Analysis Summary")

        # Key metrics
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("Total Insights", len(self.insights))
        with col2:
            quality = "High" if self.stats['missing_values'] == 0 else "Medium"
            st.metric("Data Quality", quality)
        with col3:
            st.metric("Analysis Complete", "✅")

        # Insights summary
        st.subheader("Key Insights")
        for i, insight in enumerate(self.insights, 1):
            st.write(f"{i}. **Stage {insight['stage']}:** {insight['insight']}")

        # Export options
        st.subheader("Export Results")
        export_format = st.selectbox("Choose export format:",
                                     ["Text Report", "Markdown Report", "Python Code", "Cleaned Data"])
        if export_format == "Text Report":
            report = self.generate_text_report()
            st.download_button(
                label="Download Text Report",
                data=report,
                file_name="analysis_report.txt",
                mime="text/plain"
            )
        elif export_format == "Markdown Report":
            report = self.generate_markdown_report()
            st.download_button(
                label="Download Markdown Report",
                data=report,
                file_name="analysis_report.md",
                mime="text/markdown"
            )
        elif export_format == "Python Code":
            code = self.generate_python_code()
            st.code(code, language="python")
            st.download_button(
                label="Download Python Script",
                data=code,
                file_name="analysis_script.py",
                mime="text/plain"
            )
        else:  # Cleaned Data
            # Offer different export formats
            data_format = st.selectbox("Choose data format:",
                                       ["CSV", "Excel", "Parquet"])
            if st.button("Export Data"):
                try:
                    if data_format == "CSV":
                        csv = self.df.to_csv(index=False)
                        st.download_button(
                            label="Download CSV",
                            data=csv,
                            file_name="cleaned_data.csv",
                            mime="text/csv"
                        )
                    elif data_format == "Excel":
                        # to_excel needs an Excel writer engine (openpyxl by default)
                        excel_buffer = BytesIO()
                        self.df.to_excel(excel_buffer, index=False)
                        excel_data = excel_buffer.getvalue()
                        st.download_button(
                            label="Download Excel",
                            data=excel_data,
                            file_name="cleaned_data.xlsx",
                            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                        )
                    else:  # Parquet
                        # to_parquet needs pyarrow or fastparquet to be installed
                        parquet_buffer = BytesIO()
                        self.df.to_parquet(parquet_buffer, index=False)
                        parquet_data = parquet_buffer.getvalue()
                        st.download_button(
                            label="Download Parquet",
                            data=parquet_data,
                            file_name="cleaned_data.parquet",
                            mime="application/octet-stream"
                        )
                except Exception as e:
                    st.error(f"Error exporting data: {str(e)}")

    def generate_text_report(self) -> str:
        """Generate text analysis report"""
        report = f"""DATA ANALYSIS REPORT
====================

Dataset Overview:
- Rows: {self.stats['shape'][0]:,}
- Columns: {self.stats['shape'][1]:,}
- Missing Values: {self.stats['missing_values']:,}
- Memory Usage: {self.stats['memory_usage']:.1f} MB

Key Insights:
"""
        for insight in self.insights:
            report += f"\n- Stage {insight['stage']}: {insight['insight']}"
        report += f"\n\nGenerated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}"
        return report

    def generate_markdown_report(self) -> str:
        """Generate markdown analysis report"""
        # Note: DataFrame.to_markdown() relies on the optional 'tabulate' package
        report = f"""# Data Analysis Report

## Dataset Overview
* **Rows:** {self.stats['shape'][0]:,}
* **Columns:** {self.stats['shape'][1]:,}
* **Missing Values:** {self.stats['missing_values']:,}
* **Memory Usage:** {self.stats['memory_usage']:.1f} MB

## Data Types
```
{pd.DataFrame(self.stats['dtypes'].items(), columns=['Type', 'Count']).to_markdown()}
```

## Key Insights
"""
        # Group insights by stage
        for stage in range(1, 6):
            stage_insights = [i for i in self.insights if i['stage'] == stage]
            if stage_insights:
                report += f"\n### Stage {stage}\n"
                for insight in stage_insights:
                    report += f"* {insight['insight']}\n"
        report += f"\n\n*Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}*"
        return report

    def generate_python_code(self) -> str:
        """Generate reproducible Python code"""
        code = """import pandas as pd
import numpy as np
import plotly.express as px
from typing import Dict, List, Any

# Load and prepare data
df = pd.read_csv('your_data.csv')  # Update with your data source

# Basic statistics
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': df.dtypes.value_counts().to_dict(),
        'duplicates': int(df.duplicated().sum())
    }

stats = calculate_basic_stats(df)
print("\\nBasic Statistics:")
print(f"- Shape: {stats['shape']}")
print(f"- Memory Usage: {stats['memory_usage']:.1f} MB")
print(f"- Missing Values: {stats['missing_values']}")
print(f"- Duplicates: {stats['duplicates']}")
"""
        # Add data cleaning operations if any were performed
        if self.cleaning_history:
            code += "\n# Data Cleaning\n"
            for operation in self.cleaning_history:
                if "missing values" in operation.lower():
                    code += "# Handle missing values\n"
                    code += "df = df.ffill()  # Update with your chosen method\n"
                elif "duplicate" in operation.lower():
                    code += "# Remove duplicates\n"
                    code += "df = df.drop_duplicates()\n"
                elif "outlier" in operation.lower():
                    code += """# Handle outliers
def remove_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    return df[~((df[column] < (Q1 - 1.5 * IQR)) | (df[column] > (Q3 + 1.5 * IQR)))]

# Apply to numeric columns as needed
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
    df = remove_outliers(df, col)
"""

        # Add visualization code
        code += """
# Visualizations
def plot_missing_values(df: pd.DataFrame):
    missing = df.isnull().sum()
    if missing.sum() > 0:
        missing = missing[missing > 0]
        fig = px.bar(x=missing.index, y=missing.values,
                     title='Missing Values by Column')
        fig.show()

def plot_correlations(df: pd.DataFrame):
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 1:
        corr = df[numeric_cols].corr()
        fig = px.imshow(corr, title='Correlation Matrix')
        fig.show()

# Generate plots
plot_missing_values(df)
plot_correlations(df)
"""
        return code
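

# Illustrative standalone entry point, assuming this file is launched directly
# with `streamlit run analyzer.py` (the hosting app presumably drives the
# workflow from its own module instead):
if __name__ == "__main__":
    st.title("Data Analysis Platform")
    uploaded = st.file_uploader("Upload a CSV file", type="csv")
    if uploaded is not None:
        workflow = DataAnalysisWorkflow(pd.read_csv(uploaded))
        workflow.stage_1_overview()
        workflow.stage_2_exploration()
        workflow.stage_3_cleaning()
        workflow.stage_4_analysis()
        workflow.stage_5_summary()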