# Provenance: uploaded by yogesh882 ("Upload 6 files", commit df86d3a, verified)
"""
Insights generation module for the Business Intelligence Dashboard.
Contains functions for automated insight generation and pattern detection.
Works with ANY dataset - no hardcoded column names.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from utils import format_number, get_numeric_columns, get_categorical_columns, get_datetime_columns
def generate_top_performers(
    df: pd.DataFrame,
    group_column: str,
    value_column: str,
    n: int = 10,
    ascending: bool = False
) -> Tuple[pd.DataFrame, str]:
    """
    Identify top or bottom performers in the data.

    Args:
        df: pandas DataFrame
        group_column: Column to group by
        value_column: Column to aggregate (summed per group)
        n: Number of top/bottom items to keep
        ascending: True for bottom performers, False for top

    Returns:
        Tuple of (results DataFrame, insight text). On any failure the
        DataFrame is empty and the text describes the problem.
    """
    if df is None or df.empty:
        return pd.DataFrame(), "No data available for analysis."
    if group_column not in df.columns or value_column not in df.columns:
        return pd.DataFrame(), f"Columns '{group_column}' or '{value_column}' not found."
    try:
        # Sum the value column per group, then keep the n extreme groups.
        agg_df = df.groupby(group_column)[value_column].sum().reset_index()
        agg_df = agg_df.sort_values(value_column, ascending=ascending).head(n)
        value_label = f'Total {value_column}'
        agg_df.columns = [group_column, value_label]
        # Guard: grouping can yield no rows (e.g. the group column is
        # all-NaN); previously this surfaced as an opaque IndexError.
        if agg_df.empty:
            return pd.DataFrame(), "No groups found to rank."
        # Generate insight text
        position = "Bottom" if ascending else "Top"
        top_item = agg_df.iloc[0]
        insight = f"### {position} {n} {group_column} by {value_column}\n\n"
        insight += f"**{position} Performer:** {top_item[group_column]}\n"
        insight += f"- Total {value_column}: {format_number(top_item[value_label])}\n\n"
        # Share of the grand total; skipped when the total is not positive
        # so we never divide by zero or report a misleading percentage.
        total = df[value_column].sum()
        if total > 0:
            top_pct = (agg_df[value_label].sum() / total) * 100
            insight += f"πŸ“Š These {n} items account for **{top_pct:.1f}%** of total {value_column.lower()}.\n"
        return agg_df, insight
    except Exception as e:
        return pd.DataFrame(), f"Error generating insights: {str(e)}"
def generate_data_overview(df: pd.DataFrame) -> str:
    """
    Build a high-level overview report for an arbitrary dataset.

    Args:
        df: pandas DataFrame

    Returns:
        Markdown-formatted overview string
    """
    if df is None or df.empty:
        return "No data available for analysis."
    lines = ["## πŸ“Š Data Overview\n"]
    try:
        # Dataset size and memory footprint.
        mem_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
        lines += [
            "### πŸ“‹ Dataset Summary",
            f"- **Total Records:** {len(df):,}",
            f"- **Total Columns:** {len(df.columns)}",
            f"- **Memory Usage:** {mem_mb:.2f} MB",
        ]
        # Breakdown of column types via the shared helpers.
        lines += [
            "\n### πŸ“ Column Types",
            f"- **Numeric Columns:** {len(get_numeric_columns(df))}",
            f"- **Categorical Columns:** {len(get_categorical_columns(df))}",
            f"- **DateTime Columns:** {len(get_datetime_columns(df))}",
        ]
        # Missing-data summary across the whole frame.
        missing_by_col = df.isnull().sum()
        total_missing = missing_by_col.sum()
        cell_count = df.shape[0] * df.shape[1]
        missing_pct = (total_missing / cell_count) * 100 if cell_count > 0 else 0
        lines += [
            "\n### ⚠️ Data Quality",
            f"- **Total Missing Values:** {total_missing:,} ({missing_pct:.2f}%)",
        ]
        # Call out the single worst column, if anything is missing at all.
        if missing_by_col.max() > 0:
            worst_col = missing_by_col.idxmax()
            lines.append(f"- **Most Missing:** {worst_col} ({missing_by_col.max():,} missing)")
        return "\n".join(lines)
    except Exception as e:
        return f"Error generating overview: {str(e)}"
def generate_numeric_insights(df: pd.DataFrame) -> str:
    """
    Generate insights for numeric columns.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text (Markdown)
    """
    if df is None or df.empty:
        return "No data available for analysis."
    numeric_cols = get_numeric_columns(df)
    if not numeric_cols:
        return "No numeric columns found for analysis."
    insights = ["## πŸ”’ Numeric Column Insights\n"]
    try:
        for col in numeric_cols[:5]:  # Limit to first 5 numeric columns
            data = df[col].dropna()
            if len(data) == 0:
                continue
            insights.append(f"### πŸ“Š {col}")
            insights.append(f"- **Mean:** {format_number(data.mean())}")
            insights.append(f"- **Median:** {format_number(data.median())}")
            insights.append(f"- **Std Dev:** {format_number(data.std())}")
            insights.append(f"- **Range:** {format_number(data.min())} to {format_number(data.max())}")
            # Classify the distribution shape. skew() returns NaN for
            # constant or single-value columns; NaN fails both threshold
            # comparisons, so without an explicit check it was mislabeled
            # "Approximately normal (skew: nan)".
            skew = data.skew()
            if pd.isna(skew):
                insights.append("- **Distribution:** Not enough variation to measure skew")
            elif skew > 1:
                insights.append(f"- **Distribution:** Right-skewed (skew: {skew:.2f})")
            elif skew < -1:
                insights.append(f"- **Distribution:** Left-skewed (skew: {skew:.2f})")
            else:
                insights.append(f"- **Distribution:** Approximately normal (skew: {skew:.2f})")
            insights.append("")
        return "\n".join(insights)
    except Exception as e:
        return f"Error generating numeric insights: {str(e)}"
def generate_categorical_insights(df: pd.DataFrame) -> str:
    """
    Generate insights for categorical columns.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text (Markdown)
    """
    if df is None or df.empty:
        return "No data available for analysis."
    categorical_cols = get_categorical_columns(df)
    if not categorical_cols:
        return "No categorical columns found for analysis."
    insights = ["## 🏷️ Categorical Column Insights\n"]
    try:
        # Only profile the first five categorical columns to keep the report short.
        for col in categorical_cols[:5]:
            series = df[col].dropna()
            non_null = len(series)
            if non_null == 0:
                continue
            counts = series.value_counts()
            insights.append(f"### πŸ“Š {col}")
            insights.append(f"- **Unique Values:** {len(counts):,}")
            # Show the three most frequent values with their share of rows.
            insights.append("- **Top Values:**")
            for rank, (val, cnt) in enumerate(counts.head(3).items(), start=1):
                share = (cnt / non_null) * 100
                insights.append(f"  {rank}. {str(val)[:30]} - {cnt:,} ({share:.1f}%)")
            # Flag columns dominated by a single value.
            dominant_share = (counts.iloc[0] / non_null) * 100
            if dominant_share > 50:
                insights.append(f"- ⚠️ **Highly concentrated:** Top value has {dominant_share:.1f}% of data")
            insights.append("")
        return "\n".join(insights)
    except Exception as e:
        return f"Error generating categorical insights: {str(e)}"
def generate_time_insights(df: pd.DataFrame) -> str:
    """
    Generate time-based insights if datetime columns exist.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text (Markdown)
    """
    if df is None or df.empty:
        return "No data available for analysis."
    datetime_cols = get_datetime_columns(df)
    if not datetime_cols:
        return "No datetime columns found for time analysis."
    report = ["## πŸ“… Time-Based Insights\n"]
    try:
        # Keep the report short: only the first two datetime columns.
        for col in datetime_cols[:2]:
            stamps = df[col].dropna()
            if len(stamps) == 0:
                continue
            start, end = stamps.min(), stamps.max()
            span_days = (end - start).days
            report.append(f"### ⏰ {col}")
            report.append(f"- **Date Range:** {start.strftime('%Y-%m-%d')} to {end.strftime('%Y-%m-%d')}")
            report.append(f"- **Time Span:** {span_days} days")
            # Average record density; clamp the divisor so a same-day
            # span does not divide by zero.
            report.append(f"- **Avg Records/Day:** {len(stamps) / max(span_days, 1):.1f}")
            report.append("")
        return "\n".join(report)
    except Exception as e:
        return f"Error generating time insights: {str(e)}"
def detect_anomalies(df: pd.DataFrame, column: str, threshold: float = 2.0) -> Tuple[pd.DataFrame, str]:
    """
    Detect potential anomalies in numerical data using the IQR method.

    Args:
        df: pandas DataFrame
        column: Column to analyze
        threshold: IQR multiplier for the outlier fences

    Returns:
        Tuple of (anomalies DataFrame, at most 20 rows; insight text)
    """
    if df is None or df.empty or column not in df.columns:
        return pd.DataFrame(), "No data available for analysis."
    try:
        data = df[column].dropna()
        # Guard: an all-NaN column yields NaN quantiles, which previously
        # produced a meaningless report with "nan" bounds.
        if data.empty:
            return pd.DataFrame(), f"Column '{column}' has no non-missing values to analyze."
        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR
        # Rows outside the fences. NaN comparisons evaluate False, so
        # missing values are never flagged as anomalies.
        anomalies = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
        insight = f"### πŸ” Anomaly Detection: {column}\n\n"
        insight += f"- **Analysis Method:** IQR (threshold: {threshold}x)\n"
        insight += f"- **Lower Bound:** {format_number(lower_bound)}\n"
        insight += f"- **Upper Bound:** {format_number(upper_bound)}\n"
        insight += f"- **Anomalies Found:** {len(anomalies):,} ({(len(anomalies)/len(df)*100):.2f}% of data)\n"
        if len(anomalies) > 0:
            insight += f"\n**Note:** Anomalies may indicate data entry errors or genuine outliers worthy of investigation."
        return anomalies.head(20), insight
    except Exception as e:
        return pd.DataFrame(), f"Error detecting anomalies: {str(e)}"
def generate_correlation_insights(df: pd.DataFrame) -> str:
    """
    Generate insights about correlations between numeric columns.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted correlation insights (Markdown)
    """
    if df is None or df.empty:
        return "No data available for analysis."
    numeric_cols = get_numeric_columns(df)
    if len(numeric_cols) < 2:
        return "Need at least 2 numeric columns for correlation analysis."
    report = ["## πŸ”— Correlation Insights\n"]
    try:
        corr_matrix = df[numeric_cols].corr()
        # Collect strong pairs from the upper triangle only, so each pair
        # appears once and self-correlations are skipped.
        strong_pairs = [
            (a, b, corr_matrix.loc[a, b])
            for i, a in enumerate(numeric_cols)
            for b in numeric_cols[i + 1:]
            if abs(corr_matrix.loc[a, b]) > 0.5
        ]
        # Strongest relationships first, by magnitude.
        strong_pairs.sort(key=lambda pair: abs(pair[2]), reverse=True)
        if strong_pairs:
            report.append("### πŸ’ͺ Strong Correlations Found")
            for a, b, r in strong_pairs[:5]:
                direction = "Positive" if r > 0 else "Negative"
                report.append(f"- **{a}** ↔ **{b}**: {r:.3f} ({direction})")
        else:
            report.append("ℹ️ No strong correlations (|r| > 0.5) found between numeric columns.")
        return "\n".join(report)
    except Exception as e:
        return f"Error analyzing correlations: {str(e)}"
def generate_all_insights(df: pd.DataFrame) -> str:
    """
    Generate a comprehensive insights report for any dataset.

    Args:
        df: pandas DataFrame

    Returns:
        Complete insights report as a formatted string
    """
    if df is None or df.empty:
        return "No data available for analysis."
    # Run every section generator in order, separated by horizontal rules.
    sections = [
        generate_data_overview(df),
        generate_numeric_insights(df),
        generate_categorical_insights(df),
        generate_time_insights(df),
        generate_correlation_insights(df),
    ]
    parts = []
    for section in sections:
        if parts:
            parts.append("\n---\n")
        parts.append(section)
    return "\n".join(parts)