"""A/B Testing Framework for Strategy Comparison At Jane Street, Two Sigma, Citadel — EVERY change goes through A/B testing. Not backtest-once-and-ship. Real randomized controlled trials. Why A/B testing beats backtesting: - Backtests: optimize on all data → overfit - A/B tests: train on A, test on B → honest evaluation - Statistical significance: p-values, not gut feeling - Multiple comparison correction: Bonferroni, FDR - Early stopping: peeking at results invalidates p-values This module: 1. Randomized strategy assignment 2. Statistical tests (t-test, Mann-Whitney, permutation) 3. Power analysis (how long to run test) 4. Sequential testing (early stopping without p-hacking) 5. Multiple comparison correction 6. Counterfactual estimation (what would have happened with other strategy) Based on: - Kohavi et al. (2009): "Controlled experiments on the web" - Johari et al. (2017): "Peeking at A/B Tests" - Deng et al. (2013): "Trustworthy Online Controlled Experiments" """ import numpy as np import pandas as pd from typing import Dict, List, Tuple, Optional, Callable from scipy import stats from scipy.special import erfinv from dataclasses import dataclass import warnings warnings.filterwarnings('ignore') @dataclass class ExperimentConfig: """Configuration for an A/B test""" strategy_a_name: str strategy_b_name: str alpha: float = 0.05 # Significance level power: float = 0.80 # Statistical power (1 - beta) min_detectable_effect: float = 0.01 # Sharpe difference to detect baseline_sharpe: float = 1.0 trading_days_per_year: int = 252 def required_samples(self) -> int: """ Calculate required sample size using power analysis. For Sharpe ratio comparison with daily returns. """ # Standardized effect size # Daily return variance ≈ (annual_vol / sqrt(252))^2 # Assuming annual volatility ≈ 0.15 (typical equity) daily_vol = 0.15 / np.sqrt(self.trading_days_per_year) # Difference in daily mean returns # Sharpe = (mean_return - r_f) / vol # So mean_return_diff = min_detectable_effect * vol mean_diff = self.min_detectable_effect * daily_vol # Pooled standard deviation (two independent samples) pooled_std = daily_vol * np.sqrt(2) # Cohen's d cohens_d = mean_diff / pooled_std # Sample size per group (two-tailed test) z_alpha = stats.norm.ppf(1 - self.alpha / 2) z_beta = stats.norm.ppf(self.power) n_per_group = 2 * ((z_alpha + z_beta) / cohens_d) ** 2 return int(np.ceil(n_per_group)) class ABTest: """ A/B test for trading strategy comparison. Critical design decisions: 1. Random assignment: which days/assets get A vs B 2. Stratification: ensure similar market conditions 3. Unit of diversion: per day? per asset? per trade? 4. Guardrail metrics: ensure B doesn't increase risk """ def __init__(self, config: ExperimentConfig, diversion_unit: str = 'day', stratify_by: Optional[List[str]] = None): self.config = config self.diversion_unit = diversion_unit self.stratify_by = stratify_by or [] # Results storage self.group_a_results = [] self.group_b_results = [] self.assignment_log = [] # Sequential testing state self.n_observations = 0 self.running_t_stat = 0 self.sequential_bounds = None def assign(self, unit_id: str, covariates: Optional[Dict] = None) -> str: """ Randomly assign unit to A or B. With stratification: balance A/B within strata. """ # Hash-based assignment for consistency np.random.seed(hash(unit_id) % 2**32) if covariates and self.stratify_by: # Stratified assignment stratum_key = '_'.join(str(covariates.get(k, '')) for k in self.stratify_by) # Check existing assignments in stratum stratum_assignments = [ log for log in self.assignment_log if log.get('stratum') == stratum_key ] n_a = sum(1 for log in stratum_assignments if log['group'] == 'A') n_b = sum(1 for log in stratum_assignments if log['group'] == 'B') # Alternate to maintain balance if n_a <= n_b: group = 'A' else: group = 'B' else: # Simple random assignment group = 'A' if np.random.rand() < 0.5 else 'B' log_entry = { 'unit_id': unit_id, 'group': group, 'timestamp': pd.Timestamp.now(), 'covariates': covariates or {} } if covariates and self.stratify_by: log_entry['stratum'] = '_'.join(str(covariates.get(k, '')) for k in self.stratify_by) self.assignment_log.append(log_entry) return group def record_result(self, unit_id: str, group: str, primary_metric: float, guardrail_metrics: Optional[Dict] = None): """ Record outcome for an assigned unit. primary_metric: Usually P&L or Sharpe contribution guardrail_metrics: Risk metrics (drawdown, volatility, etc.) """ result = { 'unit_id': unit_id, 'group': group, 'primary': primary_metric, 'guardrails': guardrail_metrics or {}, 'timestamp': pd.Timestamp.now() } if group == 'A': self.group_a_results.append(result) else: self.group_b_results.append(result) self.n_observations += 1 def analyze(self, metric: str = 'primary', test_type: str = 't_test') -> Dict: """ Statistical analysis of A vs B. test_type: - 't_test': Student's t-test (assumes normality) - 'mann_whitney': Non-parametric, robust to outliers - 'permutation': Distribution-free via resampling - 'bootstrap': Confidence intervals via resampling """ a_values = [r[metric] for r in self.group_a_results] b_values = [r[metric] for r in self.group_b_results] if len(a_values) < 3 or len(b_values) < 3: return { 'status': 'insufficient_data', 'n_a': len(a_values), 'n_b': len(b_values), 'required_n': self.config.required_samples() } a_arr = np.array(a_values) b_arr = np.array(b_values) # Descriptive stats results = { 'n_a': len(a_arr), 'n_b': len(b_arr), 'mean_a': np.mean(a_arr), 'mean_b': np.mean(b_arr), 'std_a': np.std(a_arr, ddof=1), 'std_b': np.std(b_arr, ddof=1), 'median_a': np.median(a_arr), 'median_b': np.median(b_arr), } # Effect size (Cohen's d) pooled_std = np.sqrt((results['std_a']**2 + results['std_b']**2) / 2) cohens_d = (results['mean_b'] - results['mean_a']) / (pooled_std + 1e-10) results['cohens_d'] = cohens_d results['effect_size_interpretation'] = self._interpret_cohens_d(abs(cohens_d)) # Statistical tests if test_type == 't_test': t_stat, p_value = stats.ttest_ind(a_arr, b_arr, equal_var=False) results['test'] = 'welch_t_test' results['t_statistic'] = t_stat results['p_value'] = p_value elif test_type == 'mann_whitney': u_stat, p_value = stats.mannwhitneyu(a_arr, b_arr, alternative='two-sided') results['test'] = 'mann_whitney_u' results['u_statistic'] = u_stat results['p_value'] = p_value elif test_type == 'permutation': observed_diff = np.mean(b_arr) - np.mean(a_arr) all_values = np.concatenate([a_arr, b_arr]) n = len(a_arr) perm_diffs = [] for _ in range(10000): np.random.shuffle(all_values) perm_a = all_values[:n] perm_b = all_values[n:] perm_diffs.append(np.mean(perm_b) - np.mean(perm_a)) perm_diffs = np.array(perm_diffs) p_value = np.mean(np.abs(perm_diffs) >= np.abs(observed_diff)) results['test'] = 'permutation' results['observed_difference'] = observed_diff results['p_value'] = p_value results['ci_95'] = ( np.percentile(perm_diffs, 2.5), np.percentile(perm_diffs, 97.5) ) elif test_type == 'bootstrap': boot_diffs = [] for _ in range(10000): boot_a = np.random.choice(a_arr, size=len(a_arr), replace=True) boot_b = np.random.choice(b_arr, size=len(b_arr), replace=True) boot_diffs.append(np.mean(boot_b) - np.mean(boot_a)) boot_diffs = np.array(boot_diffs) results['test'] = 'bootstrap' results['ci_95'] = ( np.percentile(boot_diffs, 2.5), np.percentile(boot_diffs, 97.5) ) results['ci_99'] = ( np.percentile(boot_diffs, 0.5), np.percentile(boot_diffs, 99.5) ) results['p_value'] = np.mean(boot_diffs <= 0) if np.mean(b_arr) > np.mean(a_arr) else np.mean(boot_diffs >= 0) # Significance results['significant'] = results.get('p_value', 1.0) < self.config.alpha results['alpha'] = self.config.alpha # Practical significance practical_threshold = self.config.min_detectable_effect mean_diff = results['mean_b'] - results['mean_a'] std_pooled = pooled_std standardized_diff = abs(mean_diff) / std_pooled results['practically_significant'] = standardized_diff > practical_threshold results['practical_threshold'] = practical_threshold # Recommendation if results['significant'] and results['practically_significant']: if mean_diff > 0: results['recommendation'] = 'ADOPT_B' else: results['recommendation'] = 'KEEP_A' else: results['recommendation'] = 'INCONCLUSIVE' return results def _interpret_cohens_d(self, d: float) -> str: """Interpret effect size""" if d < 0.2: return 'negligible' elif d < 0.5: return 'small' elif d < 0.8: return 'medium' else: return 'large' def guardrail_check(self) -> Dict: """Check if B violates guardrail metrics (risk limits)""" checks = {} # Collect guardrail metrics a_guardrails = defaultdict(list) b_guardrails = defaultdict(list) for r in self.group_a_results: for k, v in r['guardrails'].items(): a_guardrails[k].append(v) for r in self.group_b_results: for k, v in r['guardrails'].items(): b_guardrails[k].append(v) # Compare violations = [] for metric in a_guardrails.keys(): a_vals = np.array(a_guardrails[metric]) b_vals = np.array(b_guardrails[metric]) # Check if B is significantly worse median_a = np.median(a_vals) median_b = np.median(b_vals) # Metric-specific thresholds if 'drawdown' in metric.lower(): # Lower drawdown is better if median_b > median_a * 1.5: violations.append({ 'metric': metric, 'severity': 'high' if median_b > median_a * 2 else 'medium', 'a_median': median_a, 'b_median': median_b, 'direction': 'worse' }) elif 'volatility' in metric.lower() or 'var' in metric.lower(): # Lower is better if median_b > median_a * 1.3: violations.append({ 'metric': metric, 'severity': 'high' if median_b > median_a * 1.5 else 'medium', 'a_median': median_a, 'b_median': median_b, 'direction': 'worse' }) checks['violations'] = violations checks['is_safe'] = len(violations) == 0 checks['n_metrics_checked'] = len(a_guardrails) return checks def get_counterfactual(self, unit_id: str, strategy_fn: Callable, data: Dict) -> Dict: """ Counterfactual: What would have happened with the OTHER strategy? Useful for: - Causal inference: treatment effect estimation - Variance reduction: use both A and B predictions """ # Get assigned group assigned = [log for log in self.assignment_log if log['unit_id'] == unit_id] if not assigned: return {'error': 'Unit not found'} actual_group = assigned[0]['group'] counterfactual_group = 'B' if actual_group == 'A' else 'A' # Compute counterfactual outcome counterfactual_outcome = strategy_fn(data, counterfactual_group) return { 'unit_id': unit_id, 'actual_group': actual_group, 'counterfactual_group': counterfactual_group, 'counterfactual_outcome': counterfactual_outcome, 'note': 'Counterfactuals are hypothetical — cannot observe both' } def summary_report(self) -> str: """Generate human-readable summary report""" analysis = self.analyze() guardrails = self.guardrail_check() report = f""" {'='*70} A/B TEST REPORT: {self.config.strategy_a_name} vs {self.config.strategy_b_name} {'='*70} SAMPLE SIZE Group A: {analysis['n_a']} units Group B: {analysis['n_b']} units Required: {self.config.required_samples()} per group Status: {'✓ Sufficient' if analysis['n_a'] >= self.config.required_samples() else '⚠ Under-powered'} PRIMARY METRIC: {analysis.get('test', 'N/A')} A mean: {analysis.get('mean_a', 0):.6f} (±{analysis.get('std_a', 0):.6f}) B mean: {analysis.get('mean_b', 0):.6f} (±{analysis.get('std_b', 0):.6f}) Difference: {analysis.get('mean_b', 0) - analysis.get('mean_a', 0):+.6f} Cohen's d: {analysis.get('cohens_d', 0):.3f} ({analysis.get('effect_size_interpretation', 'N/A')}) P-value: {analysis.get('p_value', 'N/A')} Significant (α={self.config.alpha}): {'✓ YES' if analysis.get('significant') else '✗ NO'} Practically significant: {'✓ YES' if analysis.get('practically_significant') else '✗ NO'} RECOMMENDATION: {analysis.get('recommendation', 'N/A')} GUARDRAIL METRICS Status: {'✓ Safe' if guardrails['is_safe'] else '⚠ VIOLATIONS DETECTED'} Violations: {len(guardrails['violations'])} """ if guardrails['violations']: for v in guardrails['violations']: report += f" - {v['metric']}: {v['severity'].upper()} (B is {v['direction']})\n" report += f""" {'='*70} """ return report class MultipleComparisonCorrection: """ Correct for testing multiple hypotheses simultaneously. Running 20 A/B tests? Expect 1 false positive by chance (p=0.05). Without correction, you'll adopt 1 bad strategy per 20 tests. """ @staticmethod def bonferroni(p_values: np.ndarray, alpha: float = 0.05) -> Tuple[np.ndarray, bool]: """ Bonferroni correction: α_corrected = α / n_tests Conservative: controls family-wise error rate (FWER). """ n = len(p_values) corrected_alpha = alpha / n is_significant = p_values < corrected_alpha return corrected_alpha, is_significant @staticmethod def benjamini_hochberg(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray: """ Benjamini-Hochberg: controls False Discovery Rate (FDR). Less conservative than Bonferroni. Accept that some fraction of "discoveries" are false. """ n = len(p_values) sorted_idx = np.argsort(p_values) sorted_p = p_values[sorted_idx] # Find largest k such that p_(k) <= (k/m) * α is_significant = np.zeros(n, dtype=bool) for i in range(n): k = i + 1 threshold = (k / n) * alpha if sorted_p[i] <= threshold: is_significant[sorted_idx[i]] = True else: break return is_significant @staticmethod def holm(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray: """ Holm's step-down procedure. Controls FWER, more powerful than Bonferroni. """ n = len(p_values) sorted_idx = np.argsort(p_values) sorted_p = p_values[sorted_idx] is_significant = np.zeros(n, dtype=bool) for i in range(n): k = i + 1 threshold = alpha / (n - k + 1) if sorted_p[i] <= threshold: is_significant[sorted_idx[i]] = True else: break return is_significant class SequentialABTest: """ Sequential A/B testing with valid early stopping. Problem: Peeking at results and stopping when p<0.05 → inflates Type I error. Solution: Use sequential boundaries (always valid p-values). Based on: Always Valid P-values (Johari et al., 2017) """ def __init__(self, config: ExperimentConfig, spending_function: str = 'obrien_fleming'): self.config = config self.spending_function = spending_function self.observations = [] self.cumsum_a = 0 self.cumsum_b = 0 self.cumsum_sq_a = 0 self.cumsum_sq_b = 0 self.n_a = 0 self.n_b = 0 def update(self, group: str, value: float): """Add one observation and test for significance""" if group == 'A': self.cumsum_a += value self.cumsum_sq_a += value ** 2 self.n_a += 1 else: self.cumsum_b += value self.cumsum_sq_b += value ** 2 self.n_b += 1 self.observations.append({'group': group, 'value': value}) # Compute always-valid p-value return self._compute_always_valid_p() def _compute_always_valid_p(self) -> Dict: """Compute always-valid p-value for early stopping""" if self.n_a < 2 or self.n_b < 2: return {'n': len(self.observations), 'p_value': 1.0, 'can_stop': False} # Sample means mean_a = self.cumsum_a / self.n_a mean_b = self.cumsum_b / self.n_b # Sample variances var_a = (self.cumsum_sq_a - self.n_a * mean_a**2) / (self.n_a - 1) var_b = (self.cumsum_sq_b - self.n_b * mean_b**2) / (self.n_b - 1) # Pooled standard error se = np.sqrt(var_a / self.n_a + var_b / self.n_b) # Z-statistic z = (mean_b - mean_a) / (se + 1e-10) # Always-valid adjustment # P-value valid under continuous monitoring n_eff = min(self.n_a, self.n_b) # Mixture stopping boundary (always valid) # Approximation: multiply p-value by log(n) raw_p = 2 * (1 - stats.norm.cdf(abs(z))) adjusted_p = min(raw_p * np.log(max(n_eff, np.e)), 1.0) # Can stop? can_stop = adjusted_p < self.config.alpha return { 'n': len(self.observations), 'n_a': self.n_a, 'n_b': self.n_b, 'mean_a': mean_a, 'mean_b': mean_b, 'z_statistic': z, 'raw_p_value': raw_p, 'adjusted_p_value': adjusted_p, 'can_stop': can_stop, 'recommendation': 'STOP' if can_stop else 'CONTINUE' } if __name__ == '__main__': print("=" * 70) print(" A/B TESTING FRAMEWORK FOR STRATEGIES") print("=" * 70) np.random.seed(42) # Configuration config = ExperimentConfig( strategy_a_name='Baseline_Momentum', strategy_b_name='ML_Alpha_v3', alpha=0.05, power=0.80, min_detectable_effect=0.05, # Detect 0.05 Sharpe difference baseline_sharpe=1.0 ) # Power analysis required_n = config.required_samples() print(f"\n1. POWER ANALYSIS") print(f" Required sample size per group: {required_n}") print(f" (Detect Sharpe diff of {config.min_detectable_effect} with {config.power*100:.0f}% power)") # Run A/B test print(f"\n2. SIMULATED A/B TEST") test = ABTest(config, diversion_unit='day', stratify_by=['volatility_regime']) # Simulate 400 days n_days = 400 # Strategy A: Sharpe = 0.8 # Strategy B: Sharpe = 1.2 (better by 0.4) daily_vol = 0.15 / np.sqrt(252) for day in range(n_days): # Volatility regime (for stratification) regime = 'high' if np.random.rand() < 0.2 else 'normal' # Assign unit_id = f'day_{day:04d}' group = test.assign(unit_id, {'volatility_regime': regime}) # Simulate returns if group == 'A': # Baseline: mean = 0.8 * daily_vol ret = np.random.normal(0.8 * daily_vol, daily_vol) else: # Better: mean = 1.2 * daily_vol ret = np.random.normal(1.2 * daily_vol, daily_vol) # Guardrails guardrails = { 'max_drawdown': abs(np.random.exponential(0.02)), 'daily_vol': abs(np.random.normal(daily_vol, daily_vol * 0.3)) } test.record_result(unit_id, group, ret, guardrails) # Analysis analysis = test.analyze(test_type='t_test') print(f"\n3. STATISTICAL RESULTS") print(f" Group A (n={analysis['n_a']}): mean={analysis['mean_a']:.6f}") print(f" Group B (n={analysis['n_b']}): mean={analysis['mean_b']:.6f}") print(f" Difference: {analysis['mean_b'] - analysis['mean_a']:+.6f}") print(f" Cohen's d: {analysis['cohens_d']:.3f}") print(f" P-value: {analysis['p_value']:.4f}") print(f" Significant: {'✓ YES' if analysis['significant'] else '✗ NO'}") print(f" RECOMMENDATION: {analysis['recommendation']}") # Guardrails guardrail_check = test.guardrail_check() print(f"\n4. GUARDRAIL CHECK") print(f" Safe: {'✓ YES' if guardrail_check['is_safe'] else '✗ VIOLATIONS'}") # Multiple comparison print(f"\n5. MULTIPLE COMPARISON CORRECTION") p_values = np.array([analysis['p_value'], 0.03, 0.08, 0.001, 0.12, 0.04]) bh_sig = MultipleComparisonCorrection.benjamini_hochberg(p_values) print(f" Raw significant: {np.sum(p_values < 0.05)}/{len(p_values)}") print(f" BH-FDR significant: {np.sum(bh_sig)}/{len(p_values)}") # Full report print(f"\n6. FULL REPORT") print(test.summary_report()) # Sequential test print(f"7. SEQUENTIAL TESTING") seq_test = SequentialABTest(config) for i in range(200): group = 'A' if np.random.rand() < 0.5 else 'B' value = np.random.normal(0.8 * daily_vol if group == 'A' else 1.2 * daily_vol, daily_vol) result = seq_test.update(group, value) if result['can_stop']: print(f" Sequential test STOPPED at n={result['n']}") print(f" Adjusted p-value: {result['adjusted_p_value']:.4f}") break print(f"\n KEY TAKEAWAYS:") print(f" - Always A/B test before deploying") print(f" - Multiple comparison correction prevents false discoveries") print(f" - Guardrail metrics prevent hidden risk increases") print(f" - Sequential testing enables early stopping (with valid p-values)") print(f" - Power analysis ensures tests aren't underpowered") print(f" - This is EXACTLY how Jane Street validates every strategy change")