from __future__ import annotations import numpy as np import pandas as pd from typing import Dict, Any, Tuple class EmpiricalBayesHierarchicalThompson: """ 各アームのCTRを Beta 事前で表現し、事前は全体から経験ベイズ推定。 事後: Beta(a0+clicks, b0+impressions-clicks) → Thompson Sampling。 """ def __init__(self, min_explore: float = 0.05, margin: float = 0.0, n_draws: int = 20000, seed: int = 42): self.min_explore = min_explore self.margin = margin self.n_draws = n_draws self.rng = np.random.default_rng(seed) @staticmethod def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]: ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2) m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6)) v = float(np.var(ctr, ddof=1)) if not np.isfinite(v) or v <= 1e-8: return 1.0, 1.0 k = m * (1 - m) / v - 1.0 if k <= 0 or not np.isfinite(k): return 1.0, 1.0 a0 = float(np.clip(m * k, 0.5, 1000)) b0 = float(np.clip((1 - m) * k, 0.5, 1000)) return a0, b0 def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame: a0, b0 = self._eb_prior_by_moments(df) post = df.copy() post["alpha"] = a0 + post["clicks"].astype(float) post["beta"] = b0 + (post["impressions"] - post["clicks"]).astype(float) post["post_mean"] = post["alpha"] / (post["alpha"] + post["beta"]) post["post_var"] = (post["alpha"] * post["beta"]) / (((post["alpha"] + post["beta"])**2) * (post["alpha"] + post["beta"] + 1)) post["a0"] = a0 post["b0"] = b0 return post def recommend(self, df: pd.DataFrame) -> Dict[str, Any]: post = self._posterior_params(df) out = {} for medium, g in post.groupby("medium"): arms = g.reset_index(drop=True) K = len(arms) samples = self.rng.beta(arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K)) # ベースライン(control があれば優先) if (arms["is_control"] == 1).any(): base_idx = int(arms.index[arms["is_control"] == 1][0]) else: base_idx = int(arms["post_mean"].idxmax()) base_col = list(arms.index).index(base_idx) winners = np.argmax(samples, axis=1) win_prob = np.bincount(winners, minlength=K) / self.n_draws worse_than_base = (samples.T < (samples[:, base_col] - self.margin)).mean(axis=1) decisions = [] for k in range(K): d = { "creative": arms.loc[k, "creative"], "is_control": int(arms.loc[k, "is_control"]), "post_mean": float(arms.loc[k, "post_mean"]), "win_prob": float(win_prob[k]), "worse_than_base_prob": float(worse_than_base[k]), "status": "hold" } if d["worse_than_base_prob"] >= 0.9 and arms.loc[k, "impressions"] >= 200: d["status"] = "stop" elif d["win_prob"] >= 0.95 and arms.loc[k, "impressions"] >= 200: d["status"] = "boost" decisions.append(d) alloc = win_prob.copy() alloc = alloc / alloc.sum() alloc = np.clip(alloc, self.min_explore, 1.0) alloc = alloc / alloc.sum() out[str(medium)] = { "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign( win_prob=win_prob, worse_than_base_prob=worse_than_base, ).to_dict(orient="records"), "allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)}, "decisions": decisions, "posterior_prior": {"a0": float(arms.loc[0, "a0"]), "b0": float(arms.loc[0, "b0"])}, } return out