| """ |
| Insights generation module for the Business Intelligence Dashboard. |
| Contains functions for automated insight generation and pattern detection. |
| Works with ANY dataset - no hardcoded column names. |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from typing import Dict, List, Any, Optional, Tuple |
| from utils import format_number, get_numeric_columns, get_categorical_columns, get_datetime_columns |
|
|
|
|
def generate_top_performers(
    df: pd.DataFrame,
    group_column: str,
    value_column: str,
    n: int = 10,
    ascending: bool = False
) -> Tuple[pd.DataFrame, str]:
    """
    Identify top or bottom performers in the data.

    Args:
        df: pandas DataFrame
        group_column: Column to group by
        value_column: Column to aggregate (summed per group)
        n: Number of top/bottom items to return
        ascending: True for bottom performers, False for top

    Returns:
        Tuple of (results DataFrame, insight text). On failure the
        DataFrame is empty and the text describes the problem.
    """
    if df is None or df.empty:
        return pd.DataFrame(), "No data available for analysis."

    if group_column not in df.columns or value_column not in df.columns:
        return pd.DataFrame(), f"Columns '{group_column}' or '{value_column}' not found."

    try:
        # Aggregate per group, then keep the n largest (or smallest) totals.
        agg_df = df.groupby(group_column)[value_column].sum().reset_index()
        agg_df = agg_df.sort_values(value_column, ascending=ascending).head(n)
        # rename() only touches the aggregate column; assigning .columns
        # wholesale silently breaks if the column order ever changes.
        total_col = f'Total {value_column}'
        agg_df = agg_df.rename(columns={value_column: total_col})

        # Guard: groupby can yield zero rows (e.g. every group key is NaN),
        # in which case iloc[0] below would raise IndexError.
        if agg_df.empty:
            return pd.DataFrame(), "No groups found for analysis."

        position = "Bottom" if ascending else "Top"
        top_item = agg_df.iloc[0]

        insight = f"### {position} {n} {group_column} by {value_column}\n\n"
        insight += f"**{position} Performer:** {top_item[group_column]}\n"
        insight += f"- Total {value_column}: {format_number(top_item[total_col])}\n\n"

        # Share of the grand total covered by these n groups; skipped when
        # the total is non-positive, where a percentage is meaningless.
        total = df[value_column].sum()
        if total > 0:
            top_pct = (agg_df[total_col].sum() / total) * 100
            insight += f"π These {n} items account for **{top_pct:.1f}%** of total {value_column.lower()}.\n"

        return agg_df, insight

    except Exception as e:
        return pd.DataFrame(), f"Error generating insights: {str(e)}"
|
|
|
|
def generate_data_overview(df: pd.DataFrame) -> str:
    """
    Build a high-level markdown overview of an arbitrary dataset.

    Covers record/column counts, memory footprint, column-type breakdown,
    and a missing-value summary.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted string with insights
    """
    if df is None or df.empty:
        return "No data available for analysis."

    lines = ["## π Data Overview\n"]

    try:
        rows, cols = df.shape
        mem_mb = df.memory_usage(deep=True).sum() / 1024 / 1024

        lines.append("### π Dataset Summary")
        lines.append(f"- **Total Records:** {rows:,}")
        lines.append(f"- **Total Columns:** {cols}")
        lines.append(f"- **Memory Usage:** {mem_mb:.2f} MB")

        lines.append("\n### π Column Types")
        lines.append(f"- **Numeric Columns:** {len(get_numeric_columns(df))}")
        lines.append(f"- **Categorical Columns:** {len(get_categorical_columns(df))}")
        lines.append(f"- **DateTime Columns:** {len(get_datetime_columns(df))}")

        # One null-count pass reused for the total and the worst column.
        null_counts = df.isnull().sum()
        total_missing = null_counts.sum()
        cell_count = rows * cols
        missing_pct = (total_missing / cell_count) * 100 if cell_count > 0 else 0

        lines.append("\n### β οΈ Data Quality")
        lines.append(f"- **Total Missing Values:** {total_missing:,} ({missing_pct:.2f}%)")

        if null_counts.max() > 0:
            lines.append(f"- **Most Missing:** {null_counts.idxmax()} ({null_counts.max():,} missing)")

        return "\n".join(lines)

    except Exception as e:
        return f"Error generating overview: {str(e)}"
|
|
|
|
def generate_numeric_insights(df: pd.DataFrame) -> str:
    """
    Generate insights for numeric columns (first five at most).

    Reports mean, median, standard deviation, range, and a skewness-based
    distribution classification per column.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text
    """
    if df is None or df.empty:
        return "No data available for analysis."

    numeric_cols = get_numeric_columns(df)

    if not numeric_cols:
        return "No numeric columns found for analysis."

    insights = ["## π’ Numeric Column Insights\n"]

    try:
        # Cap at 5 columns to keep the report readable.
        for col in numeric_cols[:5]:
            data = df[col].dropna()
            if len(data) == 0:
                continue

            insights.append(f"### π {col}")
            insights.append(f"- **Mean:** {format_number(data.mean())}")
            insights.append(f"- **Median:** {format_number(data.median())}")
            insights.append(f"- **Std Dev:** {format_number(data.std())}")
            insights.append(f"- **Range:** {format_number(data.min())} to {format_number(data.max())}")

            # skew() returns NaN for fewer than 3 points or zero variance;
            # NaN compares False everywhere, so without this guard it was
            # mislabelled "Approximately normal (skew: nan)".
            skew = data.skew()
            if pd.isna(skew):
                insights.append("- **Distribution:** Not enough variation to assess skew")
            elif skew > 1:
                insights.append(f"- **Distribution:** Right-skewed (skew: {skew:.2f})")
            elif skew < -1:
                insights.append(f"- **Distribution:** Left-skewed (skew: {skew:.2f})")
            else:
                insights.append(f"- **Distribution:** Approximately normal (skew: {skew:.2f})")

            insights.append("")

        return "\n".join(insights)

    except Exception as e:
        return f"Error generating numeric insights: {str(e)}"
|
|
|
|
def generate_categorical_insights(df: pd.DataFrame) -> str:
    """
    Produce markdown insights for categorical columns (first five at most).

    Reports unique-value counts, the three most frequent values, and a
    concentration warning when one value dominates the column.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text
    """
    if df is None or df.empty:
        return "No data available for analysis."

    categorical_cols = get_categorical_columns(df)

    if not categorical_cols:
        return "No categorical columns found for analysis."

    lines = ["## π·οΈ Categorical Column Insights\n"]

    try:
        for col in categorical_cols[:5]:
            series = df[col].dropna()
            if series.empty:
                continue

            counts = series.value_counts()
            n_values = len(series)

            lines.append(f"### π {col}")
            lines.append(f"- **Unique Values:** {len(counts):,}")

            lines.append("- **Top Values:**")
            for rank, (val, cnt) in enumerate(counts.head(3).items(), start=1):
                share = (cnt / n_values) * 100
                lines.append(f" {rank}. {str(val)[:30]} - {cnt:,} ({share:.1f}%)")

            # Flag columns where a single value holds a majority of the data.
            dominant = (counts.iloc[0] / n_values) * 100
            if dominant > 50:
                lines.append(f"- β οΈ **Highly concentrated:** Top value has {dominant:.1f}% of data")

            lines.append("")

        return "\n".join(lines)

    except Exception as e:
        return f"Error generating categorical insights: {str(e)}"
|
|
|
|
def generate_time_insights(df: pd.DataFrame) -> str:
    """
    Generate time-based insights if datetime columns exist (first two at most).

    Reports date range, total span in days, and average records per day.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted insights text
    """
    if df is None or df.empty:
        return "No data available for analysis."

    datetime_cols = get_datetime_columns(df)

    if not datetime_cols:
        return "No datetime columns found for time analysis."

    # NOTE(review): the original header literal was corrupted (split across
    # two source lines); reconstructed here to match the other section headers.
    insights = ["## π Time-Based Insights\n"]

    try:
        for col in datetime_cols[:2]:
            data = df[col].dropna()
            if len(data) == 0:
                continue

            start, end = data.min(), data.max()
            span_days = (end - start).days

            insights.append(f"### β° {col}")
            insights.append(f"- **Date Range:** {start.strftime('%Y-%m-%d')} to {end.strftime('%Y-%m-%d')}")
            insights.append(f"- **Time Span:** {span_days} days")

            # max(..., 1) avoids division by zero when all records share one day.
            records_per_day = len(data) / max(span_days, 1)
            insights.append(f"- **Avg Records/Day:** {records_per_day:.1f}")

            insights.append("")

        return "\n".join(insights)

    except Exception as e:
        return f"Error generating time insights: {str(e)}"
|
|
|
|
def detect_anomalies(df: pd.DataFrame, column: str, threshold: float = 2.0) -> Tuple[pd.DataFrame, str]:
    """
    Detect potential anomalies in numerical data using the IQR method.

    Rows fall outside [Q1 - threshold*IQR, Q3 + threshold*IQR] are flagged.

    Args:
        df: pandas DataFrame
        column: Column to analyze
        threshold: IQR multiplier for outlier detection

    Returns:
        Tuple of (up to 20 anomalous rows, insight text). On failure the
        DataFrame is empty and the text describes the problem.
    """
    if df is None or df.empty or column not in df.columns:
        return pd.DataFrame(), "No data available for analysis."

    try:
        data = df[column].dropna()

        # Guard: an all-NaN column yields NaN quantiles, hence NaN bounds
        # and a misleading "0 anomalies, bounds nan" report.
        if data.empty:
            return pd.DataFrame(), f"Column '{column}' has no non-null values to analyze."

        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1

        lower_bound = Q1 - threshold * IQR
        upper_bound = Q3 + threshold * IQR

        # NaN comparisons are False, so missing values are never flagged.
        anomalies = df[(df[column] < lower_bound) | (df[column] > upper_bound)]

        insight = f"### π Anomaly Detection: {column}\n\n"
        insight += f"- **Analysis Method:** IQR (threshold: {threshold}x)\n"
        insight += f"- **Lower Bound:** {format_number(lower_bound)}\n"
        insight += f"- **Upper Bound:** {format_number(upper_bound)}\n"
        insight += f"- **Anomalies Found:** {len(anomalies):,} ({(len(anomalies)/len(df)*100):.2f}% of data)\n"

        if len(anomalies) > 0:
            insight += f"\n**Note:** Anomalies may indicate data entry errors or genuine outliers worthy of investigation."

        return anomalies.head(20), insight

    except Exception as e:
        return pd.DataFrame(), f"Error detecting anomalies: {str(e)}"
|
|
|
|
def generate_correlation_insights(df: pd.DataFrame) -> str:
    """
    Generate insights about correlations between numeric columns.

    Lists up to five column pairs with |Pearson r| > 0.5, strongest first.

    Args:
        df: pandas DataFrame

    Returns:
        Formatted correlation insights
    """
    if df is None or df.empty:
        return "No data available for analysis."

    numeric_cols = get_numeric_columns(df)

    if len(numeric_cols) < 2:
        return "Need at least 2 numeric columns for correlation analysis."

    insights = ["## π Correlation Insights\n"]

    try:
        # Local import keeps the module's import block untouched.
        from itertools import combinations

        corr_matrix = df[numeric_cols].corr()

        # Each unordered pair exactly once; replaces the manual i < j
        # double index loop over the full n x n grid.
        strong_correlations = []
        for col1, col2 in combinations(numeric_cols, 2):
            corr_val = corr_matrix.loc[col1, col2]
            # NaN comparisons are False, so all-NaN pairs are skipped.
            if abs(corr_val) > 0.5:
                strong_correlations.append((col1, col2, corr_val))

        # Strongest relationships first, regardless of sign.
        strong_correlations.sort(key=lambda x: abs(x[2]), reverse=True)

        if strong_correlations:
            insights.append("### πͺ Strong Correlations Found")
            for col1, col2, corr in strong_correlations[:5]:
                direction = "Positive" if corr > 0 else "Negative"
                insights.append(f"- **{col1}** β **{col2}**: {corr:.3f} ({direction})")
        else:
            insights.append("βΉοΈ No strong correlations (|r| > 0.5) found between numeric columns.")

        return "\n".join(insights)

    except Exception as e:
        return f"Error analyzing correlations: {str(e)}"
|
|
|
|
def generate_all_insights(df: pd.DataFrame) -> str:
    """
    Generate a comprehensive insights report for any dataset.

    Runs every section generator in order and joins the sections with
    horizontal-rule separators.

    Args:
        df: pandas DataFrame

    Returns:
        Complete insights report as formatted string
    """
    if df is None or df.empty:
        return "No data available for analysis."

    section_builders = (
        generate_data_overview,
        generate_numeric_insights,
        generate_categorical_insights,
        generate_time_insights,
        generate_correlation_insights,
    )

    # Joining with "\n\n---\n\n" reproduces the original interleaved
    # separator layout exactly.
    return "\n\n---\n\n".join(build(df) for build in section_builders)