File size: 5,090 Bytes
ce87c62
 
 
 
 
 
 
 
 
 
 
 
52d607e
ce87c62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43ef7d3
ce87c62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import math
from collections import defaultdict
from typing import Dict, Iterable
import pandas as pd

import pandas as pd
from itertools import combinations
import scipy


def compute_pct_improvement_over_baseline(
    df: pd.DataFrame,
    baseline_model: str = "xgboost_ensemble",
    metric: str = "Accuracy"
) -> pd.DataFrame:
    """
    Compute the percentage improvement of each model over a baseline model.

    For each row, computes
    ``((model_metric - baseline_metric) / baseline_metric) * 100``
    where ``baseline_metric`` is the score of ``baseline_model`` on the same
    dataset.

    Args:
        df: DataFrame with columns 'model', 'dataset_name', and the metric column
        baseline_model: The model to use as baseline (default: "xgboost_ensemble")
        metric: The metric to compute improvement on (default: "Accuracy")

    Returns:
        A copy of ``df`` with an added '%↗ over XGBoost' column (the column
        name is fixed, whatever ``baseline_model`` is). The value is NaN for
        rows whose dataset has no baseline row or whose baseline score is 0.
        The input frame is never modified; an empty input yields an empty
        copy that still carries the new column.
    """
    improvement_col = "%↗ over XGBoost"
    df = df.copy()

    if df.empty:
        # Keep the output schema identical for empty and non-empty inputs.
        df[improvement_col] = pd.Series(index=df.index, dtype="float64")
        return df

    # Baseline score per dataset. If a dataset has several baseline rows the
    # last one wins (dict semantics of ``to_dict``).
    baseline_scores = (
        df.loc[df["model"] == baseline_model]
        .set_index("dataset_name")[metric]
        .to_dict()
    )

    # Broadcast the baseline onto every row; datasets without one become NaN.
    baseline = df["dataset_name"].map(baseline_scores).astype("float64")
    # A zero baseline makes the ratio undefined, so treat it as missing.
    baseline = baseline.where(baseline != 0)

    df[improvement_col] = ((df[metric] - baseline) / baseline) * 100

    return df


def scores_to_battles(df: pd.DataFrame, metric: str = "Accuracy") -> list:
    """
    Turn per-dataset scores into pairwise "battles" between models.

    Within each dataset every pair of models is compared on ``metric``; the
    higher-scoring model is always listed as ``model_a``.

    Args:
        df: DataFrame with columns 'model', 'dataset_name', and the metric column
        metric: The metric column used to decide each battle (default: "Accuracy")

    Returns:
        A list of dicts with keys 'model_a', 'model_b', 'winner' and
        'dataset'. 'winner' is "model_a" when the first model scored strictly
        higher and "tie" when both scores are equal. Rows whose metric is
        missing (NaN) take part in no battle.
    """
    battles = []

    for dataset, group in df.groupby("dataset_name"):
        # NaN compares neither equal nor greater, so a missing score would
        # otherwise fall through and be recorded as a win for that model.
        ranked = group.dropna(subset=[metric]).sort_values(
            by=metric, ascending=False, kind="mergesort"
        )
        # Only these two columns are needed; zipping them avoids building a
        # Series per row.
        entries = list(zip(ranked["model"], ranked[metric]))
        for (model_a, score_a), (model_b, score_b) in combinations(entries, 2):
            # Descending order guarantees score_a >= score_b.
            winner = "tie" if score_a == score_b else "model_a"
            battles.append({
                "model_a": model_a,
                "model_b": model_b,
                "winner": winner,
                "dataset": dataset,
            })

    return battles


def _sigmoid(x: float, eps: float = 1e-7) -> float:
    """Stable sigmoid with clipped output."""
    val = 0.5 * (1 + math.tanh(0.5 * x))
    return max(eps, min(1.0 - eps, val))


def _battle_outcome(winner: object) -> float:
    """Return the score credited to ``model_a``: 1 for a win, 0.5 for a tie, else 0."""
    if str(winner).startswith("tie"):
        return 0.5
    return 1.0 if winner == "model_a" else 0.0


