"""Bradley-Terry MLE rating computation for pairwise comparisons.""" from __future__ import annotations import math import random from collections import defaultdict from dataclasses import dataclass, field from typing import Literal import numpy as np from scipy.optimize import minimize INITIAL_ELO: float = 1500.0 Winner = Literal["A", "B", "tie"] @dataclass class ComparisonResult: """Result of a single pairwise comparison, ready for ELO computation.""" sample_idx: int model_a: str model_b: str winner: Winner reason: str = "" agreement: str = "1/1" swapped: bool = False text_a: str = "" text_b: str = "" col_a: str = "" col_b: str = "" @dataclass class Leaderboard: """ELO leaderboard computed from pairwise comparison results.""" elo: dict[str, float] = field(default_factory=dict) wins: dict[str, int] = field(default_factory=dict) losses: dict[str, int] = field(default_factory=dict) ties: dict[str, int] = field(default_factory=dict) comparison_log: list[dict[str, object]] = field(default_factory=list) elo_ci: dict[str, tuple[float, float]] = field(default_factory=dict) @property def ranked(self) -> list[tuple[str, float]]: """Models sorted by ELO rating, descending.""" return sorted(self.elo.items(), key=lambda x: x[1], reverse=True) def win_pct(self, model: str) -> float | None: """Win percentage for a model, or None if no comparisons.""" total = self.wins[model] + self.losses[model] + self.ties[model] if total == 0: return None return self.wins[model] / total * 100 def _unswap_winner(winner: Winner, swapped: bool) -> Winner: """Unswap winner if positions were randomized.""" if swapped: if winner == "A": return "B" elif winner == "B": return "A" return winner def _build_win_matrix( results: list[ComparisonResult], ) -> tuple[dict[tuple[str, str], float], set[str]]: """Count wins per ordered pair. Ties count as 0.5 for each side. Returns (win_counts, models_seen) where win_counts[(i, j)] = fractional wins of i over j. """ win_counts: dict[tuple[str, str], float] = defaultdict(float) models_seen: set[str] = set() for r in results: winner = _unswap_winner(r.winner, r.swapped) models_seen.add(r.model_a) models_seen.add(r.model_b) if winner == "A": win_counts[(r.model_a, r.model_b)] += 1.0 elif winner == "B": win_counts[(r.model_b, r.model_a)] += 1.0 else: win_counts[(r.model_a, r.model_b)] += 0.5 win_counts[(r.model_b, r.model_a)] += 0.5 return win_counts, models_seen def _bt_mle( win_counts: dict[tuple[str, str], float], model_names: list[str], ) -> dict[str, float]: """Fit Bradley-Terry model via maximum likelihood estimation. Returns theta (strength) per model. Uses scipy L-BFGS-B on the negative log-likelihood with log-parameterization for positivity. """ n = len(model_names) if n == 0: return {} if n == 1: return {model_names[0]: 1.0} idx = {name: i for i, name in enumerate(model_names)} # Collect all pairs with nonzero games pairs: list[tuple[int, int, float, float]] = [] for i_name in model_names: for j_name in model_names: if i_name >= j_name: continue w_ij = win_counts.get((i_name, j_name), 0.0) w_ji = win_counts.get((j_name, i_name), 0.0) if w_ij + w_ji > 0: pairs.append((idx[i_name], idx[j_name], w_ij, w_ji)) if not pairs: return {name: 1.0 for name in model_names} def neg_log_likelihood(log_theta: np.ndarray) -> float: nll = 0.0 for i, j, w_ij, w_ji in pairs: diff = log_theta[i] - log_theta[j] # log(theta_i / (theta_i + theta_j)) = diff - log(1 + exp(diff)) # log(theta_j / (theta_i + theta_j)) = -diff - log(1 + exp(-diff)) # Use log-sum-exp for numerical stability log_p_ij = diff - np.logaddexp(0.0, diff) log_p_ji = -diff - np.logaddexp(0.0, -diff) nll -= w_ij * log_p_ij + w_ji * log_p_ji return nll def gradient(log_theta: np.ndarray) -> np.ndarray: grad = np.zeros(n) for i, j, w_ij, w_ji in pairs: diff = log_theta[i] - log_theta[j] p_ij = 1.0 / (1.0 + np.exp(-diff)) # sigmoid(diff) total = w_ij + w_ji # d(NLL)/d(log_theta_i) grad[i] -= w_ij - total * p_ij grad[j] -= w_ji - total * (1.0 - p_ij) return grad # Pin first model at 0 to fix the scale x0 = np.zeros(n) result = minimize( neg_log_likelihood, x0, jac=gradient, method="L-BFGS-B", ) log_theta = result.x # Center: subtract geometric mean (= mean of log_theta) log_theta -= log_theta.mean() theta = np.exp(log_theta) return {name: float(theta[idx[name]]) for name in model_names} def _theta_to_elo(theta: dict[str, float], center: float = 1500.0) -> dict[str, float]: """Convert BT theta values to ELO scale. ELO_i = 400 * log10(theta_i / theta_ref) + center where theta_ref is the geometric mean of all theta values. """ if not theta: return {} values = list(theta.values()) log_geo_mean = sum(math.log(v) for v in values) / len(values) geo_mean = math.exp(log_geo_mean) return { name: 400.0 * math.log10(t / geo_mean) + center for name, t in theta.items() } def _bootstrap_ci( results: list[ComparisonResult], model_names: list[str], n_bootstrap: int = 1000, ci: float = 0.95, seed: int = 42, ) -> dict[str, tuple[float, float]]: """Compute bootstrap confidence intervals for ELO ratings. Resamples comparisons with replacement, fits BT-MLE each time, returns percentile-based CIs. """ if not results or not model_names: return {} rng = random.Random(seed) n = len(results) elo_samples: dict[str, list[float]] = {name: [] for name in model_names} for _ in range(n_bootstrap): boot = rng.choices(results, k=n) win_counts, _ = _build_win_matrix(boot) theta = _bt_mle(win_counts, model_names) elos = _theta_to_elo(theta) for name in model_names: elo_samples[name].append(elos.get(name, 1500.0)) alpha = (1.0 - ci) / 2.0 lo_pct = alpha * 100 hi_pct = (1.0 - alpha) * 100 cis: dict[str, tuple[float, float]] = {} for name in model_names: samples = sorted(elo_samples[name]) lo_idx = int(len(samples) * lo_pct / 100) hi_idx = min(int(len(samples) * hi_pct / 100), len(samples) - 1) cis[name] = (samples[lo_idx], samples[hi_idx]) return cis def rankings_resolved(board: Leaderboard) -> bool: """Check if all adjacent ranks have non-overlapping 95% CIs. Returns True when the ranking order is statistically resolved — i.e. for every pair of adjacent models in the ranking, the higher-ranked model's CI lower bound exceeds the lower-ranked model's CI upper bound. """ if not board.elo_ci: return False ranked = board.ranked if len(ranked) < 2: return False for i in range(len(ranked) - 1): model_hi, _ = ranked[i] model_lo, _ = ranked[i + 1] if model_hi not in board.elo_ci or model_lo not in board.elo_ci: return False lo_of_higher, _ = board.elo_ci[model_hi] _, hi_of_lower = board.elo_ci[model_lo] if hi_of_lower >= lo_of_higher: return False # CIs overlap return True def compute_elo( results: list[ComparisonResult], model_names: list[str], n_bootstrap: int = 1000, ) -> Leaderboard: """Compute ELO ratings from pairwise comparison results using Bradley-Terry MLE. Handles position-bias unswapping: if a result has swapped=True, the winner is flipped before updating ratings. Bootstrap confidence intervals are computed when n_bootstrap > 0. """ board = Leaderboard( elo={m: INITIAL_ELO for m in model_names}, wins={m: 0 for m in model_names}, losses={m: 0 for m in model_names}, ties={m: 0 for m in model_names}, ) # Tally wins/losses/ties and build comparison log for r in results: winner = _unswap_winner(r.winner, r.swapped) if winner == "A": board.wins[r.model_a] += 1 board.losses[r.model_b] += 1 elif winner == "B": board.losses[r.model_a] += 1 board.wins[r.model_b] += 1 else: board.ties[r.model_a] += 1 board.ties[r.model_b] += 1 board.comparison_log.append( { "sample_idx": r.sample_idx, "model_a": r.model_a, "model_b": r.model_b, "winner": winner, "reason": r.reason, "agreement": r.agreement, "text_a": r.text_a, "text_b": r.text_b, "col_a": r.col_a, "col_b": r.col_b, } ) # Fit BT-MLE win_counts, _ = _build_win_matrix(results) theta = _bt_mle(win_counts, model_names) board.elo = _theta_to_elo(theta) # Bootstrap CIs if n_bootstrap > 0 and results: board.elo_ci = _bootstrap_ci(results, model_names, n_bootstrap=n_bootstrap) return board