Spaces:
Sleeping
Sleeping
File size: 4,032 Bytes
8b4a5e6 5a416a8 8b4a5e6 5a416a8 8b4a5e6 5a416a8 8b4a5e6 5a416a8 8b4a5e6 5a416a8 8b4a5e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Dict, Any, Tuple
class EmpiricalBayesHierarchicalThompson:
    """Thompson-sampling recommender with an empirical-Bayes Beta prior.

    Each arm's CTR is modelled with a Beta prior whose parameters are
    estimated from all arms in the input by the method of moments.
    Posterior per arm: ``Beta(a0 + clicks, b0 + impressions - clicks)``,
    which is then sampled (Thompson sampling) per ``medium`` group.

    The input DataFrame is expected to provide the columns ``medium``,
    ``creative``, ``impressions``, ``clicks`` and ``is_control``.
    """

    def __init__(
        self,
        min_explore: float = 0.05,
        margin: float = 0.0,
        n_draws: int = 20000,
        seed: int = 42,
        stop_prob: float = 0.9,
        boost_prob: float = 0.95,
        min_impressions: int = 200,
    ) -> None:
        """Configure the recommender.

        Args:
            min_explore: Minimum traffic share every arm receives in the
                returned allocation (when feasible for the number of arms).
            margin: Amount subtracted from the baseline sample before an arm
                is counted as "worse than baseline".
            n_draws: Number of posterior draws per arm.
            seed: Seed for the NumPy random generator.
            stop_prob: An arm is marked ``"stop"`` when its probability of
                being worse than the baseline reaches this value.
            boost_prob: An arm is marked ``"boost"`` when its probability of
                being the best arm reaches this value.
            min_impressions: Minimum impressions an arm needs before it can
                be marked ``"stop"`` or ``"boost"``.
        """
        self.min_explore = min_explore
        self.margin = margin
        self.n_draws = n_draws
        self.rng = np.random.default_rng(seed)
        self.stop_prob = stop_prob
        self.boost_prob = boost_prob
        self.min_impressions = min_impressions

    @staticmethod
    def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
        """Estimate Beta prior parameters ``(a0, b0)`` by the method of moments.

        Uses the add-one smoothed CTR ``(clicks + 1) / (impressions + 2)`` of
        every row. Falls back to the uniform prior ``(1.0, 1.0)`` when the
        variance is non-finite or tiny, or when the implied concentration is
        not positive. Both parameters are clipped to ``[0.5, 1000]``.
        """
        ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
        m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
        v = float(np.var(ctr, ddof=1))
        if not np.isfinite(v) or v <= 1e-8:
            return 1.0, 1.0
        # Beta moments: var = m(1-m)/(k+1) with k = a0 + b0.
        k = m * (1 - m) / v - 1.0
        if k <= 0 or not np.isfinite(k):
            return 1.0, 1.0
        a0 = float(np.clip(m * k, 0.5, 1000))
        b0 = float(np.clip((1 - m) * k, 0.5, 1000))
        return a0, b0

    @staticmethod
    def _allocate_with_floor(win_prob: np.ndarray, floor: float) -> np.ndarray:
        """Turn win probabilities into shares that sum to 1 with a per-arm floor.

        Clipping to the floor and renormalizing afterwards pushes the floored
        arms back below the floor. Instead, arms that fall below the floor are
        pinned to it and the remaining mass is split among the other arms in
        proportion to their win probability. When the floor cannot be met for
        every arm (``floor * K >= 1``) the allocation is uniform.
        """
        n_arms = len(win_prob)
        if n_arms == 0:
            return np.asarray(win_prob, dtype=float)
        floor = max(float(floor), 0.0)
        total = float(np.sum(win_prob))
        if floor * n_arms >= 1.0 or not total > 0:
            return np.full(n_arms, 1.0 / n_arms)

        share = np.asarray(win_prob, dtype=float) / total
        pinned = np.zeros(n_arms, dtype=bool)
        while True:
            free_mass = 1.0 - floor * pinned.sum()
            free_total = share[~pinned].sum()
            alloc = np.where(pinned, floor, share * free_mass / free_total)
            below = ~pinned & (alloc < floor)
            if not below.any():
                return alloc
            # At least one unpinned arm always stays above the floor because
            # floor * K < 1, so each pass pins at least one arm and the loop ends.
            pinned |= below

    def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with posterior columns added.

        Adds ``alpha``, ``beta``, ``post_mean``, ``post_var`` and the shared
        prior parameters ``a0`` / ``b0``. The prior is estimated from all rows
        of ``df``.
        """
        a0, b0 = self._eb_prior_by_moments(df)
        post = df.copy()
        post["alpha"] = a0 + post["clicks"].astype(float)
        post["beta"] = b0 + (post["impressions"] - post["clicks"]).astype(float)
        total = post["alpha"] + post["beta"]
        post["post_mean"] = post["alpha"] / total
        post["post_var"] = (post["alpha"] * post["beta"]) / ((total ** 2) * (total + 1))
        post["a0"] = a0
        post["b0"] = b0
        return post

    def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Produce per-medium arm statistics, allocation and decisions.

        Args:
            df: One row per arm with columns ``medium``, ``creative``,
                ``impressions``, ``clicks`` and ``is_control``.

        Returns:
            A dict keyed by ``str(medium)``. Each value holds ``"arms"``
            (per-arm records), ``"allocation"`` (creative -> traffic share,
            each at least ``min_explore`` when feasible, summing to 1),
            ``"decisions"`` (per-arm ``"hold"`` / ``"stop"`` / ``"boost"``)
            and ``"posterior_prior"`` (the ``a0`` / ``b0`` prior parameters).
        """
        post = self._posterior_params(df)
        out: Dict[str, Any] = {}
        for medium, g in post.groupby("medium"):
            arms = g.reset_index(drop=True)
            K = len(arms)
            samples = self.rng.beta(
                arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K)
            )

            # Baseline: prefer the control arm if present, else the arm with
            # the highest posterior mean. The index was reset above, so labels
            # are positions.
            control_pos = np.flatnonzero((arms["is_control"] == 1).to_numpy())
            if control_pos.size:
                base_col = int(control_pos[0])
            else:
                base_col = int(arms["post_mean"].idxmax())

            winners = np.argmax(samples, axis=1)
            win_prob = np.bincount(winners, minlength=K) / self.n_draws
            # Fraction of draws in which each arm falls below baseline - margin.
            worse_than_base = (
                samples < (samples[:, [base_col]] - self.margin)
            ).mean(axis=0)

            decisions = []
            for k in range(K):
                d = {
                    "creative": arms.loc[k, "creative"],
                    "is_control": int(arms.loc[k, "is_control"]),
                    "post_mean": float(arms.loc[k, "post_mean"]),
                    "win_prob": float(win_prob[k]),
                    "worse_than_base_prob": float(worse_than_base[k]),
                    "status": "hold",
                }
                enough_data = arms.loc[k, "impressions"] >= self.min_impressions
                if d["worse_than_base_prob"] >= self.stop_prob and enough_data:
                    d["status"] = "stop"
                elif d["win_prob"] >= self.boost_prob and enough_data:
                    d["status"] = "boost"
                decisions.append(d)

            alloc = self._allocate_with_floor(win_prob, self.min_explore)

            out[str(medium)] = {
                "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
                    win_prob=win_prob,
                    worse_than_base_prob=worse_than_base,
                ).to_dict(orient="records"),
                "allocation": {
                    str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)
                },
                "decisions": decisions,
                "posterior_prior": {
                    "a0": float(arms.loc[0, "a0"]),
                    "b0": float(arms.loc[0, "b0"]),
                },
            }
        return out
|