"""
AdRL Studio — Contextual Bandit Ad Recommendation Engine
This application implements and benchmarks four reinforcement learning
contextual bandit algorithms for ad recommendation: (1) ε-Greedy Neural
Bandit using a shared PyTorch MLP, (2) UCB1 (Upper Confidence Bound),
a non-contextual baseline, (3) Thompson Sampling with Beta distribution
priors, and (4) LinUCB Disjoint Model, the industry-standard contextual
bandit used in production ad systems. The simulated environment features
20 ads across 5 categories and 5 user context features (age group, device,
time of day, content category, region) encoded as a 19-dimensional one-hot
vector. True click-through rates are determined by hidden weight vectors
initialized at startup (seed=42). Algorithms observe only bandit feedback
— the reward for the chosen arm only — and must balance exploration
vs. exploitation to minimize cumulative regret.
"""
import json
import math
import threading
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from flask import Flask, Response, jsonify, render_template_string, request
from scipy import stats
# Flask application object; the @app.route decorators further down register
# their handlers on it, and the __main__ guard at the bottom of the file runs it.
app = Flask(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Environment constants
# ─────────────────────────────────────────────────────────────────────────────
# Seed the global NumPy RNG so the hidden weights drawn at the end of this
# section are identical on every start-up.
np.random.seed(42)

# Vocabularies of the five categorical context features. Their order defines
# the layout of the one-hot context vector built by encode_context().
AGE_GROUPS = ["young_adult", "adult", "senior"]
DEVICES = ["mobile", "desktop", "tablet"]
TIMES_OF_DAY = ["morning", "afternoon", "evening", "night"]
CONTENT_CATS = ["tech", "sports", "lifestyle", "news", "entertainment"]
REGIONS = ["north_america", "europe", "asia", "other"]
CONTEXT_DIM = (
    len(AGE_GROUPS)
    + len(DEVICES)
    + len(TIMES_OF_DAY)
    + len(CONTENT_CATS)
    + len(REGIONS)
)  # 19

N_ADS = 20
# Ad identifiers "ad_01" .. "ad_20", derived from N_ADS rather than a second
# hard-coded bound.
AD_IDS = [f"ad_{i:02d}" for i in range(1, N_ADS + 1)]

# Category mapping: ads are assigned to categories in consecutive, equally
# sized runs (ad_01-ad_04 -> Tech, ad_05-ad_08 -> Fashion, ...).
_AD_CATEGORIES = ["Tech", "Fashion", "Finance", "Food", "Travel"]
_ADS_PER_CATEGORY = N_ADS // len(_AD_CATEGORIES)
AD_CAT_MAP = {
    ad: _AD_CATEGORIES[pos // _ADS_PER_CATEGORY] for pos, ad in enumerate(AD_IDS)
}

# Creative format of each ad.
AD_FORMATS = {
    "ad_01": "banner", "ad_02": "video", "ad_03": "native", "ad_04": "banner",
    "ad_05": "banner", "ad_06": "video", "ad_07": "banner", "ad_08": "native",
    "ad_09": "native", "ad_10": "banner", "ad_11": "video", "ad_12": "native",
    "ad_13": "banner", "ad_14": "native", "ad_15": "banner", "ad_16": "video",
    "ad_17": "video", "ad_18": "banner", "ad_19": "native", "ad_20": "video",
}

# Bid attached to each ad (reported by /api/recommend).
AD_BIDS = {
    "ad_01": 2.50, "ad_02": 3.00, "ad_03": 3.50, "ad_04": 4.00,
    "ad_05": 1.50, "ad_06": 2.00, "ad_07": 2.50, "ad_08": 3.00,
    "ad_09": 3.00, "ad_10": 3.50, "ad_11": 4.00, "ad_12": 5.00,
    "ad_13": 1.00, "ad_14": 1.50, "ad_15": 2.00, "ad_16": 2.50,
    "ad_17": 2.00, "ad_18": 2.50, "ad_19": 3.00, "ad_20": 3.50,
}

# Hidden true CTR weights — fixed at startup, never exposed to algorithms.
# One row per ad, one column per context dimension.
_TRUE_WEIGHTS = np.random.randn(N_ADS, CONTEXT_DIM) * 0.3
def _sigmoid(x):
    """Return the logistic function of ``x``.

    The input is clamped to [-20, 20] before exponentiation, so very large
    magnitudes saturate instead of overflowing. Works on scalars and arrays.
    """
    clamped = np.clip(x, -20, 20)
    denominator = 1.0 + np.exp(-clamped)
    return 1.0 / denominator
def true_ctr(ad_idx, ctx):
    """Return the ground-truth click probability of ad ``ad_idx`` in context ``ctx``.

    The probability is the sigmoid of the dot product between the context and
    the ad's hidden weight row, clipped to the range [0.02, 0.25].
    """
    logit = ctx @ _TRUE_WEIGHTS[ad_idx]
    probability = _sigmoid(logit)
    return float(np.clip(probability, 0.02, 0.25))
def encode_context(age, device, tod, content, region):
    """One-hot encode the five categorical features into one float32 vector.

    The vector has CONTEXT_DIM entries laid out as consecutive segments in the
    order age group, device, time of day, content category, region; exactly
    one entry per segment is set to 1.0.

    Raises:
        ValueError: if any value is not in its feature's vocabulary
            (propagated from ``list.index``).
    """
    encoded = np.zeros(CONTEXT_DIM, dtype=np.float32)
    segments = (
        (AGE_GROUPS, age),
        (DEVICES, device),
        (TIMES_OF_DAY, tod),
        (CONTENT_CATS, content),
        (REGIONS, region),
    )
    start = 0
    for vocabulary, value in segments:
        encoded[start + vocabulary.index(value)] = 1.0
        start += len(vocabulary)
    return encoded
def sample_random_context():
    """Draw each context feature uniformly at random and return its encoding.

    Uses the global NumPy RNG. The features are drawn in the fixed order
    age, device, time of day, content, region, so a given seed always
    produces the same sequence of contexts.
    """
    age = np.random.choice(AGE_GROUPS)
    device = np.random.choice(DEVICES)
    tod = np.random.choice(TIMES_OF_DAY)
    content = np.random.choice(CONTENT_CATS)
    region = np.random.choice(REGIONS)
    return encode_context(age, device, tod, content, region)
# ─────────────────────────────────────────────────────────────────────────────
# Algorithm classes
# ─────────────────────────────────────────────────────────────────────────────
class EpsilonGreedyNeuralBandit:
    """ε-greedy bandit whose reward estimate comes from one shared MLP.

    The network input is the context vector concatenated with a one-hot arm
    indicator (CONTEXT_DIM + N_ADS features); it has hidden layers of 32 and
    16 ReLU units and a single sigmoid output. It is trained one sample at a
    time with SGD on the squared error against the observed reward.
    """

    NAME = "ε-Greedy"
    COLOR = "#f59e0b"

    def __init__(self, epsilon=0.15, epsilon_min=0.01, decay=0.995, lr=0.01):
        """Store the exploration schedule and learning rate, then build the model."""
        self.epsilon_0 = epsilon
        self.epsilon_min = epsilon_min
        self.decay = decay
        self.lr = lr
        self.reset()

    def reset(self):
        """Discard everything learned: new network, new optimizer, zeroed counters."""
        self.t = 0
        self.n_updates = 0
        layers = [
            nn.Linear(CONTEXT_DIM + N_ADS, 32),
            nn.ReLU(),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*layers)
        self.optimizer = optim.SGD(self.model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def _inp(self, ctx, ad_idx):
        """Return the network input for one (context, arm) pair as a 1-D tensor."""
        one_hot = np.zeros(N_ADS, dtype=np.float32)
        one_hot[ad_idx] = 1.0
        return torch.FloatTensor(np.concatenate([ctx, one_hot]))

    def _pred(self, ctx, ad_idx):
        """Return the network's score for one arm, without tracking gradients."""
        self.model.eval()
        with torch.no_grad():
            output = self.model(self._inp(ctx, ad_idx))
        return output.item()

    def select(self, ctx):
        """Return the index of the arm to play for ``ctx``.

        With probability max(epsilon_min, epsilon_0 * decay**t) a uniformly
        random arm is returned; otherwise the arm with the highest predicted
        score.
        """
        explore_prob = max(self.epsilon_min, self.epsilon_0 * (self.decay ** self.t))
        if np.random.rand() < explore_prob:
            return int(np.random.randint(N_ADS))
        # Exploit: score every arm in one batched forward pass.
        tiled_ctx = np.tile(ctx, (N_ADS, 1))
        arm_one_hots = np.eye(N_ADS, dtype=np.float32)
        features = torch.FloatTensor(np.hstack([tiled_ctx, arm_one_hots]))
        self.model.eval()
        with torch.no_grad():
            scores = self.model(features).squeeze().numpy()
        return int(np.argmax(scores))

    def predict_ctr(self, ctx, ad_idx):
        """Return the model's current score for ``ad_idx`` in context ``ctx``."""
        return self._pred(ctx, ad_idx)

    def update(self, ctx, action, reward):
        """Take one SGD step towards ``reward`` for the played arm and advance counters."""
        self.model.train()
        features = self._inp(ctx, action).unsqueeze(0)
        target = torch.FloatTensor([[float(reward)]])
        self.optimizer.zero_grad()
        loss = self.criterion(self.model(features), target)
        loss.backward()
        self.optimizer.step()
        self.t += 1
        self.n_updates += 1
class UCB1Bandit:
    """UCB1 over the ads; the context argument is accepted but ignored."""

    NAME = "UCB1"
    COLOR = "#10b981"

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the per-arm pull counts, reward sums and all counters."""
        self.n_a = np.zeros(N_ADS)   # pulls per arm
        self.R_a = np.zeros(N_ADS)   # summed reward per arm
        self.t = 0
        self._init_idx = 0           # next arm to play during the warm-up sweep
        self.n_updates = 0

    def select(self, ctx):
        """Return the arm to play.

        While the warm-up counter is below N_ADS the counter itself is
        returned; afterwards the arm maximising
        mean reward + sqrt(2 ln t / n_a) is returned.
        """
        if self._init_idx < N_ADS:
            return self._init_idx
        pulls = np.maximum(self.n_a, 1)
        mean_reward = self.R_a / pulls
        log_t = np.log(max(self.t, 1))
        exploration = np.sqrt(2.0 * log_t / pulls)
        return int(np.argmax(mean_reward + exploration))

    def predict_ctr(self, ctx, ad_idx):
        """Return the empirical mean reward of ``ad_idx`` (0.0 if never played)."""
        pulls = self.n_a[ad_idx]
        if pulls == 0:
            return 0.0
        return float(self.R_a[ad_idx] / pulls)

    def update(self, ctx, action, reward):
        """Record ``reward`` for ``action``.

        The warm-up counter advances on every update until it reaches N_ADS,
        whichever arm was actually played.
        """
        if self._init_idx < N_ADS:
            self._init_idx += 1
        self.n_a[action] += 1
        self.R_a[action] += reward
        self.t += 1
        self.n_updates += 1
class ThompsonSamplingBandit:
    """Thompson sampling with one Beta posterior per ad; the context is ignored."""

    NAME = "Thompson"
    COLOR = "#3b82f6"

    def __init__(self):
        self.reset()

    def reset(self):
        """Return every arm to a Beta(1, 1) posterior and zero the update counter."""
        self.alpha = np.ones(N_ADS)
        self.beta_p = np.ones(N_ADS)
        self.n_updates = 0

    def select(self, ctx):
        """Sample one value from each arm's posterior and return the arg-max arm."""
        draws = np.random.beta(self.alpha, self.beta_p)
        return int(np.argmax(draws))

    def predict_ctr(self, ctx, ad_idx):
        """Return the posterior mean alpha / (alpha + beta) of ``ad_idx``."""
        a = self.alpha[ad_idx]
        total = self.alpha[ad_idx] + self.beta_p[ad_idx]
        return float(a / total)

    def update(self, ctx, action, reward):
        """Increment alpha when ``reward == 1``, beta otherwise, for the played arm."""
        posterior_param = self.alpha if reward == 1 else self.beta_p
        posterior_param[action] += 1
        self.n_updates += 1
class LinUCBBandit:
    """LinUCB with a separate linear model per ad.

    For every arm the class keeps the design matrix ``A``, its inverse
    ``A_inv`` and the reward-weighted feature sum ``b``. ``alpha`` scales the
    exploration bonus.
    """

    NAME = "LinUCB"
    COLOR = "#ef4444"

    def __init__(self, alpha=1.0):
        self.alpha = alpha
        self.reset()

    def reset(self):
        """Re-initialise each arm with A = A_inv = I and b = 0."""
        dim = CONTEXT_DIM
        self.A = [np.identity(dim) for _ in range(N_ADS)]
        self.A_inv = [np.identity(dim) for _ in range(N_ADS)]
        self.b = [np.zeros(dim) for _ in range(N_ADS)]
        self.n_updates = 0

    def _ucb_score(self, ctx, ad_idx):
        """Return theta·x + alpha * sqrt(xᵀ A⁻¹ x) for one arm."""
        inv_cov = self.A_inv[ad_idx]
        theta = inv_cov @ self.b[ad_idx]
        # Clamp at zero so rounding error can never feed sqrt a negative value.
        variance = max(float(ctx @ inv_cov @ ctx), 0.0)
        return float(theta @ ctx + self.alpha * math.sqrt(variance))

    def select(self, ctx):
        """Return the arm with the highest upper-confidence score for ``ctx``."""
        scores = [self._ucb_score(ctx, arm) for arm in range(N_ADS)]
        return int(np.argmax(scores))

    def predict_ctr(self, ctx, ad_idx):
        """Return the linear estimate theta·x for ``ad_idx`` (no exploration bonus)."""
        theta = self.A_inv[ad_idx] @ self.b[ad_idx]
        return float(theta @ ctx)

    def update(self, ctx, action, reward):
        """Fold one (context, reward) observation into the played arm's statistics."""
        prev_inv = self.A_inv[action]
        inv_x = prev_inv @ ctx
        # Rank-one update of the inverse, so no matrix inversion is needed:
        # A_inv <- A_inv - (A_inv x)(A_inv x)ᵀ / (1 + xᵀ A_inv x).
        self.A_inv[action] = prev_inv - np.outer(inv_x, inv_x) / (1.0 + ctx @ inv_x)
        self.A[action] += np.outer(ctx, ctx)
        self.b[action] += reward * ctx
        self.n_updates += 1
# ─────────────────────────────────────────────────────────────────────────────
# Global state
# ─────────────────────────────────────────────────────────────────────────────
# Registry mapping each policy key to its class; insertion order is the order
# in which the policies are iterated elsewhere in this file.
ALGO_CLASSES = {
    "epsilon_greedy": EpsilonGreedyNeuralBandit,
    "ucb1": UCB1Bandit,
    "thompson": ThompsonSamplingBandit,
    "linucb": LinUCBBandit,
}
ALGO_KEYS = list(ALGO_CLASSES)

# Display label and chart colour per policy key.
ALGO_DISPLAY = dict(
    epsilon_greedy="ε-Greedy",
    ucb1="UCB1",
    thompson="Thompson",
    linucb="LinUCB",
)
ALGO_COLORS = dict(
    epsilon_greedy="#f59e0b",
    ucb1="#10b981",
    thompson="#3b82f6",
    linucb="#ef4444",
)

# One shared, long-lived instance of every policy.
algorithms = {key: factory() for key, factory in ALGO_CLASSES.items()}

# Simulation progress and the most recent regret results; guarded by sim_lock.
sim_lock = threading.Lock()
sim_state = dict(running=False, step=0, total=0, last_results=None)
# ─────────────────────────────────────────────────────────────────────────────
# HTML Template
# ─────────────────────────────────────────────────────────────────────────────
# Page content returned by the `/` route, which passes this string to
# render_template_string().
# NOTE(review): the literal below contains only text labels — no tags, styles
# or scripts are visible here. Confirm the markup was not stripped before
# relying on this text as the real template.
TEMPLATE = """
AdRL Studio
🎬 AdRL Studio
Contextual Bandit Ad Engine
Live Ad ServingModel Ready
🎓 User Context
🎭 Algorithm Recommendations
ε-Greedy
—
—
—
UCB1
—
—
—
Thompson
—
—
—
LinUCB
—
—
—
⚙ Simulation Settings
1,00010,000
1100
📈 Rolling CTR (100-impression window)
📊 Simulation Summary
Run a simulation to see results.
📉 Cumulative Regret Comparison
Cumulative regret measures the total reward missed vs. always picking the oracle best arm.
Lower is better. LinUCB and Thompson typically achieve sub-linear regret.
📋 Regret Summary
Run a simulation first (Online Learning tab).
⚖ A/B Test Settings
1,00020,000
📊 A/B Test Results
—
Policy A CTR
—
Policy B CTR
—
Absolute Lift
—
Relative Lift
—
Z-Statistic
—
P-Value
—
95% CI (Lift)
—
Verdict
🌡 Reward Landscape Settings
Estimated CTR for each user content category × ad category pair. Context held at: adult, desktop, afternoon, north_america.
📈 Estimated CTR Heatmap
"""
# ─────────────────────────────────────────────────────────────────────────────
# Flask routes
# ─────────────────────────────────────────────────────────────────────────────
@app.route('/')
def index():
    """Serve the page rendered from the inline TEMPLATE string."""
    page = render_template_string(TEMPLATE)
    return page
@app.route('/api/status')
def api_status():
    """Return the simulation progress as JSON: ``running``, ``step`` and ``total``."""
    reported = ("running", "step", "total")
    # Copy the fields while holding the lock so the three values belong together.
    with sim_lock:
        snapshot = {field: sim_state[field] for field in reported}
    return jsonify(snapshot)
@app.route('/api/recommend', methods=['POST'])
def api_recommend():
    """Return every algorithm's recommended ad for one user context.

    Request JSON must be an object with the keys ``age``, ``device``, ``tod``,
    ``content`` and ``region``, each holding a value from the corresponding
    vocabulary.

    Returns:
        JSON mapping each algorithm key to ``ad_id``, ``category``, ``format``,
        ``bid`` and ``score`` (that algorithm's predicted CTR for the ad it
        chose, rounded to 4 decimals). A 400 JSON error is returned when the
        body is not a JSON object, a key is missing, or a value is unknown.
    """
    data = request.get_json(force=True)
    # Valid JSON that is not an object (null, a list, a string, ...) would
    # otherwise fail on the subscripting below with an uncaught TypeError.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    try:
        ctx = encode_context(
            data['age'], data['device'], data['tod'],
            data['content'], data['region'],
        )
    except (KeyError, ValueError) as e:
        return jsonify({"error": str(e)}), 400

    result = {}
    for key, algo in algorithms.items():
        # Only select/predict are called: the shared models are not updated here.
        ad_idx = algo.select(ctx)
        score = algo.predict_ctr(ctx, ad_idx)
        ad_id = AD_IDS[ad_idx]
        result[key] = {
            "ad_id": ad_id,
            "category": AD_CAT_MAP[ad_id],
            "format": AD_FORMATS[ad_id],
            "bid": AD_BIDS[ad_id],
            "score": round(score, 4),
        }
    return jsonify(result)
@app.route('/api/simulate', methods=['POST'])
def api_simulate():
    """Run an online-learning simulation and stream progress as Server-Sent Events.

    Request JSON (an object; all fields optional):
        n_impressions: number of simulated impressions, clamped to
            [1000, 10000] (default 3000).
        seed: RNG seed, 0 <= seed < 2**32 (default 42).

    Returns:
        A ``text/event-stream`` response. A progress event
        ``{"step", "total", "done": false}`` is sent every 50 impressions and
        on the last one, followed by a final ``done: true`` event carrying the
        rolling-CTR series and per-algorithm totals. The regret series is
        saved in ``sim_state['last_results']`` for ``/api/regret``.
        A 400 JSON error is returned when the body is not a JSON object or a
        numeric field is invalid.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    try:
        n_impressions = int(data.get('n_impressions', 3000))
        seed = int(data.get('seed', 42))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "n_impressions and seed must be integers"}), 400
    # np.random.seed rejects values outside the unsigned 32-bit range; refuse
    # them here instead of failing after the stream has already started.
    if not 0 <= seed < 2 ** 32:
        return jsonify({"error": "seed must be between 0 and 2**32 - 1"}), 400
    n_impressions = max(1000, min(10000, n_impressions))

    def generate():
        try:
            # Seed torch before the reset so the ε-greedy network starts from
            # the same weights whenever the same seed is requested.
            torch.manual_seed(seed)
            # Reset all algorithm states
            for algo in algorithms.values():
                algo.reset()
            np.random.seed(seed)
            with sim_lock:
                sim_state['running'] = True
                sim_state['step'] = 0
                sim_state['total'] = n_impressions

            rewards = {k: [] for k in ALGO_KEYS}
            checkpoint_interval = 50
            # Per-checkpoint rolling window (last 100 impressions)
            rolling_window = 100
            rolling_ctr_series = {k: [] for k in ALGO_KEYS}
            steps_series = []
            # Cumulative regret is accumulated incrementally inside the loop.
            cum_regret = {k: 0.0 for k in ALGO_KEYS}
            cum_regret_series = {k: [] for k in ALGO_KEYS}

            for t in range(n_impressions):
                ctx = sample_random_context()
                # True CTR of every arm for this context, computed in one shot.
                all_ctrs = np.clip(_sigmoid(_TRUE_WEIGHTS @ ctx), 0.02, 0.25)
                oracle_idx = int(np.argmax(all_ctrs))
                oracle_r = int(np.random.rand() < all_ctrs[oracle_idx])

                # Each algorithm selects, receives reward, updates
                for k, algo in algorithms.items():
                    act = algo.select(ctx)
                    r = int(np.random.rand() < all_ctrs[act])
                    algo.update(ctx, act, r)
                    rewards[k].append(r)
                    # Realised regret: sampled oracle click minus sampled
                    # click, so a single step can contribute -1, 0 or +1.
                    cum_regret[k] += oracle_r - r

                # Checkpoint every `checkpoint_interval` steps and on the last step
                if (t + 1) % checkpoint_interval == 0 or t == n_impressions - 1:
                    steps_series.append(t + 1)
                    for k in ALGO_KEYS:
                        window = rewards[k][-rolling_window:]
                        rolling_ctr_series[k].append(round(sum(window) / len(window), 4))
                        cum_regret_series[k].append(round(cum_regret[k], 4))
                    with sim_lock:
                        sim_state['step'] = t + 1
                    payload = {
                        "step": t + 1,
                        "total": n_impressions,
                        "done": False,
                    }
                    yield f"data: {json.dumps(payload)}\n\n"

            # Final payload with full series
            final_ctr = {k: round(sum(rewards[k]) / len(rewards[k]), 4) for k in ALGO_KEYS}
            total_rew = {k: int(sum(rewards[k])) for k in ALGO_KEYS}
            n_upd = {k: algorithms[k].n_updates for k in ALGO_KEYS}

            # Store for /api/regret; clear the flag before the final event is
            # sent so a client polling right after "done" sees running=False.
            with sim_lock:
                sim_state['running'] = False
                sim_state['last_results'] = {
                    'steps': steps_series,
                    'cumulative_regret': cum_regret_series,
                    'final_regret': {k: cum_regret_series[k][-1] for k in ALGO_KEYS},
                    'avg_regret': {
                        k: round(cum_regret_series[k][-1] / n_impressions, 5)
                        for k in ALGO_KEYS
                    },
                }

            final_payload = {
                "done": True,
                "step": n_impressions,
                "total": n_impressions,
                "n_impressions": n_impressions,
                "steps": steps_series,
                "rolling_ctr": rolling_ctr_series,
                "final_ctr": final_ctr,
                "total_reward": total_rew,
                "n_updates": n_upd,
            }
            yield f"data: {json.dumps(final_payload)}\n\n"
        finally:
            # Also runs when the generator is closed early (for example the
            # consumer stops reading) or an exception escapes the loop, so
            # /api/status never reports a run that is no longer executing.
            with sim_lock:
                sim_state['running'] = False

    return Response(
        generate(),
        mimetype='text/event-stream',
        headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'},
    )
@app.route('/api/regret')
def api_regret():
    """Return the regret results of the most recent simulation, or 404 if none exist."""
    with sim_lock:
        last_run = sim_state.get('last_results')
    if last_run is not None:
        return jsonify(last_run)
    return jsonify({"error": "No simulation results available. Run a simulation first."}), 404
def _run_policy_arm(algo, n_rounds):
    """Play ``algo`` online for ``n_rounds`` sampled contexts; return its click count."""
    clicks = 0
    for _ in range(n_rounds):
        ctx = sample_random_context()
        act = algo.select(ctx)
        rew = int(np.random.rand() < true_ctr(act, ctx))
        algo.update(ctx, act, rew)
        clicks += rew
    return clicks


@app.route('/api/abtest', methods=['POST'])
def api_abtest():
    """Compare two freshly initialised policies with a two-proportion z-test.

    Request JSON (an object; all fields optional):
        policy_a / policy_b: distinct algorithm keys (defaults ``linucb`` and
            ``ucb1``).
        n_impressions: total impressions, clamped to [1000, 20000] and split
            evenly between the two policies (default 5000).

    Returns:
        JSON with both CTRs, sample sizes, absolute and relative lift, the
        pooled z statistic, its two-sided p-value, a 95% confidence interval
        for the absolute lift (unpooled standard error) and ``significant``
        (p < 0.05). A 400 JSON error is returned for a non-object body, a
        non-integer ``n_impressions``, unknown policy keys, or identical keys.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    key_a = data.get('policy_a', 'linucb')
    key_b = data.get('policy_b', 'ucb1')
    try:
        n_tot = int(data.get('n_impressions', 5000))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "n_impressions must be an integer"}), 400
    n_tot = max(1000, min(20000, n_tot))

    # The isinstance checks come first: an unhashable JSON value (list/object)
    # would make the dict membership test itself raise TypeError.
    keys_are_strings = isinstance(key_a, str) and isinstance(key_b, str)
    if not keys_are_strings or key_a not in ALGO_CLASSES or key_b not in ALGO_CLASSES:
        return jsonify({"error": "Invalid policy key"}), 400
    if key_a == key_b:
        return jsonify({"error": "Policy A and B must differ"}), 400

    # Fixed seeds for both RNGs: NumPy drives contexts and rewards, torch
    # drives the ε-greedy network's initial weights.
    torch.manual_seed(1)
    algo_a = ALGO_CLASSES[key_a]()
    algo_b = ALGO_CLASSES[key_b]()
    n_each = n_tot // 2
    np.random.seed(1)

    # Policy A plays its whole share first, then policy B.
    clicks_a = _run_policy_arm(algo_a, n_each)
    clicks_b = _run_policy_arm(algo_b, n_each)

    n1 = n2 = n_each
    p1, p2 = clicks_a / n1, clicks_b / n2
    p_pool = (clicks_a + clicks_b) / (n1 + n2)
    # A pooled rate of exactly 0 or 1 gives zero variance; a tiny stand-in
    # avoids dividing by zero (p1 == p2 in that case, so z is 0).
    if 0 < p_pool < 1:
        se = math.sqrt(p_pool * (1 - p_pool) * (1 / n1 + 1 / n2))
    else:
        se = 1e-9
    z = (p1 - p2) / se
    # The survival function keeps precision in the far tail, where 1 - cdf
    # rounds to zero.
    p_value = float(2 * stats.norm.sf(abs(z)))

    se_diff = math.sqrt(p1 * (1 - p1) / n1 + p2 * (1 - p2) / n2)
    lift = p1 - p2
    ci_low = lift - 1.96 * se_diff
    ci_high = lift + 1.96 * se_diff

    return jsonify({
        "ctr_a": round(p1, 5),
        "ctr_b": round(p2, 5),
        "n_a": n1,
        "n_b": n2,
        "lift_abs": round(lift, 5),
        "lift_rel": round(lift / max(p2, 1e-9), 5),
        "z_stat": round(z, 4),
        "p_value": round(p_value, 5),
        "ci_low": round(ci_low, 5),
        "ci_high": round(ci_high, 5),
        "significant": p_value < 0.05,
    })
@app.route('/api/heatmap', methods=['POST'])
def api_heatmap():
    """Return one algorithm's estimated CTR for every content × ad-category pair.

    Request JSON (an object): ``algorithm`` — an algorithm key, default
    ``linucb``.

    Returns:
        JSON with ``matrix`` (one row per user content category, one column
        per ad category, values rounded to 5 decimals), the two label lists
        and the algorithm key. The remaining context features are held at
        adult / desktop / afternoon / north_america, and each ad category is
        represented by its first ad. A 400 JSON error is returned for a
        non-object body or an unknown algorithm.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    algo_key = data.get('algorithm', 'linucb')
    # isinstance first: an unhashable JSON value would make the membership
    # test raise TypeError instead of producing a clean 400.
    if not isinstance(algo_key, str) or algo_key not in algorithms:
        return jsonify({"error": "Invalid algorithm"}), 400
    algo = algorithms[algo_key]

    ad_cats = ["Tech", "Fashion", "Finance", "Food", "Travel"]
    # Ads are grouped by category in equal consecutive runs, so the first ad
    # of category c sits at index c * ads_per_cat (ad_01, ad_05, ad_09, ...).
    ads_per_cat = N_ADS // len(ad_cats)

    matrix = []
    for content in CONTENT_CATS:
        # The context depends only on the row, so build it once per row.
        ctx = encode_context("adult", "desktop", "afternoon", content, "north_america")
        row = [
            round(float(algo.predict_ctr(ctx, cat_pos * ads_per_cat)), 5)
            for cat_pos in range(len(ad_cats))
        ]
        matrix.append(row)

    return jsonify({
        "matrix": matrix,
        "content_cats": CONTENT_CATS,
        "ad_cats": ad_cats,
        "algorithm": algo_key,
    })
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == '__main__':
    # Start the built-in server on all interfaces, port 7860, with one thread
    # per request and the debugger disabled.
    app.run(
        host='0.0.0.0',
        port=7860,
        debug=False,
        threaded=True,
    )