| """Insight generation utilities for the BI dashboard.""" | |
| from __future__ import annotations | |
| from typing import Dict, Iterable, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from utils import ColumnTypes | |
| def top_bottom_performers(df: pd.DataFrame, column: str, n: int = 5) -> Dict[str, pd.DataFrame]: | |
| """Return the top and bottom performers for a numeric column.""" | |
| if column not in df.columns: | |
| raise ValueError(f"Column '{column}' not found in dataset.") | |
| numeric_series = pd.to_numeric(df[column], errors="coerce").dropna() | |
| if numeric_series.empty: | |
| raise ValueError(f"Column '{column}' does not contain numeric data.") | |
| top = numeric_series.nlargest(n) | |
| bottom = numeric_series.nsmallest(n) | |
| return { | |
| "top": top.reset_index(), | |
| "bottom": bottom.reset_index(), | |
| } | |
| def detect_trend(df: pd.DataFrame, date_column: str, value_column: str) -> str: | |
| """Analyze basic trend between the first and last data points.""" | |
| if date_column not in df.columns or value_column not in df.columns: | |
| raise ValueError("Selected columns are not present in the dataset.") | |
| working = df[[date_column, value_column]].dropna() | |
| working[date_column] = pd.to_datetime(working[date_column], errors="coerce") | |
| working = working.dropna() | |
| if working.empty or working[date_column].nunique() < 2: | |
| return "Not enough data to evaluate a trend." | |
| working = working.sort_values(by=date_column) | |
| first_date = working[date_column].iloc[0] | |
| last_date = working[date_column].iloc[-1] | |
| first_value = working[value_column].iloc[0] | |
| last_value = working[value_column].iloc[-1] | |
| change = last_value - first_value | |
| pct_change = (change / first_value * 100) if first_value != 0 else np.nan | |
| if np.isnan(pct_change): | |
| direction = "changed" | |
| elif pct_change > 0: | |
| direction = "increased" | |
| elif pct_change < 0: | |
| direction = "decreased" | |
| else: | |
| direction = "remained stable" | |
| pct_text = f" ({pct_change:.2f}%)" if not np.isnan(pct_change) else "" | |
| return ( | |
| f"Between {first_date.date()} and {last_date.date()}, " | |
| f"{value_column} {direction} by {change:.2f}{pct_text}." | |
| ) | |
| def detect_anomalies(df: pd.DataFrame, column: str, z_threshold: float = 3.0, limit: int = 5) -> pd.DataFrame: | |
| """Identify potential outliers using a simple z-score approach.""" | |
| if column not in df.columns: | |
| raise ValueError(f"Column '{column}' not found in dataset.") | |
| series = pd.to_numeric(df[column], errors="coerce") | |
| z_scores = ((series - series.mean()) / series.std()).abs() | |
| anomalies = df.loc[z_scores > z_threshold, [column]].copy() | |
| anomalies["z_score"] = z_scores[z_scores > z_threshold] | |
| return anomalies.sort_values(by="z_score", ascending=False).head(limit) | |
| def get_default_insight_columns(column_types: ColumnTypes) -> Dict[str, Optional[str]]: | |
| """Determine default columns to use when auto-generating insights.""" | |
| numeric_col = column_types.numeric[0] if column_types.numeric else None | |
| date_col = column_types.datetime[0] if column_types.datetime else None | |
| return {"numeric": numeric_col, "datetime": date_col} | |