# alphaforge-quant-system/ab_testing.py
"""A/B Testing Framework for Strategy Comparison
At firms like Jane Street, Two Sigma, and Citadel, every change goes through
A/B testing. Not backtest-once-and-ship: real randomized controlled trials.

Why A/B testing beats backtesting alone:
- Backtests: optimize on all the data -> overfit
- A/B tests: train on A, evaluate on B -> honest evaluation
- Statistical significance: p-values, not gut feeling
- Multiple comparison correction: Bonferroni, FDR
- Early stopping: naive peeking invalidates p-values, so use sequential tests
This module:
1. Randomized strategy assignment
2. Statistical tests (t-test, Mann-Whitney, permutation)
3. Power analysis (how long to run test)
4. Sequential testing (early stopping without p-hacking)
5. Multiple comparison correction
6. Counterfactual estimation (what would have happened with the other strategy)
Based on:
- Kohavi et al. (2009): "Controlled experiments on the web"
- Johari et al. (2017): "Peeking at A/B Tests"
- Deng et al. (2013): "Trustworthy Online Controlled Experiments"
"""
import hashlib
import warnings
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy import stats

warnings.filterwarnings('ignore')
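# Quick-start sketch (assumes the classes defined below; see the __main__ demo
# at the bottom of this file for a full run):
#
#   config = ExperimentConfig('Baseline', 'Candidate', min_detectable_effect=0.05)
#   test = ABTest(config, diversion_unit='day')
#   group = test.assign('day_0001', {'volatility_regime': 'normal'})
#   test.record_result('day_0001', group, primary_metric=0.0012)
#   print(test.analyze(test_type='t_test'))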
@dataclass
class ExperimentConfig:
"""Configuration for an A/B test"""
strategy_a_name: str
strategy_b_name: str
alpha: float = 0.05 # Significance level
power: float = 0.80 # Statistical power (1 - beta)
min_detectable_effect: float = 0.01 # Sharpe difference to detect
baseline_sharpe: float = 1.0
trading_days_per_year: int = 252
def required_samples(self) -> int:
"""
Calculate required sample size using power analysis.
For Sharpe ratio comparison with daily returns.
"""
        # Standardized effect size.
        # Daily return std ~ annual_vol / sqrt(252); assume annual vol ~ 0.15
        # (typical equity), so a Sharpe difference maps to a daily mean difference.
        daily_vol = 0.15 / np.sqrt(self.trading_days_per_year)
        # Sharpe = (mean_return - r_f) / vol, so mean_return_diff = MDE * vol
        mean_diff = self.min_detectable_effect * daily_vol
        # Cohen's d uses the within-group SD (daily_vol), so for daily returns
        # d equals the Sharpe difference itself
        cohens_d = mean_diff / daily_vol
        # Sample size per group (two-sided, two-sample z approximation):
        # n = 2 * ((z_{1-alpha/2} + z_{power}) / d)^2
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)
        z_beta = stats.norm.ppf(self.power)
        n_per_group = 2 * ((z_alpha + z_beta) / cohens_d) ** 2
        return int(np.ceil(n_per_group))
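# Rough worked example with the demo settings used in __main__ (alpha=0.05,
# power=0.80, min_detectable_effect=0.05): z_alpha ~ 1.96, z_beta ~ 0.84,
# d ~ 0.05, so n ~ 2 * ((1.96 + 0.84) / 0.05)^2 ~ 6,300 days per group, roughly
# 25 trading years. Detecting small Sharpe differences takes far more data
# than intuition suggests.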
class ABTest:
"""
A/B test for trading strategy comparison.
Critical design decisions:
1. Random assignment: which days/assets get A vs B
2. Stratification: ensure similar market conditions
3. Unit of diversion: per day? per asset? per trade?
4. Guardrail metrics: ensure B doesn't increase risk
"""
def __init__(self,
config: ExperimentConfig,
diversion_unit: str = 'day',
stratify_by: Optional[List[str]] = None):
self.config = config
self.diversion_unit = diversion_unit
self.stratify_by = stratify_by or []
# Results storage
self.group_a_results = []
self.group_b_results = []
self.assignment_log = []
# Sequential testing state
self.n_observations = 0
self.running_t_stat = 0
self.sequential_bounds = None
def assign(self,
unit_id: str,
covariates: Optional[Dict] = None) -> str:
"""
Randomly assign unit to A or B.
With stratification: balance A/B within strata.
"""
        # Hash-based assignment for consistency: Python's built-in hash() is
        # salted per process, so use a deterministic digest instead
        np.random.seed(int(hashlib.md5(unit_id.encode()).hexdigest(), 16) % 2**32)
if covariates and self.stratify_by:
# Stratified assignment
stratum_key = '_'.join(str(covariates.get(k, '')) for k in self.stratify_by)
# Check existing assignments in stratum
stratum_assignments = [
log for log in self.assignment_log
if log.get('stratum') == stratum_key
]
n_a = sum(1 for log in stratum_assignments if log['group'] == 'A')
n_b = sum(1 for log in stratum_assignments if log['group'] == 'B')
# Alternate to maintain balance
if n_a <= n_b:
group = 'A'
else:
group = 'B'
else:
# Simple random assignment
group = 'A' if np.random.rand() < 0.5 else 'B'
log_entry = {
'unit_id': unit_id,
'group': group,
'timestamp': pd.Timestamp.now(),
'covariates': covariates or {}
}
if covariates and self.stratify_by:
log_entry['stratum'] = '_'.join(str(covariates.get(k, '')) for k in self.stratify_by)
self.assignment_log.append(log_entry)
return group
def record_result(self,
unit_id: str,
group: str,
primary_metric: float,
guardrail_metrics: Optional[Dict] = None):
"""
Record outcome for an assigned unit.
primary_metric: Usually P&L or Sharpe contribution
guardrail_metrics: Risk metrics (drawdown, volatility, etc.)
"""
result = {
'unit_id': unit_id,
'group': group,
'primary': primary_metric,
'guardrails': guardrail_metrics or {},
'timestamp': pd.Timestamp.now()
}
if group == 'A':
self.group_a_results.append(result)
else:
self.group_b_results.append(result)
self.n_observations += 1
def analyze(self,
metric: str = 'primary',
test_type: str = 't_test') -> Dict:
"""
Statistical analysis of A vs B.
test_type:
        - 't_test': Welch's t-test (unequal variances, assumes approximate normality)
- 'mann_whitney': Non-parametric, robust to outliers
- 'permutation': Distribution-free via resampling
- 'bootstrap': Confidence intervals via resampling
"""
a_values = [r[metric] for r in self.group_a_results]
b_values = [r[metric] for r in self.group_b_results]
if len(a_values) < 3 or len(b_values) < 3:
return {
'status': 'insufficient_data',
'n_a': len(a_values),
'n_b': len(b_values),
'required_n': self.config.required_samples()
}
a_arr = np.array(a_values)
b_arr = np.array(b_values)
# Descriptive stats
results = {
'n_a': len(a_arr),
'n_b': len(b_arr),
'mean_a': np.mean(a_arr),
'mean_b': np.mean(b_arr),
'std_a': np.std(a_arr, ddof=1),
'std_b': np.std(b_arr, ddof=1),
'median_a': np.median(a_arr),
'median_b': np.median(b_arr),
}
# Effect size (Cohen's d)
pooled_std = np.sqrt((results['std_a']**2 + results['std_b']**2) / 2)
cohens_d = (results['mean_b'] - results['mean_a']) / (pooled_std + 1e-10)
results['cohens_d'] = cohens_d
results['effect_size_interpretation'] = self._interpret_cohens_d(abs(cohens_d))
# Statistical tests
if test_type == 't_test':
t_stat, p_value = stats.ttest_ind(a_arr, b_arr, equal_var=False)
results['test'] = 'welch_t_test'
results['t_statistic'] = t_stat
results['p_value'] = p_value
elif test_type == 'mann_whitney':
u_stat, p_value = stats.mannwhitneyu(a_arr, b_arr, alternative='two-sided')
results['test'] = 'mann_whitney_u'
results['u_statistic'] = u_stat
results['p_value'] = p_value
elif test_type == 'permutation':
observed_diff = np.mean(b_arr) - np.mean(a_arr)
all_values = np.concatenate([a_arr, b_arr])
n = len(a_arr)
perm_diffs = []
for _ in range(10000):
np.random.shuffle(all_values)
perm_a = all_values[:n]
perm_b = all_values[n:]
perm_diffs.append(np.mean(perm_b) - np.mean(perm_a))
perm_diffs = np.array(perm_diffs)
p_value = np.mean(np.abs(perm_diffs) >= np.abs(observed_diff))
results['test'] = 'permutation'
results['observed_difference'] = observed_diff
results['p_value'] = p_value
results['ci_95'] = (
np.percentile(perm_diffs, 2.5),
np.percentile(perm_diffs, 97.5)
)
elif test_type == 'bootstrap':
boot_diffs = []
for _ in range(10000):
boot_a = np.random.choice(a_arr, size=len(a_arr), replace=True)
boot_b = np.random.choice(b_arr, size=len(b_arr), replace=True)
boot_diffs.append(np.mean(boot_b) - np.mean(boot_a))
boot_diffs = np.array(boot_diffs)
results['test'] = 'bootstrap'
results['ci_95'] = (
np.percentile(boot_diffs, 2.5),
np.percentile(boot_diffs, 97.5)
)
results['ci_99'] = (
np.percentile(boot_diffs, 0.5),
np.percentile(boot_diffs, 99.5)
)
            # Two-sided bootstrap p-value: twice the smaller tail probability
            p_one_sided = min(np.mean(boot_diffs <= 0), np.mean(boot_diffs >= 0))
            results['p_value'] = min(2 * p_one_sided, 1.0)
# Significance
results['significant'] = results.get('p_value', 1.0) < self.config.alpha
results['alpha'] = self.config.alpha
# Practical significance
practical_threshold = self.config.min_detectable_effect
mean_diff = results['mean_b'] - results['mean_a']
std_pooled = pooled_std
standardized_diff = abs(mean_diff) / std_pooled
results['practically_significant'] = standardized_diff > practical_threshold
results['practical_threshold'] = practical_threshold
# Recommendation
if results['significant'] and results['practically_significant']:
if mean_diff > 0:
results['recommendation'] = 'ADOPT_B'
else:
results['recommendation'] = 'KEEP_A'
else:
results['recommendation'] = 'INCONCLUSIVE'
return results
def _interpret_cohens_d(self, d: float) -> str:
"""Interpret effect size"""
if d < 0.2:
return 'negligible'
elif d < 0.5:
return 'small'
elif d < 0.8:
return 'medium'
else:
return 'large'
def guardrail_check(self) -> Dict:
"""Check if B violates guardrail metrics (risk limits)"""
checks = {}
# Collect guardrail metrics
a_guardrails = defaultdict(list)
b_guardrails = defaultdict(list)
for r in self.group_a_results:
for k, v in r['guardrails'].items():
a_guardrails[k].append(v)
for r in self.group_b_results:
for k, v in r['guardrails'].items():
b_guardrails[k].append(v)
# Compare
violations = []
for metric in a_guardrails.keys():
a_vals = np.array(a_guardrails[metric])
b_vals = np.array(b_guardrails[metric])
# Check if B is significantly worse
median_a = np.median(a_vals)
median_b = np.median(b_vals)
# Metric-specific thresholds
if 'drawdown' in metric.lower():
# Lower drawdown is better
if median_b > median_a * 1.5:
violations.append({
'metric': metric,
'severity': 'high' if median_b > median_a * 2 else 'medium',
'a_median': median_a,
'b_median': median_b,
'direction': 'worse'
})
elif 'volatility' in metric.lower() or 'var' in metric.lower():
# Lower is better
if median_b > median_a * 1.3:
violations.append({
'metric': metric,
'severity': 'high' if median_b > median_a * 1.5 else 'medium',
'a_median': median_a,
'b_median': median_b,
'direction': 'worse'
})
checks['violations'] = violations
checks['is_safe'] = len(violations) == 0
checks['n_metrics_checked'] = len(a_guardrails)
return checks
def get_counterfactual(self,
unit_id: str,
strategy_fn: Callable,
data: Dict) -> Dict:
"""
Counterfactual: What would have happened with the OTHER strategy?
Useful for:
- Causal inference: treatment effect estimation
- Variance reduction: use both A and B predictions
"""
# Get assigned group
assigned = [log for log in self.assignment_log if log['unit_id'] == unit_id]
if not assigned:
return {'error': 'Unit not found'}
actual_group = assigned[0]['group']
counterfactual_group = 'B' if actual_group == 'A' else 'A'
# Compute counterfactual outcome
counterfactual_outcome = strategy_fn(data, counterfactual_group)
return {
'unit_id': unit_id,
'actual_group': actual_group,
'counterfactual_group': counterfactual_group,
'counterfactual_outcome': counterfactual_outcome,
            'note': 'Counterfactuals are hypothetical; we cannot observe both outcomes'
}
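    # Illustrative use of get_counterfactual (sketch; replay_strategy and the
    # data dict below are hypothetical, not part of this module):
    #
    #   def replay_strategy(data, group):
    #       # re-run the day's data through whichever strategy `group` names
    #       return data['pnl_a'] if group == 'A' else data['pnl_b']
    #
    #   test.get_counterfactual('day_0001', replay_strategy,
    #                           {'pnl_a': 0.0012, 'pnl_b': 0.0018})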
def summary_report(self) -> str:
"""Generate human-readable summary report"""
analysis = self.analyze()
guardrails = self.guardrail_check()
        report = f"""
{'='*70}
A/B TEST REPORT: {self.config.strategy_a_name} vs {self.config.strategy_b_name}
{'='*70}

SAMPLE SIZE
  Group A: {analysis['n_a']} units
  Group B: {analysis['n_b']} units
  Required: {self.config.required_samples()} per group
  Status: {'✓ Sufficient' if analysis['n_a'] >= self.config.required_samples() else '⚠ Under-powered'}

PRIMARY METRIC: {analysis.get('test', 'N/A')}
  A mean: {analysis.get('mean_a', 0):.6f} (±{analysis.get('std_a', 0):.6f})
  B mean: {analysis.get('mean_b', 0):.6f} (±{analysis.get('std_b', 0):.6f})
  Difference: {analysis.get('mean_b', 0) - analysis.get('mean_a', 0):+.6f}
  Cohen's d: {analysis.get('cohens_d', 0):.3f} ({analysis.get('effect_size_interpretation', 'N/A')})
  P-value: {analysis.get('p_value', 'N/A')}
  Significant (α={self.config.alpha}): {'✓ YES' if analysis.get('significant') else '✗ NO'}
  Practically significant: {'✓ YES' if analysis.get('practically_significant') else '✗ NO'}

RECOMMENDATION: {analysis.get('recommendation', 'N/A')}

GUARDRAIL METRICS
  Status: {'✓ Safe' if guardrails['is_safe'] else '⚠ VIOLATIONS DETECTED'}
  Violations: {len(guardrails['violations'])}
"""
if guardrails['violations']:
for v in guardrails['violations']:
report += f" - {v['metric']}: {v['severity'].upper()} (B is {v['direction']})\n"
report += f"""
{'='*70}
"""
return report
class MultipleComparisonCorrection:
"""
Correct for testing multiple hypotheses simultaneously.
Running 20 A/B tests? Expect 1 false positive by chance (p=0.05).
Without correction, you'll adopt 1 bad strategy per 20 tests.
"""
@staticmethod
def bonferroni(p_values: np.ndarray, alpha: float = 0.05) -> Tuple[np.ndarray, bool]:
"""
        Bonferroni correction: alpha_corrected = alpha / n_tests
Conservative: controls family-wise error rate (FWER).
"""
n = len(p_values)
corrected_alpha = alpha / n
is_significant = p_values < corrected_alpha
return corrected_alpha, is_significant
@staticmethod
def benjamini_hochberg(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray:
"""
Benjamini-Hochberg: controls False Discovery Rate (FDR).
Less conservative than Bonferroni.
Accept that some fraction of "discoveries" are false.
"""
n = len(p_values)
sorted_idx = np.argsort(p_values)
sorted_p = p_values[sorted_idx]
        # Step-up procedure: find the largest k such that p_(k) <= (k/m) * alpha,
        # then reject all hypotheses with the k smallest p-values
        is_significant = np.zeros(n, dtype=bool)
        thresholds = (np.arange(1, n + 1) / n) * alpha
        passing = np.where(sorted_p <= thresholds)[0]
        if passing.size > 0:
            k_max = passing.max()
            is_significant[sorted_idx[:k_max + 1]] = True
        return is_significant
@staticmethod
def holm(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray:
"""
Holm's step-down procedure.
Controls FWER, more powerful than Bonferroni.
"""
n = len(p_values)
sorted_idx = np.argsort(p_values)
sorted_p = p_values[sorted_idx]
is_significant = np.zeros(n, dtype=bool)
for i in range(n):
k = i + 1
threshold = alpha / (n - k + 1)
if sorted_p[i] <= threshold:
is_significant[sorted_idx[i]] = True
else:
break
return is_significant
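# Quick comparison of the three corrections on a toy set of p-values
# (illustrative numbers, alpha = 0.05): for p = [0.001, 0.01, 0.02, 0.04, 0.20],
# Bonferroni (per-test threshold 0.01) rejects 1, Holm rejects 2, and
# Benjamini-Hochberg rejects 4. FWER control is stricter than FDR control,
# as expected.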
class SequentialABTest:
"""
Sequential A/B testing with valid early stopping.
    Problem: Peeking at results and stopping when p < 0.05 inflates Type I error.
Solution: Use sequential boundaries (always valid p-values).
Based on: Always Valid P-values (Johari et al., 2017)
"""
def __init__(self,
config: ExperimentConfig,
spending_function: str = 'obrien_fleming'):
self.config = config
        self.spending_function = spending_function  # stored for reference; the adjustment below uses a log(n) heuristic
self.observations = []
self.cumsum_a = 0
self.cumsum_b = 0
self.cumsum_sq_a = 0
self.cumsum_sq_b = 0
self.n_a = 0
self.n_b = 0
def update(self, group: str, value: float):
"""Add one observation and test for significance"""
if group == 'A':
self.cumsum_a += value
self.cumsum_sq_a += value ** 2
self.n_a += 1
else:
self.cumsum_b += value
self.cumsum_sq_b += value ** 2
self.n_b += 1
self.observations.append({'group': group, 'value': value})
# Compute always-valid p-value
return self._compute_always_valid_p()
def _compute_always_valid_p(self) -> Dict:
"""Compute always-valid p-value for early stopping"""
if self.n_a < 2 or self.n_b < 2:
return {'n': len(self.observations), 'p_value': 1.0, 'can_stop': False}
# Sample means
mean_a = self.cumsum_a / self.n_a
mean_b = self.cumsum_b / self.n_b
# Sample variances
var_a = (self.cumsum_sq_a - self.n_a * mean_a**2) / (self.n_a - 1)
var_b = (self.cumsum_sq_b - self.n_b * mean_b**2) / (self.n_b - 1)
# Pooled standard error
se = np.sqrt(var_a / self.n_a + var_b / self.n_b)
# Z-statistic
z = (mean_b - mean_a) / (se + 1e-10)
        # Heuristic always-valid adjustment: inflate the fixed-n p-value by
        # log(n_eff) so that continuous monitoring does not blow up the Type I
        # error (an approximation to the mixture boundary in Johari et al.,
        # not an exact always-valid p-value)
        n_eff = min(self.n_a, self.n_b)
raw_p = 2 * (1 - stats.norm.cdf(abs(z)))
adjusted_p = min(raw_p * np.log(max(n_eff, np.e)), 1.0)
# Can stop?
can_stop = adjusted_p < self.config.alpha
return {
'n': len(self.observations),
'n_a': self.n_a,
'n_b': self.n_b,
'mean_a': mean_a,
'mean_b': mean_b,
'z_statistic': z,
'raw_p_value': raw_p,
'adjusted_p_value': adjusted_p,
'can_stop': can_stop,
'recommendation': 'STOP' if can_stop else 'CONTINUE'
}
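# Rough sense of the sequential adjustment above (heuristic, not an exact
# always-valid p-value): at n_eff = 100, a raw two-sided p of 0.01 becomes
# 0.01 * log(100) ~ 0.046, so a result that a naive "peek" would call highly
# significant only barely clears alpha = 0.05 under continuous monitoring.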
if __name__ == '__main__':
print("=" * 70)
print(" A/B TESTING FRAMEWORK FOR STRATEGIES")
print("=" * 70)
np.random.seed(42)
# Configuration
config = ExperimentConfig(
strategy_a_name='Baseline_Momentum',
strategy_b_name='ML_Alpha_v3',
alpha=0.05,
power=0.80,
min_detectable_effect=0.05, # Detect 0.05 Sharpe difference
baseline_sharpe=1.0
)
# Power analysis
required_n = config.required_samples()
print(f"\n1. POWER ANALYSIS")
print(f" Required sample size per group: {required_n}")
print(f" (Detect Sharpe diff of {config.min_detectable_effect} with {config.power*100:.0f}% power)")
# Run A/B test
print(f"\n2. SIMULATED A/B TEST")
test = ABTest(config, diversion_unit='day', stratify_by=['volatility_regime'])
# Simulate 400 days
n_days = 400
    # Strategy A: daily Sharpe ~ 0.8; Strategy B: daily Sharpe ~ 1.2
    # (mean/vol per day -- an exaggerated gap so the demo reaches significance quickly)
daily_vol = 0.15 / np.sqrt(252)
for day in range(n_days):
# Volatility regime (for stratification)
regime = 'high' if np.random.rand() < 0.2 else 'normal'
# Assign
unit_id = f'day_{day:04d}'
group = test.assign(unit_id, {'volatility_regime': regime})
# Simulate returns
if group == 'A':
# Baseline: mean = 0.8 * daily_vol
ret = np.random.normal(0.8 * daily_vol, daily_vol)
else:
# Better: mean = 1.2 * daily_vol
ret = np.random.normal(1.2 * daily_vol, daily_vol)
# Guardrails
guardrails = {
'max_drawdown': abs(np.random.exponential(0.02)),
'daily_vol': abs(np.random.normal(daily_vol, daily_vol * 0.3))
}
test.record_result(unit_id, group, ret, guardrails)
# Analysis
analysis = test.analyze(test_type='t_test')
print(f"\n3. STATISTICAL RESULTS")
print(f" Group A (n={analysis['n_a']}): mean={analysis['mean_a']:.6f}")
print(f" Group B (n={analysis['n_b']}): mean={analysis['mean_b']:.6f}")
print(f" Difference: {analysis['mean_b'] - analysis['mean_a']:+.6f}")
print(f" Cohen's d: {analysis['cohens_d']:.3f}")
print(f" P-value: {analysis['p_value']:.4f}")
print(f" Significant: {'βœ“ YES' if analysis['significant'] else 'βœ— NO'}")
print(f" RECOMMENDATION: {analysis['recommendation']}")
# Guardrails
guardrail_check = test.guardrail_check()
print(f"\n4. GUARDRAIL CHECK")
print(f" Safe: {'βœ“ YES' if guardrail_check['is_safe'] else 'βœ— VIOLATIONS'}")
# Multiple comparison
print(f"\n5. MULTIPLE COMPARISON CORRECTION")
p_values = np.array([analysis['p_value'], 0.03, 0.08, 0.001, 0.12, 0.04])
bh_sig = MultipleComparisonCorrection.benjamini_hochberg(p_values)
print(f" Raw significant: {np.sum(p_values < 0.05)}/{len(p_values)}")
print(f" BH-FDR significant: {np.sum(bh_sig)}/{len(p_values)}")
# Full report
print(f"\n6. FULL REPORT")
print(test.summary_report())
# Sequential test
print(f"7. SEQUENTIAL TESTING")
seq_test = SequentialABTest(config)
for i in range(200):
group = 'A' if np.random.rand() < 0.5 else 'B'
value = np.random.normal(0.8 * daily_vol if group == 'A' else 1.2 * daily_vol, daily_vol)
result = seq_test.update(group, value)
if result['can_stop']:
print(f" Sequential test STOPPED at n={result['n']}")
print(f" Adjusted p-value: {result['adjusted_p_value']:.4f}")
break
print(f"\n KEY TAKEAWAYS:")
print(f" - Always A/B test before deploying")
print(f" - Multiple comparison correction prevents false discoveries")
print(f" - Guardrail metrics prevent hidden risk increases")
print(f" - Sequential testing enables early stopping (with valid p-values)")
print(f" - Power analysis ensures tests aren't underpowered")
print(f" - This is EXACTLY how Jane Street validates every strategy change")