def compute_bt_elo(
    battles: Iterable[Dict[str, str]],
    SCALE: float = 400.0,
    BASE: float = 10.0,
    INIT_RATING: float = 1000.0,
    lr: float = 0.05,
    n_iter: int = 1000,
    use_scipy: bool = True,
) -> Dict[str, float]:
    """Fit a Bradley--Terry model and report the strengths as Elo-style ratings.

    ``BASE`` controls the link function scale.  If ``BASE=10`` (the default),
    the win probability follows the usual Elo form

    ``P(win) = 1 / (1 + BASE ** ((rating_b - rating_a) / SCALE))``.

    The function will use :mod:`scipy.optimize` if available for a fast
    optimisation of the negative log-likelihood.  If SciPy cannot be
    imported, it falls back to a simple gradient-descent routine.

    Args:
        battles: Iterable of dicts with keys 'model_a', 'model_b' and
            'winner'. 'winner' is "model_a" for a win of the first model, any
            value whose string form starts with "tie" for a draw, and
            anything else counts as a win for 'model_b'. One-shot iterables
            (e.g. generators) are supported.
        SCALE: Multiplier converting fitted strengths to rating points.
        BASE: Base of the exponential in the win-probability formula.
        INIT_RATING: Offset added to every rating; fitted strengths are
            centred on zero, so this is the mean rating.
        lr: Learning rate of the gradient-descent fallback.
        n_iter: Number of iterations of the gradient-descent fallback.
        use_scipy: Try the SciPy BFGS optimiser first.

    Returns:
        Dict mapping model name to rating, ordered from highest to lowest
        rating. Empty when ``battles`` is empty.
    """
    # Materialise first: ``battles`` may be a generator, and it is read
    # several times below.
    battles_list = list(battles)
    if not battles_list:
        return {}

    models = sorted(
        {row["model_a"] for row in battles_list}
        | {row["model_b"] for row in battles_list}
    )
    outcomes = [_battle_outcome(row["winner"]) for row in battles_list]
    log_base = math.log(BASE)

    if use_scipy:
        try:
            import numpy as np
            from scipy.optimize import minimize
        except Exception:  # pragma: no cover - SciPy not available
            # Best effort: any import-time failure selects the fallback.
            use_scipy = False

    if use_scipy:
        idx = {m: k for k, m in enumerate(models)}
        a_idx = np.array([idx[row["model_a"]] for row in battles_list])
        b_idx = np.array([idx[row["model_b"]] for row in battles_list])
        y = np.array(outcomes, dtype=float)
        eps = 1e-7  # same clipping bound as the scalar ``_sigmoid`` default

        def nll(theta_vec: "np.ndarray") -> float:
            s = log_base * (theta_vec[a_idx] - theta_vec[b_idx])
            p = np.clip(0.5 * (1.0 + np.tanh(0.5 * s)), eps, 1.0 - eps)
            # Binary cross entropy with y in [0, 1]
            return float(-np.sum(y * np.log(p) + (1.0 - y) * np.log(1.0 - p)))

        res = minimize(nll, np.zeros(len(models)), method="BFGS")
        # The likelihood only depends on differences, so centre the solution.
        theta_opt = res.x - res.x.mean()
        theta = {m: theta_opt[idx[m]] for m in models}
    else:
        theta = dict.fromkeys(models, 0.0)
        n_battles = len(battles_list)
        for _ in range(n_iter):
            grad = dict.fromkeys(models, 0.0)
            for row, outcome in zip(battles_list, outcomes):
                i = row["model_a"]
                j = row["model_b"]
                p = _sigmoid(log_base * (theta[i] - theta[j]))
                diff = (p - outcome) * log_base
                grad[i] += diff
                grad[j] -= diff
            for m in models:
                theta[m] -= lr * grad[m] / n_battles

    return {
        m: float(SCALE * theta[m] + INIT_RATING)
        for m in sorted(models, key=lambda name: -theta[name])
    }