File size: 4,032 Bytes
8b4a5e6
 
 
 
 
 
 
5a416a8
 
8b4a5e6
 
 
5a416a8
 
8b4a5e6
 
 
 
 
 
 
 
 
 
 
 
 
5a416a8
 
8b4a5e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5a416a8
 
8b4a5e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5a416a8
8b4a5e6
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from __future__ import annotations
import numpy as np
import pandas as pd
from typing import Dict, Any, Tuple

class EmpiricalBayesHierarchicalThompson:
    """Thompson-sampling recommender with an empirical-Bayes Beta prior.

    Each arm's CTR is modelled with a Beta prior whose parameters are
    estimated from all rows of the input frame (method of moments). The
    posterior of an arm is ``Beta(a0 + clicks, b0 + impressions - clicks)``;
    posterior draws are then used per ``medium`` to estimate win
    probabilities, stop/boost decisions and a traffic allocation.

    Args:
        min_explore: Minimum traffic share each arm should keep. It is capped
            at ``1 / K`` for a medium with ``K`` arms, since a larger floor
            cannot be satisfied.
        margin: Amount subtracted from the baseline draw before an arm is
            counted as "worse than baseline".
        n_draws: Number of posterior draws per medium; must be >= 1.
        seed: Seed for the NumPy random generator.
        stop_prob: ``worse_than_base_prob`` at or above which an arm is
            marked ``"stop"``.
        boost_prob: ``win_prob`` at or above which an arm is marked
            ``"boost"``.
        min_impressions: Impressions an arm needs before it can be marked
            ``"stop"`` or ``"boost"``.

    Raises:
        ValueError: If ``n_draws`` is smaller than 1.
    """

    def __init__(
        self,
        min_explore: float = 0.05,
        margin: float = 0.0,
        n_draws: int = 20000,
        seed: int = 42,
        *,
        stop_prob: float = 0.9,
        boost_prob: float = 0.95,
        min_impressions: int = 200,
    ):
        # Zero draws would make every win probability 0 / 0 = NaN.
        if n_draws < 1:
            raise ValueError("n_draws must be a positive integer")
        self.min_explore = min_explore
        self.margin = margin
        self.n_draws = n_draws
        self.stop_prob = stop_prob
        self.boost_prob = boost_prob
        self.min_impressions = min_impressions
        self.rng = np.random.default_rng(seed)

    @staticmethod
    def _eb_prior_by_moments(arm_df: pd.DataFrame) -> Tuple[float, float]:
        """Estimate Beta prior parameters ``(a0, b0)`` by the method of moments.

        Uses the smoothed per-row CTR ``(clicks + 1) / (impressions + 2)``.
        Falls back to the uniform prior ``(1.0, 1.0)`` when the variance is
        unusable (fewer than two rows, non-finite, or ~0) or when the implied
        concentration is not positive. Both parameters are clipped to
        ``[0.5, 1000]``.
        """
        ctr = (arm_df["clicks"] + 1) / (arm_df["impressions"] + 2)
        # A sample variance needs at least two observations.
        if len(ctr) < 2:
            return 1.0, 1.0
        m = float(np.clip(ctr.mean(), 1e-6, 1 - 1e-6))
        v = float(np.var(ctr, ddof=1))
        if not np.isfinite(v) or v <= 1e-8:
            return 1.0, 1.0
        # For Beta(a, b): var = m (1 - m) / (a + b + 1), so a + b = m (1 - m) / var - 1.
        k = m * (1 - m) / v - 1.0
        if k <= 0 or not np.isfinite(k):
            return 1.0, 1.0
        a0 = float(np.clip(m * k, 0.5, 1000))
        b0 = float(np.clip((1 - m) * k, 0.5, 1000))
        return a0, b0

    def _posterior_params(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with posterior columns added.

        Added columns: ``alpha``, ``beta`` (posterior Beta parameters),
        ``post_mean``, ``post_var`` (posterior mean and variance), and
        ``a0``, ``b0`` (the shared prior estimated from all rows of ``df``).
        """
        a0, b0 = self._eb_prior_by_moments(df)
        post = df.copy()
        post["alpha"] = a0 + post["clicks"].astype(float)
        post["beta"] = b0 + (post["impressions"] - post["clicks"]).astype(float)
        total = post["alpha"] + post["beta"]
        post["post_mean"] = post["alpha"] / total
        post["post_var"] = (post["alpha"] * post["beta"]) / ((total ** 2) * (total + 1))
        post["a0"] = a0
        post["b0"] = b0
        return post

    def _allocation(self, win_prob: np.ndarray) -> np.ndarray:
        """Turn win probabilities into traffic shares that respect the floor.

        Every arm receives the exploration floor and the remaining mass is
        split proportionally to ``win_prob``. Clipping and renormalizing
        instead would shrink floored arms back below ``min_explore``.
        """
        n_arms = len(win_prob)
        # The floor cannot exceed an even split; negative values mean "no floor".
        floor = min(max(float(self.min_explore), 0.0), 1.0 / n_arms)
        remaining = max(0.0, 1.0 - n_arms * floor)
        alloc = floor + remaining * win_prob
        return alloc / alloc.sum()

    def recommend(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Compute per-medium recommendations.

        Args:
            df: Frame with columns ``medium``, ``creative``, ``impressions``,
                ``clicks`` and ``is_control``.

        Returns:
            A dict keyed by ``str(medium)``. Each value holds ``"arms"``
            (records with posterior mean, win probability and
            worse-than-baseline probability), ``"allocation"`` (traffic share
            keyed by ``str(creative)``), ``"decisions"`` (one dict per arm
            with a ``"status"`` of ``"hold"``, ``"stop"`` or ``"boost"``) and
            ``"posterior_prior"`` (the shared ``a0`` / ``b0``).
        """
        post = self._posterior_params(df)
        out: Dict[str, Any] = {}
        for medium, group in post.groupby("medium"):
            arms = group.reset_index(drop=True)
            n_arms = len(arms)
            samples = self.rng.beta(
                arms["alpha"].values, arms["beta"].values, size=(self.n_draws, n_arms)
            )

            # Baseline: the first control arm if any, otherwise the arm with
            # the highest posterior mean. The index was reset, so labels are
            # also column positions in ``samples``.
            control_mask = (arms["is_control"] == 1).to_numpy()
            if control_mask.any():
                base_col = int(np.argmax(control_mask))
            else:
                base_col = int(arms["post_mean"].idxmax())

            winners = np.argmax(samples, axis=1)
            win_prob = np.bincount(winners, minlength=n_arms) / self.n_draws

            # Share of draws in which each arm falls below baseline - margin.
            threshold = samples[:, [base_col]] - self.margin
            worse_than_base = (samples < threshold).mean(axis=0)

            decisions = []
            for k in range(n_arms):
                d = {
                    "creative": arms.loc[k, "creative"],
                    "is_control": int(arms.loc[k, "is_control"]),
                    "post_mean": float(arms.loc[k, "post_mean"]),
                    "win_prob": float(win_prob[k]),
                    "worse_than_base_prob": float(worse_than_base[k]),
                    "status": "hold",
                }
                # Only act on arms that have gathered enough traffic.
                has_enough_data = arms.loc[k, "impressions"] >= self.min_impressions
                if has_enough_data and d["worse_than_base_prob"] >= self.stop_prob:
                    d["status"] = "stop"
                elif has_enough_data and d["win_prob"] >= self.boost_prob:
                    d["status"] = "boost"
                decisions.append(d)

            alloc = self._allocation(win_prob)

            out[str(medium)] = {
                "arms": arms[["creative", "impressions", "clicks", "post_mean", "is_control"]].assign(
                    win_prob=win_prob, worse_than_base_prob=worse_than_base,
                ).to_dict(orient="records"),
                "allocation": {str(arms.loc[k, "creative"]): float(alloc[k]) for k in range(n_arms)},
                "decisions": decisions,
                "posterior_prior": {"a0": float(arms.loc[0, "a0"]), "b0": float(arms.loc[0, "b0"])},
            }
        return out