# RL-Recommendation-System / src / data / ad_dataset.py
# Repository page header captured with the file (not part of the program):
#   "mnoorchenar's picture" · "Update 2026-03-23 09:33:03"
#   commit f19eb84
"""
Synthetic advertising dataset.

Generates synthetic user profiles, ad inventory, and impression logs.
Click and conversion probabilities come from a simple interest-match model
(an ad whose category is among the user's interests gets a boosted base rate),
intended to mimic real-world RTB (Real-Time Bidding) behaviour.

Key domain concepts implemented
-------------------------------
* Users       – demographic profile + interest vector
* Ads         – advertiser, category, format, bid price, targeting
* Impressions – (user, ad, timestamp, clicked, converted, dwell_time, revenue)
* Fatigue     – click-probability decays with repeated exposure to the same ad
* Reward      – composite: click + conversion bonus − fatigue penalty + revenue signal
"""
import os
from typing import Optional

import numpy as np
import pandas as pd
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Ad / interest categories. CATEGORIES and ADVERTISERS are index-aligned:
# ad generation picks both with the same ``ad_id % len(...)`` index, so
# category i is always paired with advertiser i.
CATEGORIES = [
    'tech',
    'fashion',
    'sports',
    'travel',
    'food',
    'finance',
    'health',
    'gaming',
    'music',
    'automotive',
]
ADVERTISERS = [
    'TechCorp',
    'FashionHub',
    'SportsPro',
    'TravelEase',
    'FoodieDeals',
    'FinancePlus',
    'HealthFirst',
    'GamingZone',
    'MusicStream',
    'AutoDrive',
]

# Vocabularies for the remaining categorical fields.
AD_FORMATS = ['banner', 'video', 'native', 'carousel']
AGE_GROUPS = ['18-24', '25-34', '35-44', '45-54', '55+']
GENDERS = ['M', 'F', 'Other']
DEVICES = ['mobile', 'desktop', 'tablet']

