""" AdRL Studio — Contextual Bandit Ad Recommendation Engine This application implements and benchmarks four reinforcement learning contextual bandit algorithms for ad recommendation: (1) ε-Greedy Neural Bandit using a shared PyTorch MLP, (2) UCB1 (Upper Confidence Bound), a non-contextual baseline, (3) Thompson Sampling with Beta distribution priors, and (4) LinUCB Disjoint Model, the industry-standard contextual bandit used in production ad systems. The simulated environment features 20 ads across 5 categories and 5 user context features (age group, device, time of day, content category, region) encoded as a 19-dimensional one-hot vector. True click-through rates are determined by hidden weight vectors initialized at startup (seed=42). Algorithms observe only bandit feedback — the reward for the chosen arm only — and must balance exploration vs. exploitation to minimize cumulative regret. """ import json import math import threading import numpy as np import torch import torch.nn as nn import torch.optim as optim from flask import Flask, Response, jsonify, render_template_string, request from scipy import stats app = Flask(__name__) # ───────────────────────────────────────────────────────────────────────────── # Environment constants # ───────────────────────────────────────────────────────────────────────────── np.random.seed(42) AGE_GROUPS = ["young_adult", "adult", "senior"] DEVICES = ["mobile", "desktop", "tablet"] TIMES_OF_DAY = ["morning", "afternoon", "evening", "night"] CONTENT_CATS = ["tech", "sports", "lifestyle", "news", "entertainment"] REGIONS = ["north_america", "europe", "asia", "other"] CONTEXT_DIM = len(AGE_GROUPS) + len(DEVICES) + len(TIMES_OF_DAY) + len(CONTENT_CATS) + len(REGIONS) # 19 N_ADS = 20 AD_IDS = [f"ad_{i:02d}" for i in range(1, 21)] # Category mapping AD_CAT_MAP = {} for i, ad in enumerate(AD_IDS): cats = ["Tech","Fashion","Finance","Food","Travel"] AD_CAT_MAP[ad] = cats[i // 4] AD_FORMATS = { "ad_01":"banner","ad_02":"video","ad_03":"native","ad_04":"banner", "ad_05":"banner","ad_06":"video","ad_07":"banner","ad_08":"native", "ad_09":"native","ad_10":"banner","ad_11":"video","ad_12":"native", "ad_13":"banner","ad_14":"native","ad_15":"banner","ad_16":"video", "ad_17":"video","ad_18":"banner","ad_19":"native","ad_20":"video", } AD_BIDS = { "ad_01":2.50,"ad_02":3.00,"ad_03":3.50,"ad_04":4.00, "ad_05":1.50,"ad_06":2.00,"ad_07":2.50,"ad_08":3.00, "ad_09":3.00,"ad_10":3.50,"ad_11":4.00,"ad_12":5.00, "ad_13":1.00,"ad_14":1.50,"ad_15":2.00,"ad_16":2.50, "ad_17":2.00,"ad_18":2.50,"ad_19":3.00,"ad_20":3.50, } # Hidden true CTR weights — fixed at startup, never exposed to algorithms _TRUE_WEIGHTS = np.random.randn(N_ADS, CONTEXT_DIM) * 0.3 def _sigmoid(x): return 1.0 / (1.0 + np.exp(-np.clip(x, -20, 20))) def true_ctr(ad_idx, ctx): return float(np.clip(_sigmoid(ctx @ _TRUE_WEIGHTS[ad_idx]), 0.02, 0.25)) def encode_context(age, device, tod, content, region): vec = np.zeros(CONTEXT_DIM, dtype=np.float32) offset = 0 vec[offset + AGE_GROUPS.index(age)] = 1.0; offset += len(AGE_GROUPS) vec[offset + DEVICES.index(device)] = 1.0; offset += len(DEVICES) vec[offset + TIMES_OF_DAY.index(tod)] = 1.0; offset += len(TIMES_OF_DAY) vec[offset + CONTENT_CATS.index(content)] = 1.0; offset += len(CONTENT_CATS) vec[offset + REGIONS.index(region)] = 1.0 return vec def sample_random_context(): return encode_context( np.random.choice(AGE_GROUPS), np.random.choice(DEVICES), np.random.choice(TIMES_OF_DAY), np.random.choice(CONTENT_CATS), np.random.choice(REGIONS), ) # ───────────────────────────────────────────────────────────────────────────── # Algorithm classes # ───────────────────────────────────────────────────────────────────────────── class EpsilonGreedyNeuralBandit: NAME = "ε-Greedy" COLOR = "#f59e0b" def __init__(self, epsilon=0.15, epsilon_min=0.01, decay=0.995, lr=0.01): self.epsilon_0 = epsilon self.epsilon_min = epsilon_min self.decay = decay self.lr = lr self.reset() def reset(self): self.t = 0 self.n_updates = 0 self.model = nn.Sequential( nn.Linear(CONTEXT_DIM + N_ADS, 32), nn.ReLU(), nn.Linear(32, 16), nn.ReLU(), nn.Linear(16, 1), nn.Sigmoid(), ) self.optimizer = optim.SGD(self.model.parameters(), lr=self.lr) self.criterion = nn.MSELoss() def _inp(self, ctx, ad_idx): oh = np.zeros(N_ADS, dtype=np.float32); oh[ad_idx] = 1.0 return torch.FloatTensor(np.concatenate([ctx, oh])) def _pred(self, ctx, ad_idx): self.model.eval() with torch.no_grad(): return self.model(self._inp(ctx, ad_idx)).item() def select(self, ctx): eps = max(self.epsilon_min, self.epsilon_0 * (self.decay ** self.t)) if np.random.rand() < eps: return int(np.random.randint(N_ADS)) ctx_rep = np.tile(ctx, (N_ADS, 1)) ad_eye = np.eye(N_ADS, dtype=np.float32) batch = torch.FloatTensor(np.hstack([ctx_rep, ad_eye])) self.model.eval() with torch.no_grad(): scores = self.model(batch).squeeze().numpy() return int(np.argmax(scores)) def predict_ctr(self, ctx, ad_idx): return self._pred(ctx, ad_idx) def update(self, ctx, action, reward): self.model.train() x = self._inp(ctx, action).unsqueeze(0) y = torch.FloatTensor([[float(reward)]]) self.optimizer.zero_grad() self.criterion(self.model(x), y).backward() self.optimizer.step() self.t += 1 self.n_updates += 1 class UCB1Bandit: NAME = "UCB1" COLOR = "#10b981" def __init__(self): self.reset() def reset(self): self.n_a = np.zeros(N_ADS) self.R_a = np.zeros(N_ADS) self.t = 0 self._init_idx = 0 self.n_updates = 0 def select(self, ctx): if self._init_idx < N_ADS: return self._init_idx mu = self.R_a / np.maximum(self.n_a, 1) bonus = np.sqrt(2.0 * np.log(max(self.t, 1)) / np.maximum(self.n_a, 1)) return int(np.argmax(mu + bonus)) def predict_ctr(self, ctx, ad_idx): if self.n_a[ad_idx] == 0: return 0.0 return float(self.R_a[ad_idx] / self.n_a[ad_idx]) def update(self, ctx, action, reward): if self._init_idx < N_ADS: self._init_idx += 1 self.n_a[action] += 1 self.R_a[action] += reward self.t += 1 self.n_updates += 1 class ThompsonSamplingBandit: NAME = "Thompson" COLOR = "#3b82f6" def __init__(self): self.reset() def reset(self): self.alpha = np.ones(N_ADS) self.beta_p = np.ones(N_ADS) self.n_updates = 0 def select(self, ctx): return int(np.argmax(np.random.beta(self.alpha, self.beta_p))) def predict_ctr(self, ctx, ad_idx): return float(self.alpha[ad_idx] / (self.alpha[ad_idx] + self.beta_p[ad_idx])) def update(self, ctx, action, reward): if reward == 1: self.alpha[action] += 1 else: self.beta_p[action] += 1 self.n_updates += 1 class LinUCBBandit: NAME = "LinUCB" COLOR = "#ef4444" def __init__(self, alpha=1.0): self.alpha = alpha self.reset() def reset(self): d = CONTEXT_DIM self.A = [np.identity(d) for _ in range(N_ADS)] self.A_inv = [np.identity(d) for _ in range(N_ADS)] self.b = [np.zeros(d) for _ in range(N_ADS)] self.n_updates = 0 def _ucb_score(self, ctx, ad_idx): A_inv = self.A_inv[ad_idx] theta = A_inv @ self.b[ad_idx] x = ctx return float(theta @ x + self.alpha * math.sqrt(max(float(x @ A_inv @ x), 0.0))) def select(self, ctx): return int(np.argmax([self._ucb_score(ctx, a) for a in range(N_ADS)])) def predict_ctr(self, ctx, ad_idx): return float((self.A_inv[ad_idx] @ self.b[ad_idx]) @ ctx) def update(self, ctx, action, reward): x = ctx Ai = self.A_inv[action] Aix = Ai @ x self.A_inv[action] = Ai - np.outer(Aix, Aix) / (1.0 + x @ Aix) self.A[action] += np.outer(x, x) self.b[action] += reward * x self.n_updates += 1 # ───────────────────────────────────────────────────────────────────────────── # Global state # ───────────────────────────────────────────────────────────────────────────── ALGO_KEYS = ["epsilon_greedy", "ucb1", "thompson", "linucb"] ALGO_CLASSES = { "epsilon_greedy": EpsilonGreedyNeuralBandit, "ucb1": UCB1Bandit, "thompson": ThompsonSamplingBandit, "linucb": LinUCBBandit, } ALGO_DISPLAY = { "epsilon_greedy": "ε-Greedy", "ucb1": "UCB1", "thompson": "Thompson", "linucb": "LinUCB", } ALGO_COLORS = { "epsilon_greedy": "#f59e0b", "ucb1": "#10b981", "thompson": "#3b82f6", "linucb": "#ef4444", } algorithms = {k: cls() for k, cls in ALGO_CLASSES.items()} sim_lock = threading.Lock() sim_state = {"running": False, "step": 0, "total": 0, "last_results": None} # ───────────────────────────────────────────────────────────────────────────── # HTML Template # ───────────────────────────────────────────────────────────────────────────── TEMPLATE = """ AdRL Studio
Live Ad Serving
Model Ready
🎓 User Context
🎭 Algorithm Recommendations
ε-Greedy
UCB1
Thompson
LinUCB
⚙ Simulation Settings
1,00010,000
1100
📈 Rolling CTR (100-impression window)
📊 Simulation Summary

