File size: 3,204 Bytes
a48d292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
"""Insight generation utilities for the BI dashboard."""

from __future__ import annotations

from typing import Dict, Iterable, Optional, Tuple

import numpy as np
import pandas as pd

from utils import ColumnTypes


def top_bottom_performers(df: pd.DataFrame, column: str, n: int = 5) -> Dict[str, pd.DataFrame]:
    """Return the top and bottom performers for a numeric column.

    The column is coerced to numeric (non-numeric entries dropped); the two
    resulting frames keep the original row index as an ``index`` column so
    callers can trace each value back to its source row.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in dataset.")

    values = pd.to_numeric(df[column], errors="coerce").dropna()
    if values.empty:
        raise ValueError(f"Column '{column}' does not contain numeric data.")

    # reset_index turns the Series into a two-column frame (index, value).
    return {
        "top": values.nlargest(n).reset_index(),
        "bottom": values.nsmallest(n).reset_index(),
    }


def detect_trend(df: pd.DataFrame, date_column: str, value_column: str) -> str:
    """Analyze basic trend between the first and last data points.

    Parameters
    ----------
    df : pd.DataFrame
        Source dataset.
    date_column : str
        Column parsed as datetimes (unparseable rows are dropped).
    value_column : str
        Column coerced to numeric (non-numeric rows are dropped).

    Returns
    -------
    str
        A one-sentence summary of the change between the earliest and
        latest observations, or a message when data is insufficient.

    Raises
    ------
    ValueError
        If either column is missing from ``df``.
    """
    if date_column not in df.columns or value_column not in df.columns:
        raise ValueError("Selected columns are not present in the dataset.")

    # Explicit copy: assigning into the projection of df would otherwise
    # risk pandas' SettingWithCopyWarning.
    working = df[[date_column, value_column]].copy()
    working[date_column] = pd.to_datetime(working[date_column], errors="coerce")
    # Coerce values too, so numeric-looking strings participate instead of
    # raising a TypeError on subtraction below.
    working[value_column] = pd.to_numeric(working[value_column], errors="coerce")
    working = working.dropna()

    if working.empty or working[date_column].nunique() < 2:
        return "Not enough data to evaluate a trend."

    working = working.sort_values(by=date_column)
    first_date = working[date_column].iloc[0]
    last_date = working[date_column].iloc[-1]
    first_value = working[value_column].iloc[0]
    last_value = working[value_column].iloc[-1]

    change = last_value - first_value
    # Percent change is undefined for a zero baseline; NaN marks that case.
    pct_change = (change / first_value * 100) if first_value != 0 else np.nan

    if np.isnan(pct_change):
        direction = "changed"
    elif pct_change > 0:
        direction = "increased"
    elif pct_change < 0:
        direction = "decreased"
    else:
        direction = "remained stable"

    pct_text = f" ({pct_change:.2f}%)" if not np.isnan(pct_change) else ""
    return (
        f"Between {first_date.date()} and {last_date.date()}, "
        f"{value_column} {direction} by {change:.2f}{pct_text}."
    )


def detect_anomalies(df: pd.DataFrame, column: str, z_threshold: float = 3.0, limit: int = 5) -> pd.DataFrame:
    """Identify potential outliers using a simple z-score approach.

    Rows whose absolute z-score exceeds ``z_threshold`` are returned (at
    most ``limit`` of them), sorted by descending z-score, with a
    ``z_score`` column appended. Non-numeric entries never match because
    NaN comparisons are falsy.
    """
    if column not in df.columns:
        raise ValueError(f"Column '{column}' not found in dataset.")

    values = pd.to_numeric(df[column], errors="coerce")
    scores = ((values - values.mean()) / values.std()).abs()
    mask = scores > z_threshold

    flagged = df.loc[mask, [column]].copy()
    flagged["z_score"] = scores[mask]
    flagged = flagged.sort_values(by="z_score", ascending=False)
    return flagged.head(limit)


def get_default_insight_columns(column_types: ColumnTypes) -> Dict[str, Optional[str]]:
    """Determine default columns to use when auto-generating insights.

    Picks the first available numeric and datetime column (or ``None``
    when the respective list is empty).
    """
    defaults: Dict[str, Optional[str]] = {"numeric": None, "datetime": None}
    if column_types.numeric:
        defaults["numeric"] = column_types.numeric[0]
    if column_types.datetime:
        defaults["datetime"] = column_types.datetime[0]
    return defaults