Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, Any, Tuple | |
class EmpiricalBayesHierarchicalThompson:
    """Thompson-sampling recommender with an empirical-Bayes Beta prior.

    Each arm's CTR is modelled with a Beta prior whose parameters are
    estimated from all arms (empirical Bayes, method of moments).
    Posterior: Beta(a0 + clicks, b0 + impressions - clicks), which is then
    sampled (Thompson sampling) to obtain win probabilities per arm.

    The input frame passed to ``recommend`` must provide the columns
    ``medium``, ``creative``, ``impressions``, ``clicks`` and ``is_control``.
    """

    # Decision thresholds used by ``recommend``; override in a subclass to tune.
    STOP_PROB = 0.9          # P(arm worse than baseline) needed to "stop"
    BOOST_PROB = 0.95        # P(arm is the best) needed to "boost"
    MIN_IMPRESSIONS = 200    # minimum evidence before any stop/boost decision

    def __init__(self, min_explore: float = 0.05, margin: float = 0.0, n_draws: int = 20000, seed: int = 42):
        """Configure the sampler.

        Args:
            min_explore: Lower clip applied to each arm's allocation share
                before the final renormalization.
            margin: Amount subtracted from the baseline sample when deciding
                whether an arm is worse than the baseline.
            n_draws: Number of posterior draws per arm.
            seed: Seed for the NumPy random generator.

        Raises:
            ValueError: If ``n_draws`` is not positive (probabilities would
                otherwise be computed as 0/0).
        """
        if n_draws <= 0:
            raise ValueError("n_draws must be a positive integer")
        self.min_explore = min_explore
        self.margin = margin
        self.n_draws = n_draws
        self.rng = np.random.default_rng(seed)

    @staticmethod
    def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
        """Estimate Beta prior parameters (a0, b0) by the method of moments.

        Uses the add-one smoothed CTR of every row, ``(clicks + 1) /
        (impressions + 2)``. Falls back to the uniform prior Beta(1, 1) when
        the sample variance is missing, non-finite or tiny, or when the
        implied concentration is not positive.

        Declared as a static method because it does not use instance state;
        it is invoked as ``self._eb_prior_by_moments(df)``.
        """
        ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
        m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
        v = float(np.var(ctr, ddof=1))
        if not np.isfinite(v) or v <= 1e-8:
            return 1.0, 1.0
        # For Beta(a, b): var = m(1-m)/(a+b+1)  =>  a+b = m(1-m)/var - 1.
        k = m * (1 - m) / v - 1.0
        if k <= 0 or not np.isfinite(k):
            return 1.0, 1.0
        # Clip so the prior is neither degenerate nor overwhelmingly strong.
        a0 = float(np.clip(m * k, 0.5, 1000))
        b0 = float(np.clip((1 - m) * k, 0.5, 1000))
        return a0, b0

    def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with posterior columns added.

        Added columns: ``alpha``, ``beta``, ``post_mean``, ``post_var`` and
        the shared prior parameters ``a0`` / ``b0``.
        """
        a0, b0 = self._eb_prior_by_moments(df)
        post = df.copy()
        post["alpha"] = a0 + post["clicks"].astype(float)
        post["beta"] = b0 + (post["impressions"] - post["clicks"]).astype(float)
        total = post["alpha"] + post["beta"]
        post["post_mean"] = post["alpha"] / total
        post["post_var"] = (post["alpha"] * post["beta"]) / ((total ** 2) * (total + 1))
        post["a0"] = a0
        post["b0"] = b0
        return post

    def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Compute per-medium allocations and stop/boost/hold decisions.

        Args:
            df: One row per arm with columns ``medium``, ``creative``,
                ``impressions``, ``clicks`` and ``is_control``.

        Returns:
            A dict keyed by ``str(medium)``. Each value holds ``arms``
            (records with win / worse-than-baseline probabilities),
            ``allocation`` (creative -> share), ``decisions`` (one dict per
            arm with a ``status`` of ``"hold"``, ``"stop"`` or ``"boost"``)
            and ``posterior_prior`` (the shared ``a0`` / ``b0``).
        """
        post = self._posterior_params(df)
        out: Dict[str, Any] = {}
        for medium, g in post.groupby("medium"):
            arms = g.reset_index(drop=True)
            K = len(arms)
            samples = self.rng.beta(arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K))

            # Baseline: prefer the (first) control arm when one exists,
            # otherwise the arm with the highest posterior mean. The index is
            # 0..K-1 after reset_index, so labels double as column positions.
            is_control = arms["is_control"] == 1
            if is_control.any():
                base_col = int(arms.index[is_control][0])
            else:
                base_col = int(arms["post_mean"].idxmax())

            winners = np.argmax(samples, axis=1)
            win_prob = np.bincount(winners, minlength=K) / self.n_draws
            # Fraction of draws in which each arm falls below baseline - margin.
            worse_than_base = (samples.T < (samples[:, base_col] - self.margin)).mean(axis=1)

            decisions = []
            for k in range(K):
                d = {
                    "creative": arms.loc[k, "creative"],
                    "is_control": int(arms.loc[k, "is_control"]),
                    "post_mean": float(arms.loc[k, "post_mean"]),
                    "win_prob": float(win_prob[k]),
                    "worse_than_base_prob": float(worse_than_base[k]),
                    "status": "hold",
                }
                enough_data = arms.loc[k, "impressions"] >= self.MIN_IMPRESSIONS
                if d["worse_than_base_prob"] >= self.STOP_PROB and enough_data:
                    d["status"] = "stop"
                elif d["win_prob"] >= self.BOOST_PROB and enough_data:
                    d["status"] = "boost"
                decisions.append(d)

            # Allocation: win probabilities clipped from below, renormalized.
            # NOTE: after the final renormalization an arm's share can end up
            # slightly below min_explore.
            alloc = win_prob / win_prob.sum()
            alloc = np.clip(alloc, self.min_explore, 1.0)
            alloc = alloc / alloc.sum()

            out[str(medium)] = {
                "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
                    win_prob=win_prob, worse_than_base_prob=worse_than_base,
                ).to_dict(orient="records"),
                "allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)},
                "decisions": decisions,
                "posterior_prior": {"a0": float(arms.loc[0, "a0"]), "b0": float(arms.loc[0, "b0"])},
            }
        return out