# Reward weights used when scoring a single impression.
R_CLICK = 1.0      # added when the impression is clicked
R_CONVERT = 3.0    # added when the click converts
R_FATIGUE = 0.2    # subtracted per prior exposure beyond the first (kicks in on the 3rd showing)
R_REVENUE = 0.001  # per dollar of revenue (the ad's bid price, earned on a click)
# ---------------------------------------------------------------------------
class AdDataset:
    """Synthetic ad-impression dataset backed by CSV files in ``data_dir``.

    On construction the users, ads and impressions tables are loaded from
    ``users.csv`` / ``ads.csv`` / ``impressions.csv`` when all three exist;
    otherwise they are generated with a fixed seed and written there.  The
    impression log is then turned into per-user sequences, an 80/20 per-user
    train/test split, and aggregate analytics.

    Attributes populated by ``__init__``:
        users_df, ads_df, impressions_df: the three raw tables.
        user_sequences: ``{user_id: [(ad_id, reward, clicked, converted), ...]}``
            ordered by timestamp.
        train_df, test_df: first 80 % / last 20 % of each user's sequence.
        n_users_actual, n_ads_actual: distinct users / ads seen in the
            impression log (may differ from ``n_users`` / ``n_ads`` when
            cached CSVs are loaded).
        analytics: dict of overall, per-category and per-advertiser stats.
    """

    # One headline per ad category, shared by every ad in that category.
    _HEADLINES = {
        'tech': 'Upgrade your tech today!', 'fashion': 'Style up this season!',
        'sports': 'Gear up for greatness!', 'travel': 'Explore the world now!',
        'food': 'Delicious deals await!', 'finance': 'Grow your wealth today!',
        'health': 'Live healthier, longer!', 'gaming': 'Level up your game!',
        'music': 'Discover new sounds!', 'automotive': 'Drive your dream car!',
    }

    # Assumed monetary value of one conversion; only used for the ROAS figure.
    _CONVERSION_VALUE = 50

    def __init__(self, data_dir: str = 'data', n_users: int = 300, n_ads: int = 100):
        """Load the dataset from ``data_dir`` or generate and cache it there.

        Args:
            data_dir: Directory holding (or receiving) the three CSV files.
                Created if missing.
            n_users: Number of users to generate. Ignored when cached CSVs
                are found.
            n_ads: Number of ads to generate. Ignored when cached CSVs are
                found.
        """
        self.data_dir = data_dir
        self.n_users = n_users
        self.n_ads = n_ads
        os.makedirs(data_dir, exist_ok=True)

        users_path = os.path.join(data_dir, 'users.csv')
        ads_path = os.path.join(data_dir, 'ads.csv')
        imp_path = os.path.join(data_dir, 'impressions.csv')

        # Only reuse the cache when it is complete; a partial cache is regenerated.
        if all(os.path.exists(p) for p in (users_path, ads_path, imp_path)):
            self.users_df = pd.read_csv(users_path)
            self.ads_df = pd.read_csv(ads_path)
            self.impressions_df = pd.read_csv(imp_path)
        else:
            self._generate()
        self._preprocess()

    # ------------------------------------------------------------------
    # Data generation
    # ------------------------------------------------------------------
    def _generate(self):
        """Generate users, ads and impressions, then write them to CSV.

        NOTE: this seeds NumPy's *global* RNG with 42.  The generated data is
        determined by the exact order of the ``np.random`` calls below, so
        reordering them changes the dataset.
        """
        np.random.seed(42)

        # ── Users ──────────────────────────────────────────────────────
        users = []
        for uid in range(self.n_users):
            n_interests = np.random.randint(1, 4)  # 1..3 interests per user
            interests = np.random.choice(CATEGORIES, size=n_interests, replace=False)
            users.append({
                'user_id': uid,
                'age_group': np.random.choice(AGE_GROUPS),
                'gender': np.random.choice(GENDERS),
                'interests': '|'.join(interests),
                'device': np.random.choice(DEVICES),
            })
        self.users_df = pd.DataFrame(users)

        # ── Ads ────────────────────────────────────────────────────────
        ads = []
        for ad_id in range(self.n_ads):
            # Round-robin over categories; advertiser shares the same index.
            cat = CATEGORIES[ad_id % len(CATEGORIES)]
            adv = ADVERTISERS[ad_id % len(ADVERTISERS)]
            fmt = np.random.choice(AD_FORMATS)
            bid = round(np.random.uniform(0.5, 8.0), 2)
            budget = round(np.random.uniform(50, 500), 0)
            n_tgt = np.random.randint(2, 5)  # 2..4 targeted age groups
            target_ages = '|'.join(np.random.choice(AGE_GROUPS, size=n_tgt, replace=False))
            ads.append({
                'ad_id': ad_id,
                'advertiser': adv,
                'category': cat,
                'format': fmt,
                'bid_price': bid,
                'daily_budget': budget,
                'target_ages': target_ages,
                'headline': self._HEADLINES[cat],
                'ctr_base': round(np.random.uniform(0.02, 0.06), 4),
                'cvr_base': round(np.random.uniform(0.05, 0.15), 4),
            })
        self.ads_df = pd.DataFrame(ads)

        # ── Impressions ────────────────────────────────────────────────
        impressions = []
        imp_id = 0
        for uid in range(self.n_users):
            user = self.users_df.iloc[uid]
            u_ints = set(user['interests'].split('|'))
            n_imps = np.random.randint(30, 100)
            ad_ids = np.random.choice(self.n_ads, size=n_imps, replace=True)
            freq_count = {}  # ad → exposure count for fatigue
            for t, ad_id in enumerate(ad_ids):
                ad = self.ads_df.iloc[ad_id]
                # freq = number of times this user has ALREADY seen this ad.
                freq = freq_count.get(ad_id, 0)
                freq_count[ad_id] = freq + 1

                # click probability: base × interest boost × fatigue decay, capped
                match = ad['category'] in u_ints
                ctr = ad['ctr_base'] * (3.0 if match else 1.0) * (0.8 ** max(0, freq - 1))
                ctr = min(ctr, 0.35)
                clicked = int(np.random.rand() < ctr)

                # A conversion requires a click; the RNG draw happens either
                # way so the random stream does not depend on the outcome.
                cvr = ad['cvr_base'] * (1.5 if match else 1.0) if clicked else 0
                converted = int(np.random.rand() < cvr)

                dwell = np.random.exponential(15.0) if clicked else np.random.exponential(2.0)
                revenue = ad['bid_price'] if clicked else 0.0

                # reward
                fatigue_pen = max(0, freq - 1)
                reward = (
                    R_CLICK * clicked
                    + R_CONVERT * converted
                    - R_FATIGUE * fatigue_pen
                    + R_REVENUE * revenue
                )

                hour = np.random.randint(0, 24)
                dow = np.random.randint(0, 7)
                impressions.append({
                    'impression_id': imp_id,
                    'user_id': uid,
                    'ad_id': int(ad_id),
                    # Synthetic, strictly increasing per user (n_imps < 1000).
                    'timestamp': 1_000_000 + uid * 1000 + t,
                    'clicked': clicked,
                    'converted': converted,
                    'dwell_time': round(dwell, 2),
                    'revenue': round(revenue, 4),
                    'reward': round(reward, 4),
                    'freq_count': freq,
                    'hour_of_day': hour,
                    'day_of_week': dow,
                })
                imp_id += 1
        self.impressions_df = pd.DataFrame(impressions)

        # Save
        self.users_df.to_csv(os.path.join(self.data_dir, 'users.csv'), index=False)
        self.ads_df.to_csv(os.path.join(self.data_dir, 'ads.csv'), index=False)
        self.impressions_df.to_csv(os.path.join(self.data_dir, 'impressions.csv'), index=False)

    # ------------------------------------------------------------------
    # Preprocessing
    # ------------------------------------------------------------------
    def _preprocess(self):
        """Build per-user sequences, the train/test split and analytics."""
        # Stable sort: rows with equal timestamps keep their log order.
        df = self.impressions_df.sort_values('timestamp', kind='mergesort')

        # Build per-user interaction sequences
        self.user_sequences: dict = {}
        for uid, grp in df.groupby('user_id'):
            self.user_sequences[int(uid)] = list(zip(
                grp['ad_id'].values.tolist(),
                grp['reward'].values.tolist(),
                grp['clicked'].values.tolist(),
                grp['converted'].values.tolist(),
            ))

        # Train / test split (80 / 20 per user); every user keeps at least
        # one training interaction.
        train_rows, test_rows = [], []
        for uid, seq in self.user_sequences.items():
            split = max(1, int(len(seq) * 0.8))
            for i, (ad_id, reward, clicked, converted) in enumerate(seq):
                row = {'user_id': uid, 'ad_id': ad_id, 'reward': reward,
                       'clicked': clicked, 'converted': converted}
                (train_rows if i < split else test_rows).append(row)
        self.train_df = pd.DataFrame(train_rows)
        self.test_df = pd.DataFrame(test_rows)

        self.n_users_actual = int(df['user_id'].nunique())
        self.n_ads_actual = int(df['ad_id'].nunique())

        # Pre-compute aggregate analytics
        self._compute_analytics()

    def _compute_analytics(self):
        """Fill ``self.analytics`` with overall and grouped performance stats.

        Keys: ``total_impressions``, ``total_clicks``, ``total_conversions``,
        ``total_revenue``, ``ctr``, ``cvr``, ``ecpm``, ``by_category`` and
        ``by_advertiser`` (the last two are lists of per-group dicts).
        """
        df = self.impressions_df
        total_imp = len(df)
        total_clk = df['clicked'].sum()
        total_conv = df['converted'].sum()
        total_rev = df['revenue'].sum()

        # max(1, ...) guards the ratios against division by zero.
        self.analytics = {
            'total_impressions': int(total_imp),
            'total_clicks': int(total_clk),
            'total_conversions': int(total_conv),
            'total_revenue': round(float(total_rev), 2),
            'ctr': round(float(total_clk / max(1, total_imp)), 4),
            'cvr': round(float(total_conv / max(1, total_clk)), 4),
            'ecpm': round(float(total_rev / max(1, total_imp) * 1000), 4),
        }

        merged = df.merge(self.ads_df[['ad_id', 'category', 'advertiser', 'bid_price']],
                          on='ad_id', how='left')

        # By category
        cat_stats = []
        for cat, g in merged.groupby('category'):
            cat_stats.append({
                'category': cat,
                'impressions': int(len(g)),
                'clicks': int(g['clicked'].sum()),
                'conversions': int(g['converted'].sum()),
                'revenue': round(float(g['revenue'].sum()), 2),
                'ctr': round(float(g['clicked'].mean()), 4),
            })
        self.analytics['by_category'] = cat_stats

        # By advertiser
        adv_stats = []
        for adv, g in merged.groupby('advertiser'):
            spend = g['revenue'].sum()
            conv = g['converted'].sum()
            adv_stats.append({
                'advertiser': adv,
                'impressions': int(len(g)),
                'clicks': int(g['clicked'].sum()),
                'conversions': int(conv),
                'spend': round(float(spend), 2),
                'ctr': round(float(g['clicked'].mean()), 4),
                # ROAS = assumed conversion value / spend; the 0.01 floor
                # avoids dividing by zero for advertisers with no clicks.
                'roas': round(float(conv * self._CONVERSION_VALUE / max(0.01, spend)), 2),
            })
        self.analytics['by_advertiser'] = adv_stats

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def get_user_history(self, user_id: int, max_len: int = 20) -> list:
        """Return the user's most recent interactions, oldest first.

        Args:
            user_id: User to look up.
            max_len: Maximum number of interactions to return.

        Returns:
            ``[(ad_id, reward, clicked, converted), ...]`` with at most
            ``max_len`` entries; empty for unknown users or ``max_len <= 0``.
        """
        # Guard needed because seq[-0:] is the WHOLE list, not an empty one.
        if max_len <= 0:
            return []
        return self.user_sequences.get(user_id, [])[-max_len:]

    @staticmethod
    def _one_hot(value, vocabulary) -> np.ndarray:
        """One-hot encode ``value`` over ``vocabulary``; all zeros if unknown."""
        encoded = np.zeros(len(vocabulary))
        if value in vocabulary:
            encoded[vocabulary.index(value)] = 1
        return encoded

    def get_user_features(self, user_id: int) -> np.ndarray:
        """Return the user's float32 feature vector.

        Layout: age-group one-hot, gender one-hot, interests multi-hot,
        device one-hot (5 + 3 + 10 + 3 = 21 dims with the module's current
        vocabularies).  Unknown users yield an all-zero vector; a value
        missing from its vocabulary leaves that segment all zeros.
        """
        row = self.users_df[self.users_df['user_id'] == user_id]
        if len(row) == 0:
            n_dims = len(AGE_GROUPS) + len(GENDERS) + len(CATEGORIES) + len(DEVICES)
            return np.zeros(n_dims, dtype=np.float32)
        r = row.iloc[0]

        age_oh = self._one_hot(r['age_group'], AGE_GROUPS)
        gen_oh = self._one_hot(r['gender'], GENDERS)
        int_oh = np.zeros(len(CATEGORIES))
        # str() keeps this safe if the CSV round-trip produced a non-string.
        for interest in str(r['interests']).split('|'):
            if interest in CATEGORIES:
                int_oh[CATEGORIES.index(interest)] = 1
        dev_oh = self._one_hot(r['device'], DEVICES)
        return np.concatenate([age_oh, gen_oh, int_oh, dev_oh]).astype(np.float32)

    def get_context_features(self, hour: Optional[int] = None,
                             dow: Optional[int] = None) -> np.ndarray:
        """Return a 4-dim sinusoidal encoding of hour and day-of-week.

        Args:
            hour: Hour of day (0-23). Defaults to the current local hour.
            dow: Day of week (0 = Monday, as ``datetime.weekday()``).
                Defaults to the current local weekday.

        Returns:
            float32 array ``[sin(hour), cos(hour), sin(dow), cos(dow)]`` with
            periods 24 and 7 respectively.
        """
        # Each argument falls back to "now" independently, so supplying only
        # one of them neither fails nor overrides the supplied value.
        if hour is None or dow is None:
            import datetime
            now = datetime.datetime.now()
            if hour is None:
                hour = now.hour
            if dow is None:
                dow = now.weekday()
        return np.array([
            np.sin(2 * np.pi * hour / 24),
            np.cos(2 * np.pi * hour / 24),
            np.sin(2 * np.pi * dow / 7),
            np.cos(2 * np.pi * dow / 7),
        ], dtype=np.float32)

    def get_ad_info(self, ad_id: int) -> dict:
        """Return display / pricing info for an ad.

        Unknown ads yield a placeholder dict with the same keys as a real
        one, so callers can read any field without checking for existence.
        """
        row = self.ads_df[self.ads_df['ad_id'] == ad_id]
        if len(row) == 0:
            return {'ad_id': ad_id, 'advertiser': 'Unknown', 'category': 'unknown',
                    'format': 'banner', 'bid_price': 1.0, 'headline': '—',
                    'ctr_base': 0.0, 'cvr_base': 0.0}
        r = row.iloc[0]
        # Cast to built-in types so the result is JSON-serialisable.
        return {
            'ad_id': int(ad_id),
            'advertiser': str(r['advertiser']),
            'category': str(r['category']),
            'format': str(r['format']),
            'bid_price': float(r['bid_price']),
            'headline': str(r['headline']),
            'ctr_base': float(r['ctr_base']),
            'cvr_base': float(r['cvr_base']),
        }

    def get_user_profile(self, user_id: int) -> dict:
        """Return the user's profile as plain Python types, or ``{}`` if unknown."""
        row = self.users_df[self.users_df['user_id'] == user_id]
        if len(row) == 0:
            return {}
        r = row.iloc[0]
        return {
            'user_id': int(user_id),
            'age_group': str(r['age_group']),
            'gender': str(r['gender']),
            'interests': str(r['interests']).split('|'),
            'device': str(r['device']),
        }