Corin1998 committed on
Commit
5a416a8
·
verified ·
1 Parent(s): c5a52d6

Update bandit.py

Browse files
Files changed (1) hide show
  1. bandit.py +9 -32
bandit.py CHANGED
@@ -3,49 +3,30 @@ import numpy as np
3
  import pandas as pd
4
  from typing import Dict, Any, Tuple
5
 
6
- # ------------------------------
7
- # Empirical Bayes hierarchical Beta-Binomial + Thompson Sampling
8
- # ------------------------------
9
-
10
  class EmpiricalBayesHierarchicalThompson:
11
  """
12
- 各アーム (medium, creative) のクリック率 p を Beta 事前分布で表現。
13
- 事前 Beta(a0, b0) は全アームの経験ベイズ推定で安定化。
14
- 事後: Beta(a0 + clicks, b0 + impressions - clicks)
15
-
16
- * 少データ時に極端な推定を避ける
17
- * Thompson Sampling により配分を提案
18
- * 自動停止/増配分判断を提供
19
  """
20
 
21
  def __init__(self, min_explore: float = 0.05, margin: float = 0.0, n_draws: int = 20000, seed: int = 42):
22
- self.min_explore = min_explore # 各アームの最低配分
23
- self.margin = margin # 改善余地のマージン(ex: 0.002 = 0.2pp)
24
  self.n_draws = n_draws
25
  self.rng = np.random.default_rng(seed)
26
 
27
  @staticmethod
28
  def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
29
- """
30
- アーム別 CTR の分散を利用して Beta(a0, b0) を MoM 推定。
31
- 既知の式: mean=m, var=v => a0 = m*(m*(1-m)/v - 1), b0 = (1-m)*(m*(1-m)/v - 1)
32
- v が小さすぎる/0 の場合は弱情報事前を返す。
33
- """
34
- # 各アームの粗推定 CTR(Laplace 平滑で安定化)
35
  ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
36
  m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
37
  v = float(np.var(ctr, ddof=1))
38
  if not np.isfinite(v) or v <= 1e-8:
39
- # ほぼ同一のCTR → 弱情報事前
40
  return 1.0, 1.0
41
  k = m * (1 - m) / v - 1.0
42
  if k <= 0 or not np.isfinite(k):
43
  return 1.0, 1.0
44
- a0 = m * k
45
- b0 = (1 - m) * k
46
- # 上限/下限を設定
47
- a0 = float(np.clip(a0, 0.5, 1000))
48
- b0 = float(np.clip(b0, 0.5, 1000))
49
  return a0, b0
50
 
51
  def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -60,14 +41,14 @@ class EmpiricalBayesHierarchicalThompson:
60
  return post
61
 
62
  def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
63
- """媒体ごとにTSで配分率を提案し、停止/増配分を判断。"""
64
  post = self._posterior_params(df)
65
  out = {}
66
  for medium, g in post.groupby("medium"):
67
  arms = g.reset_index(drop=True)
68
  K = len(arms)
69
  samples = self.rng.beta(arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K))
70
- # ベースライン(control があればそれを優先)
 
71
  if (arms["is_control"] == 1).any():
72
  base_idx = int(arms.index[arms["is_control"] == 1][0])
73
  else:
@@ -77,10 +58,8 @@ class EmpiricalBayesHierarchicalThompson:
77
  winners = np.argmax(samples, axis=1)
78
  win_prob = np.bincount(winners, minlength=K) / self.n_draws
79
 
80
- # 各アームがベースより (margin) だけ下回る確率
81
  worse_than_base = (samples.T < (samples[:, base_col] - self.margin)).mean(axis=1)
82
 
83
- # 停止・増配分判定
84
  decisions = []
85
  for k in range(K):
86
  d = {
@@ -97,7 +76,6 @@ class EmpiricalBayesHierarchicalThompson:
97
  d["status"] = "boost"
98
  decisions.append(d)
99
 
100
- # 配分:勝者確率に基づき、min_explore を確保
101
  alloc = win_prob.copy()
102
  alloc = alloc / alloc.sum()
103
  alloc = np.clip(alloc, self.min_explore, 1.0)
@@ -105,8 +83,7 @@ class EmpiricalBayesHierarchicalThompson:
105
 
106
  out[str(medium)] = {
107
  "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
108
- win_prob=win_prob,
109
- worse_than_base_prob=worse_than_base,
110
  ).to_dict(orient="records"),
111
  "allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)},
112
  "decisions": decisions,
 
3
  import pandas as pd
4
  from typing import Dict, Any, Tuple
5
 
 
 
 
 
6
  class EmpiricalBayesHierarchicalThompson:
7
  """
8
+ 各アームのCTRを Beta 事前で表現し、事前は全体から経験ベイズ推定。
9
+ 事後: Beta(a0+clicks, b0+impressions-clicks) → Thompson Sampling。
 
 
 
 
 
10
  """
11
 
12
  def __init__(self, min_explore: float = 0.05, margin: float = 0.0, n_draws: int = 20000, seed: int = 42):
13
+ self.min_explore = min_explore
14
+ self.margin = margin
15
  self.n_draws = n_draws
16
  self.rng = np.random.default_rng(seed)
17
 
18
  @staticmethod
19
  def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
 
 
 
 
 
 
20
  ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
21
  m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
22
  v = float(np.var(ctr, ddof=1))
23
  if not np.isfinite(v) or v <= 1e-8:
 
24
  return 1.0, 1.0
25
  k = m * (1 - m) / v - 1.0
26
  if k <= 0 or not np.isfinite(k):
27
  return 1.0, 1.0
28
+ a0 = float(np.clip(m * k, 0.5, 1000))
29
+ b0 = float(np.clip((1 - m) * k, 0.5, 1000))
 
 
 
30
  return a0, b0
31
 
32
  def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
 
41
  return post
42
 
43
  def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
 
44
  post = self._posterior_params(df)
45
  out = {}
46
  for medium, g in post.groupby("medium"):
47
  arms = g.reset_index(drop=True)
48
  K = len(arms)
49
  samples = self.rng.beta(arms["alpha"].values, arms["beta"].values, size=(self.n_draws, K))
50
+
51
+ # ベースライン(control があれば優先)
52
  if (arms["is_control"] == 1).any():
53
  base_idx = int(arms.index[arms["is_control"] == 1][0])
54
  else:
 
58
  winners = np.argmax(samples, axis=1)
59
  win_prob = np.bincount(winners, minlength=K) / self.n_draws
60
 
 
61
  worse_than_base = (samples.T < (samples[:, base_col] - self.margin)).mean(axis=1)
62
 
 
63
  decisions = []
64
  for k in range(K):
65
  d = {
 
76
  d["status"] = "boost"
77
  decisions.append(d)
78
 
 
79
  alloc = win_prob.copy()
80
  alloc = alloc / alloc.sum()
81
  alloc = np.clip(alloc, self.min_explore, 1.0)
 
83
 
84
  out[str(medium)] = {
85
  "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
86
+ win_prob=win_prob, worse_than_base_prob=worse_than_base,
 
87
  ).to_dict(orient="records"),
88
  "allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(K)},
89
  "decisions": decisions,