Corin1998's picture
Upload 8 files
8b4a5e6 verified
raw
history blame
5.22 kB
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Dict, Any, Tuple
# ------------------------------
# Empirical Bayes hierarchical Beta-Binomial + Thompson Sampling
# ------------------------------
class EmpiricalBayesHierarchicalThompson:
"""
各アーム (medium, creative) のクリック率 p を Beta 事前分布で表現。
事前 Beta(a0, b0) は全アームの経験ベイズ推定で安定化。
事後: Beta(a0 + clicks, b0 + impressions - clicks)
* 少データ時に極端な推定を避ける
* Thompson Sampling により配分を提案
* 自動停止/増配分判断を提供
"""
def __init__(self, min_explore: float = 0.05, margin: float = 0.0, n_draws: int = 20000, seed: int = 42):
self.min_explore = min_explore # 各アームの最低配分
self.margin = margin # 改善余地のマージン(ex: 0.002 = 0.2pp)
self.n_draws = n_draws
self.rng = np.random.default_rng(seed)
@staticmethod
def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
"""
アーム別 CTR の分散を利用して Beta(a0, b0) を MoM 推定。
既知の式: mean=m, var=v => a0 = m*(m*(1-m)/v - 1), b0 = (1-m)*(m*(1-m)/v - 1)
v が小さすぎる/0 の場合は弱情報事前を返す。
"""
# 各アームの粗推定 CTR(Laplace 平滑で安定化)
ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
v = float(np.var(ctr, ddof=1))
if not np.isfinite(v) or v <= 1e-8:
# ほぼ同一のCTR → 弱情報事前
return 1.0, 1.0
k = m * (1 - m) / v - 1.0
if k <= 0 or not np.isfinite(k):
return 1.0, 1.0
a0 = m * k
b0 = (1 - m) * k
# 上限/下限を設定
a0 = float(np.clip(a0, 0.5, 1000))
b0 = float(np.clip(b0, 0.5, 1000))
return a0, b0
def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
a0, b0 = self._eb_prior_by_moments(df)
post = df.copy()
post["alpha"] = a0 + post["clicks"].astype(float)
post["beta"] = b0 + (post["impressions"] - post["clicks"]).astype(float)
post["post_mean"] = post["alpha"] / (post["alpha"] + post["beta"])
post["post_var"] = (post["alpha"] * post["beta"]) / (((post["alpha"] + post["beta"])**2) * (post["alpha"] + post["beta"] + 1))
post["a0"] = a0
post["b0"] = b0
return post
def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
"""媒体ごとにTSで配分率を提案し、停止/増配分を判断。"""
post = self._posterior_params(df)
out = {}
for medium, g in post.groupby("medium"):
arms = g.reset_index(drop=True)
K = len(arms)
samples = self.rng.beta(arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K))
# ベースライン(control があればそれを優先)
if (arms["is_control"] == 1).any():
base_idx = int(arms.index[arms["is_control"] == 1][0])
else:
base_idx = int(arms["post_mean"].idxmax())
base_col = list(arms.index).index(base_idx)
winners = np.argmax(samples, axis=1)
win_prob = np.bincount(winners, minlength=K) / self.n_draws
# 各アームがベースより (margin) だけ下回る確率
worse_than_base = (samples.T < (samples[:, base_col] - self.margin)).mean(axis=1)
# 停止・増配分判定
decisions = []
for k in range(K):
d = {
"creative": arms.loc[k, "creative"],
"is_control": int(arms.loc[k, "is_control"]),
"post_mean": float(arms.loc[k, "post_mean"]),
"win_prob": float(win_prob[k]),
"worse_than_base_prob": float(worse_than_base[k]),
"status": "hold"
}
if d["worse_than_base_prob"] >= 0.9 and arms.loc[k, "impressions"] >= 200:
d["status"] = "stop"
elif d["win_prob"] >= 0.95 and arms.loc[k, "impressions"] >= 200:
d["status"] = "boost"
decisions.append(d)
# 配分:勝者確率に基づき、min_explore を確保
alloc = win_prob.copy()
alloc = alloc / alloc.sum()
alloc = np.clip(alloc, self.min_explore, 1.0)
alloc = alloc / alloc.sum()
out[str(medium)] = {
"arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
win_prob=win_prob,
worse_than_base_prob=worse_than_base,
).to_dict(orient="records"),
"allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)},
"decisions": decisions,
"posterior_prior": {"a0": float(arms.loc[0, "a0"]), "b0": float(arms.loc[0, "b0"])},
}
return out