Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json, math, hashlib | |
| from typing import Dict, Any, Tuple | |
| import numpy as np | |
| from . import storage | |
| def _seg_bucket(segment: str, buckets: int = 8) -> int: | |
| if not segment: | |
| return 0 | |
| h = hashlib.sha1(segment.encode("utf-8")).hexdigest() | |
| return int(h[:8], 16) % buckets | |
| def feature_vec(context: Dict[str, Any], d_seg: int = 8) -> np.ndarray: | |
| hour = int(context.get("hour", 12)) | |
| seg = str(context.get("segment") or "") | |
| x = np.zeros(1 + 24 + d_seg, dtype=float) | |
| x[0] = 1.0 # bias | |
| x[1 + max(0, min(23, hour))] = 1.0 # hour one-hot | |
| x[1 + 24 + _seg_bucket(seg, d_seg)] = 1.0 | |
| return x | |
| class LinUCB: | |
| """ | |
| クリック率のLinUCB。p_click ≈ theta^T x + α * sqrt(x^T A^{-1} x) | |
| CVRはベータ平均で補完し、EV=pc_ucb*E[CVR]*V を最大化。 | |
| """ | |
| def __init__(self, campaign_id: str, alpha: float = 0.3, d_seg: int = 8): | |
| self.campaign_id = campaign_id | |
| self.alpha = float(alpha) | |
| self.d_seg = d_seg | |
| self.d = 1 + 24 + d_seg | |
| def _load_state(self, variant_id: str) -> Tuple[np.ndarray, np.ndarray, int]: | |
| row = storage.get_linucb_state(self.campaign_id, variant_id) | |
| if row: | |
| A = np.array(json.loads(row["A_json"]), dtype=float).reshape(self.d, self.d) | |
| b = np.array(json.loads(row["b_json"]), dtype=float).reshape(self.d, 1) | |
| return A, b, int(row["n_updates"]) | |
| A = np.eye(self.d) | |
| b = np.zeros((self.d, 1)) | |
| return A, b, 0 | |
| def _save_state(self, variant_id: str, A: np.ndarray, b: np.ndarray, n_updates: int): | |
| storage.upsert_linucb_state( | |
| self.campaign_id, variant_id, self.d, | |
| json.dumps(A.reshape(-1).tolist()), | |
| json.dumps(b.reshape(-1).tolist()), | |
| int(n_updates), | |
| ) | |
| def choose(self, context: Dict[str, Any]) -> Tuple[str | None, float]: | |
| mets = storage.get_metrics(self.campaign_id) | |
| if not mets: | |
| return None, -1.0 | |
| vpc = storage.get_campaign_value_per_conversion(self.campaign_id) | |
| x = feature_vec(context, self.d_seg).reshape(self.d, 1) | |
| best_score, best_vid = -1.0, None | |
| for r in mets: | |
| vid = r["variant_id"] | |
| A, b, _ = self._load_state(vid) | |
| try: | |
| A_inv = np.linalg.inv(A) | |
| except np.linalg.LinAlgError: | |
| A_inv = np.linalg.pinv(A) | |
| theta = A_inv @ b | |
| mean = float((theta.T @ x)[0, 0]) | |
| ucb = self.alpha * float(math.sqrt((x.T @ A_inv @ x)[0, 0])) | |
| pc = max(0.0, min(1.0, mean + ucb)) | |
| # CVRはベータ平均で補完 | |
| av, bv = float(r["alpha_conv"]), float(r["beta_conv"]) | |
| pv_mean = av / max(1e-6, (av + bv)) | |
| score = pc * pv_mean * vpc | |
| if score > best_score: | |
| best_score, best_vid = score, vid | |
| return best_vid, best_score | |
| def update_click(self, variant_id: str, context: Dict[str, Any], reward: float): | |
| # クリック有無で更新(reward=1/0) | |
| x = feature_vec(context, self.d_seg).reshape(self.d, 1) | |
| A, b, n = self._load_state(variant_id) | |
| A = A + x @ x.T | |
| b = b + reward * x | |
| self._save_state(variant_id, A, b, n + 1) | |