Run a simulation to see results.

📉 Cumulative Regret Comparison

Cumulative regret measures the total reward missed vs. always picking the oracle best arm. Lower is better. LinUCB and Thompson typically achieve sub-linear regret.

📋 Regret Summary

Run a simulation first (Online Learning tab).

⚖ A/B Test Settings
1,00020,000
🌡 Reward Landscape Settings

Estimated CTR for each user content category × ad category pair. Context held at: adult, desktop, afternoon, north_america.

📈 Estimated CTR Heatmap
""" # ───────────────────────────────────────────────────────────────────────────── # Flask routes # ───────────────────────────────────────────────────────────────────────────── @app.route('/') def index(): return render_template_string(TEMPLATE) @app.route('/api/status') def api_status(): with sim_lock: return jsonify({ "running": sim_state["running"], "step": sim_state["step"], "total": sim_state["total"], }) @app.route('/api/recommend', methods=['POST']) def api_recommend(): data = request.get_json(force=True) try: ctx = encode_context( data['age'], data['device'], data['tod'], data['content'], data['region'] ) except (KeyError, ValueError) as e: return jsonify({"error": str(e)}), 400 result = {} for key, algo in algorithms.items(): ad_idx = algo.select(ctx) score = algo.predict_ctr(ctx, ad_idx) ad_id = AD_IDS[ad_idx] result[key] = { "ad_id": ad_id, "category": AD_CAT_MAP[ad_id], "format": AD_FORMATS[ad_id], "bid": AD_BIDS[ad_id], "score": round(score, 4), } return jsonify(result) @app.route('/api/simulate', methods=['POST']) def api_simulate(): data = request.get_json(force=True) n_impressions = int(data.get('n_impressions', 3000)) seed = int(data.get('seed', 42)) n_impressions = max(1000, min(10000, n_impressions)) def generate(): # Reset all algorithm states for algo in algorithms.values(): algo.reset() np.random.seed(seed) with sim_lock: sim_state['running'] = True sim_state['step'] = 0 sim_state['total'] = n_impressions rewards = {k: [] for k in ALGO_KEYS} checkpoint_interval = 50 # Per-checkpoint rolling window (last 100 impressions) rolling_window = 100 rolling_ctr_series = {k: [] for k in ALGO_KEYS} steps_series = [] # Incremental cumulative regret (avoids O(n²) post-loop recomputation) cum_regret = {k: 0.0 for k in ALGO_KEYS} cum_regret_series = {k: [] for k in ALGO_KEYS} for t in range(n_impressions): ctx = sample_random_context() # Vectorized oracle best arm all_ctrs = np.clip(_sigmoid(_TRUE_WEIGHTS @ ctx), 0.02, 0.25) oracle_idx = int(np.argmax(all_ctrs)) oracle_r = int(np.random.rand() < all_ctrs[oracle_idx]) # Each algorithm selects, receives reward, updates for k, algo in algorithms.items(): act = algo.select(ctx) r = int(np.random.rand() < all_ctrs[act]) algo.update(ctx, act, r) rewards[k].append(r) cum_regret[k] += oracle_r - r # Checkpoint every `checkpoint_interval` steps if (t + 1) % checkpoint_interval == 0 or t == n_impressions - 1: steps_series.append(t + 1) for k in ALGO_KEYS: start = max(0, len(rewards[k]) - rolling_window) window = rewards[k][start:] rolling_ctr_series[k].append(round(sum(window) / len(window), 4)) cum_regret_series[k].append(round(cum_regret[k], 4)) with sim_lock: sim_state['step'] = t + 1 payload = { "step": t + 1, "total": n_impressions, "done": False, } yield f"data: {json.dumps(payload)}\n\n" # Final payload with full series final_ctr = {k: round(sum(rewards[k]) / len(rewards[k]), 4) for k in ALGO_KEYS} total_rew = {k: int(sum(rewards[k])) for k in ALGO_KEYS} n_upd = {k: algorithms[k].n_updates for k in ALGO_KEYS} # Store for /api/regret with sim_lock: sim_state['running'] = False sim_state['last_results'] = { 'steps': steps_series, 'cumulative_regret': cum_regret_series, 'final_regret': {k: cum_regret_series[k][-1] for k in ALGO_KEYS}, 'avg_regret': {k: round(cum_regret_series[k][-1] / n_impressions, 5) for k in ALGO_KEYS}, } final_payload = { "done": True, "step": n_impressions, "total": n_impressions, "n_impressions": n_impressions, "steps": steps_series, "rolling_ctr": rolling_ctr_series, "final_ctr": final_ctr, "total_reward": total_rew, "n_updates": n_upd, } yield f"data: {json.dumps(final_payload)}\n\n" return Response( generate(), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}, ) @app.route('/api/regret') def api_regret(): with sim_lock: results = sim_state.get('last_results') if results is None: return jsonify({"error": "No simulation results available. Run a simulation first."}), 404 return jsonify(results) @app.route('/api/abtest', methods=['POST']) def api_abtest(): data = request.get_json(force=True) key_a = data.get('policy_a', 'linucb') key_b = data.get('policy_b', 'ucb1') n_tot = int(data.get('n_impressions', 5000)) n_tot = max(1000, min(20000, n_tot)) if key_a not in ALGO_CLASSES or key_b not in ALGO_CLASSES: return jsonify({"error": "Invalid policy key"}), 400 if key_a == key_b: return jsonify({"error": "Policy A and B must differ"}), 400 algo_a = ALGO_CLASSES[key_a]() algo_b = ALGO_CLASSES[key_b]() n_each = n_tot // 2 np.random.seed(1) r_a, r_b = [], [] for _ in range(n_each): ctx = sample_random_context() act = algo_a.select(ctx) rew = int(np.random.rand() < true_ctr(act, ctx)) algo_a.update(ctx, act, rew) r_a.append(rew) for _ in range(n_each): ctx = sample_random_context() act = algo_b.select(ctx) rew = int(np.random.rand() < true_ctr(act, ctx)) algo_b.update(ctx, act, rew) r_b.append(rew) n1, n2 = len(r_a), len(r_b) p1, p2 = sum(r_a) / n1, sum(r_b) / n2 p_pool = (sum(r_a) + sum(r_b)) / (n1 + n2) se = math.sqrt(p_pool * (1 - p_pool) * (1/n1 + 1/n2)) if p_pool not in (0, 1) else 1e-9 z = (p1 - p2) / se p_value = float(2 * (1 - stats.norm.cdf(abs(z)))) se_diff = math.sqrt(p1*(1-p1)/n1 + p2*(1-p2)/n2) ci_low = (p1 - p2) - 1.96 * se_diff ci_high = (p1 - p2) + 1.96 * se_diff return jsonify({ "ctr_a": round(p1, 5), "ctr_b": round(p2, 5), "n_a": n1, "n_b": n2, "lift_abs": round(p1 - p2, 5), "lift_rel": round((p1 - p2) / max(p2, 1e-9), 5), "z_stat": round(z, 4), "p_value": round(p_value, 5), "ci_low": round(ci_low, 5), "ci_high": round(ci_high, 5), "significant": p_value < 0.05, }) @app.route('/api/heatmap', methods=['POST']) def api_heatmap(): data = request.get_json(force=True) algo_key = data.get('algorithm', 'linucb') if algo_key not in algorithms: return jsonify({"error": "Invalid algorithm"}), 400 algo = algorithms[algo_key] ad_cats = ["Tech", "Fashion", "Finance", "Food", "Travel"] matrix = [] for content in CONTENT_CATS: row = [] for ad_cat in ad_cats: # Representative ad: first ad of this category ad_idx_for_cat = ad_cats.index(ad_cat) * 4 # ad_01, ad_05, ad_09, ad_13, ad_17 ctx = encode_context("adult", "desktop", "afternoon", content, "north_america") score = algo.predict_ctr(ctx, ad_idx_for_cat) row.append(round(float(score), 5)) matrix.append(row) return jsonify({ "matrix": matrix, "content_cats": CONTENT_CATS, "ad_cats": ad_cats, "algorithm": algo_key, }) # ───────────────────────────────────────────────────────────────────────────── if __name__ == '__main__': app.run(host='0.0.0.0', port=7860, debug=False, threaded=True)