Add A/B testing framework for strategy comparison with statistical significance testing
"""A/B Testing Framework for Strategy Comparison

At Jane Street, Two Sigma, and Citadel, EVERY change goes through A/B testing.
Not backtest-once-and-ship: real randomized controlled trials.

Why A/B testing beats backtesting:
- Backtests: optimize on all data -> overfit
- A/B tests: train on A, test on B -> honest evaluation
- Statistical significance: p-values, not gut feeling
- Multiple comparison correction: Bonferroni, FDR
- Early stopping: naive peeking at results invalidates p-values

This module provides:
1. Randomized strategy assignment
2. Statistical tests (t-test, Mann-Whitney, permutation, bootstrap)
3. Power analysis (how long to run the test)
4. Sequential testing (early stopping without p-hacking)
5. Multiple comparison correction
6. Counterfactual estimation (what would have happened with the other strategy)

A short usage sketch follows the imports below; the __main__ block at the end
runs a complete demo.

Based on:
- Kohavi et al. (2009): "Controlled experiments on the web"
- Johari et al. (2017): "Peeking at A/B Tests"
- Deng et al. (2013): "Trustworthy Online Controlled Experiments"
"""
import hashlib
import warnings
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy import stats

warnings.filterwarnings('ignore')
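# Minimal usage sketch (illustrative only; the unit id, covariate name, and
# metric value below are hypothetical, and analyze() needs a few observations
# per group before it returns a recommendation):
#
#     config = ExperimentConfig(strategy_a_name='Baseline', strategy_b_name='Candidate',
#                               min_detectable_effect=0.05)
#     test = ABTest(config, diversion_unit='day')
#     group = test.assign('day_0001', {'volatility_regime': 'normal'})
#     test.record_result('day_0001', group, primary_metric=0.0012)
#     results = test.analyze(test_type='t_test')   # p_value, cohens_d, recommendation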
@dataclass
class ExperimentConfig:
    """Configuration for an A/B test"""
    strategy_a_name: str
    strategy_b_name: str
    alpha: float = 0.05                   # Significance level
    power: float = 0.80                   # Statistical power (1 - beta)
    min_detectable_effect: float = 0.01   # Sharpe difference to detect
    baseline_sharpe: float = 1.0
    trading_days_per_year: int = 252

    def required_samples(self) -> int:
        """
        Calculate the required sample size using power analysis,
        for a Sharpe-ratio comparison on daily returns.
        """
        # Daily return volatility: annual_vol / sqrt(252),
        # assuming annual volatility ~ 0.15 (typical for equities)
        daily_vol = 0.15 / np.sqrt(self.trading_days_per_year)

        # Difference in daily mean returns implied by the target Sharpe gap:
        # Sharpe = (mean_return - r_f) / vol, so mean_diff = min_detectable_effect * vol
        mean_diff = self.min_detectable_effect * daily_vol

        # Pooled standard deviation (equal variance assumed in both groups)
        pooled_std = daily_vol

        # Cohen's d
        cohens_d = mean_diff / pooled_std

        # Sample size per group, two-tailed test:
        # n = 2 * ((z_{alpha/2} + z_beta) / d)^2
        z_alpha = stats.norm.ppf(1 - self.alpha / 2)
        z_beta = stats.norm.ppf(self.power)
        n_per_group = 2 * ((z_alpha + z_beta) / cohens_d) ** 2

        return int(np.ceil(n_per_group))
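# Back-of-the-envelope check of required_samples() (assuming the defaults above):
# with alpha=0.05 and power=0.80, z_{alpha/2} + z_beta ~ 1.96 + 0.84 = 2.80, and
# Cohen's d equals the daily Sharpe gap itself (mean_diff / daily_vol), so
#     n per group ~ 2 * (2.80 / 0.01)^2 ~ 1.6e5 trading days for a 0.01 gap,
#     n per group ~ 2 * (2.80 / 0.05)^2 ~ 6,300  trading days for a 0.05 gap.
# Tiny Sharpe differences are effectively undetectable on daily data, so choose
# min_detectable_effect (and the unit of diversion) accordingly.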
class ABTest:
    """
    A/B test for trading strategy comparison.

    Critical design decisions:
    1. Random assignment: which days/assets get A vs B
    2. Stratification: ensure similar market conditions across groups
    3. Unit of diversion: per day? per asset? per trade?
    4. Guardrail metrics: ensure B doesn't increase risk
    """

    def __init__(self,
                 config: ExperimentConfig,
                 diversion_unit: str = 'day',
                 stratify_by: Optional[List[str]] = None):
        self.config = config
        self.diversion_unit = diversion_unit
        self.stratify_by = stratify_by or []

        # Results storage
        self.group_a_results = []
        self.group_b_results = []
        self.assignment_log = []

        # Sequential testing state
        self.n_observations = 0
        self.running_t_stat = 0
        self.sequential_bounds = None
    def assign(self,
               unit_id: str,
               covariates: Optional[Dict] = None) -> str:
        """
        Randomly assign a unit to A or B.

        With stratification: balance A/B within strata.
        """
        if covariates and self.stratify_by:
            # Stratified assignment
            stratum_key = '_'.join(str(covariates.get(k, '')) for k in self.stratify_by)

            # Check existing assignments in this stratum
            stratum_assignments = [
                log for log in self.assignment_log
                if log.get('stratum') == stratum_key
            ]
            n_a = sum(1 for log in stratum_assignments if log['group'] == 'A')
            n_b = sum(1 for log in stratum_assignments if log['group'] == 'B')

            # Alternate to maintain balance within the stratum
            group = 'A' if n_a <= n_b else 'B'
        else:
            # Simple random assignment, keyed off a stable hash of the unit id so
            # the same unit always maps to the same group (Python's built-in hash()
            # is salted per process, so use hashlib instead)
            seed = int(hashlib.md5(unit_id.encode()).hexdigest(), 16) % 2**32
            group = 'A' if np.random.default_rng(seed).random() < 0.5 else 'B'

        log_entry = {
            'unit_id': unit_id,
            'group': group,
            'timestamp': pd.Timestamp.now(),
            'covariates': covariates or {}
        }
        if covariates and self.stratify_by:
            log_entry['stratum'] = stratum_key

        self.assignment_log.append(log_entry)
        return group
    def record_result(self,
                      unit_id: str,
                      group: str,
                      primary_metric: float,
                      guardrail_metrics: Optional[Dict] = None):
        """
        Record the outcome for an assigned unit.

        primary_metric: usually P&L or Sharpe contribution
        guardrail_metrics: risk metrics (drawdown, volatility, etc.)
        """
        result = {
            'unit_id': unit_id,
            'group': group,
            'primary': primary_metric,
            'guardrails': guardrail_metrics or {},
            'timestamp': pd.Timestamp.now()
        }

        if group == 'A':
            self.group_a_results.append(result)
        else:
            self.group_b_results.append(result)

        self.n_observations += 1
    def analyze(self,
                metric: str = 'primary',
                test_type: str = 't_test') -> Dict:
        """
        Statistical analysis of A vs B.

        test_type:
        - 't_test': Welch's t-test (assumes approximate normality)
        - 'mann_whitney': non-parametric, robust to outliers
        - 'permutation': distribution-free via resampling
        - 'bootstrap': confidence intervals via resampling
        """
        a_values = [r[metric] for r in self.group_a_results]
        b_values = [r[metric] for r in self.group_b_results]

        if len(a_values) < 3 or len(b_values) < 3:
            return {
                'status': 'insufficient_data',
                'n_a': len(a_values),
                'n_b': len(b_values),
                'required_n': self.config.required_samples()
            }

        a_arr = np.array(a_values)
        b_arr = np.array(b_values)

        # Descriptive stats
        results = {
            'n_a': len(a_arr),
            'n_b': len(b_arr),
            'mean_a': np.mean(a_arr),
            'mean_b': np.mean(b_arr),
            'std_a': np.std(a_arr, ddof=1),
            'std_b': np.std(b_arr, ddof=1),
            'median_a': np.median(a_arr),
            'median_b': np.median(b_arr),
        }

        # Effect size (Cohen's d)
        pooled_std = np.sqrt((results['std_a']**2 + results['std_b']**2) / 2)
        cohens_d = (results['mean_b'] - results['mean_a']) / (pooled_std + 1e-10)
        results['cohens_d'] = cohens_d
        results['effect_size_interpretation'] = self._interpret_cohens_d(abs(cohens_d))

        # Statistical tests
        if test_type == 't_test':
            t_stat, p_value = stats.ttest_ind(a_arr, b_arr, equal_var=False)
            results['test'] = 'welch_t_test'
            results['t_statistic'] = t_stat
            results['p_value'] = p_value

        elif test_type == 'mann_whitney':
            u_stat, p_value = stats.mannwhitneyu(a_arr, b_arr, alternative='two-sided')
            results['test'] = 'mann_whitney_u'
            results['u_statistic'] = u_stat
            results['p_value'] = p_value

        elif test_type == 'permutation':
            observed_diff = np.mean(b_arr) - np.mean(a_arr)
            all_values = np.concatenate([a_arr, b_arr])
            n = len(a_arr)

            perm_diffs = []
            for _ in range(10000):
                np.random.shuffle(all_values)
                perm_a = all_values[:n]
                perm_b = all_values[n:]
                perm_diffs.append(np.mean(perm_b) - np.mean(perm_a))
            perm_diffs = np.array(perm_diffs)

            # Two-sided p-value: fraction of permuted differences at least as extreme
            p_value = np.mean(np.abs(perm_diffs) >= np.abs(observed_diff))
            results['test'] = 'permutation'
            results['observed_difference'] = observed_diff
            results['p_value'] = p_value
            results['ci_95'] = (
                np.percentile(perm_diffs, 2.5),
                np.percentile(perm_diffs, 97.5)
            )

        elif test_type == 'bootstrap':
            boot_diffs = []
            for _ in range(10000):
                boot_a = np.random.choice(a_arr, size=len(a_arr), replace=True)
                boot_b = np.random.choice(b_arr, size=len(b_arr), replace=True)
                boot_diffs.append(np.mean(boot_b) - np.mean(boot_a))
            boot_diffs = np.array(boot_diffs)

            results['test'] = 'bootstrap'
            results['ci_95'] = (
                np.percentile(boot_diffs, 2.5),
                np.percentile(boot_diffs, 97.5)
            )
            results['ci_99'] = (
                np.percentile(boot_diffs, 0.5),
                np.percentile(boot_diffs, 99.5)
            )
            # Rough one-sided p-value: fraction of bootstrap differences on the
            # "wrong" side of zero relative to the observed direction
            results['p_value'] = (np.mean(boot_diffs <= 0)
                                  if np.mean(b_arr) > np.mean(a_arr)
                                  else np.mean(boot_diffs >= 0))

        # Statistical significance
        results['significant'] = results.get('p_value', 1.0) < self.config.alpha
        results['alpha'] = self.config.alpha

        # Practical significance: standardized difference vs the minimum effect
        # the experiment was designed to detect
        practical_threshold = self.config.min_detectable_effect
        mean_diff = results['mean_b'] - results['mean_a']
        standardized_diff = abs(mean_diff) / (pooled_std + 1e-10)
        results['practically_significant'] = standardized_diff > practical_threshold
        results['practical_threshold'] = practical_threshold

        # Recommendation
        if results['significant'] and results['practically_significant']:
            results['recommendation'] = 'ADOPT_B' if mean_diff > 0 else 'KEEP_A'
        else:
            results['recommendation'] = 'INCONCLUSIVE'

        return results
    def _interpret_cohens_d(self, d: float) -> str:
        """Interpret effect size"""
        if d < 0.2:
            return 'negligible'
        elif d < 0.5:
            return 'small'
        elif d < 0.8:
            return 'medium'
        else:
            return 'large'
    def guardrail_check(self) -> Dict:
        """Check whether B violates guardrail metrics (risk limits)"""
        checks = {}

        # Collect guardrail metrics per group
        a_guardrails = defaultdict(list)
        b_guardrails = defaultdict(list)
        for r in self.group_a_results:
            for k, v in r['guardrails'].items():
                a_guardrails[k].append(v)
        for r in self.group_b_results:
            for k, v in r['guardrails'].items():
                b_guardrails[k].append(v)

        # Compare medians: B must not be materially worse on any risk metric
        violations = []
        for metric in a_guardrails.keys():
            if not b_guardrails[metric]:
                continue  # metric never recorded for group B

            a_vals = np.array(a_guardrails[metric])
            b_vals = np.array(b_guardrails[metric])
            median_a = np.median(a_vals)
            median_b = np.median(b_vals)

            # Metric-specific thresholds
            if 'drawdown' in metric.lower():
                # Lower drawdown is better
                if median_b > median_a * 1.5:
                    violations.append({
                        'metric': metric,
                        'severity': 'high' if median_b > median_a * 2 else 'medium',
                        'a_median': median_a,
                        'b_median': median_b,
                        'direction': 'worse'
                    })
            elif 'vol' in metric.lower() or 'var' in metric.lower():
                # Volatility / VaR-style metrics: lower is better
                if median_b > median_a * 1.3:
                    violations.append({
                        'metric': metric,
                        'severity': 'high' if median_b > median_a * 1.5 else 'medium',
                        'a_median': median_a,
                        'b_median': median_b,
                        'direction': 'worse'
                    })

        checks['violations'] = violations
        checks['is_safe'] = len(violations) == 0
        checks['n_metrics_checked'] = len(a_guardrails)

        return checks
    def get_counterfactual(self,
                           unit_id: str,
                           strategy_fn: Callable,
                           data: Dict) -> Dict:
        """
        Counterfactual: what would have happened with the OTHER strategy?

        Useful for:
        - Causal inference: treatment effect estimation
        - Variance reduction: use both A and B predictions
        """
        # Look up the assigned group
        assigned = [log for log in self.assignment_log if log['unit_id'] == unit_id]
        if not assigned:
            return {'error': 'Unit not found'}

        actual_group = assigned[0]['group']
        counterfactual_group = 'B' if actual_group == 'A' else 'A'

        # Compute the counterfactual outcome
        counterfactual_outcome = strategy_fn(data, counterfactual_group)

        return {
            'unit_id': unit_id,
            'actual_group': actual_group,
            'counterfactual_group': counterfactual_group,
            'counterfactual_outcome': counterfactual_outcome,
            'note': 'Counterfactuals are hypothetical: both outcomes cannot be observed for the same unit'
        }
    def summary_report(self) -> str:
        """Generate a human-readable summary report"""
        analysis = self.analyze()
        guardrails = self.guardrail_check()

        report = f"""
{'=' * 70}
A/B TEST REPORT: {self.config.strategy_a_name} vs {self.config.strategy_b_name}
{'=' * 70}

SAMPLE SIZE
  Group A:  {analysis['n_a']} units
  Group B:  {analysis['n_b']} units
  Required: {self.config.required_samples()} per group
  Status:   {'✓ Sufficient' if analysis['n_a'] >= self.config.required_samples() else '✗ Under-powered'}

PRIMARY METRIC: {analysis.get('test', 'N/A')}
  A mean:     {analysis.get('mean_a', 0):.6f} (±{analysis.get('std_a', 0):.6f})
  B mean:     {analysis.get('mean_b', 0):.6f} (±{analysis.get('std_b', 0):.6f})
  Difference: {analysis.get('mean_b', 0) - analysis.get('mean_a', 0):+.6f}
  Cohen's d:  {analysis.get('cohens_d', 0):.3f} ({analysis.get('effect_size_interpretation', 'N/A')})
  P-value:    {analysis.get('p_value', 'N/A')}
  Significant (α={self.config.alpha}): {'✓ YES' if analysis.get('significant') else '✗ NO'}
  Practically significant: {'✓ YES' if analysis.get('practically_significant') else '✗ NO'}

RECOMMENDATION: {analysis.get('recommendation', 'N/A')}

GUARDRAIL METRICS
  Status: {'✓ Safe' if guardrails['is_safe'] else '✗ VIOLATIONS DETECTED'}
  Violations: {len(guardrails['violations'])}
"""
        if guardrails['violations']:
            for v in guardrails['violations']:
                report += f"  - {v['metric']}: {v['severity'].upper()} (B is {v['direction']})\n"

        report += f"\n{'=' * 70}\n"
        return report
class MultipleComparisonCorrection:
    """
    Correct for testing multiple hypotheses simultaneously.

    Running 20 A/B tests at alpha = 0.05? Expect ~1 false positive by chance alone.
    Without correction, you'll adopt roughly 1 bad strategy per 20 tests.
    """

    @staticmethod
    def bonferroni(p_values: np.ndarray, alpha: float = 0.05) -> Tuple[float, np.ndarray]:
        """
        Bonferroni correction: alpha_corrected = alpha / n_tests

        Conservative: controls the family-wise error rate (FWER).
        """
        n = len(p_values)
        corrected_alpha = alpha / n
        is_significant = p_values < corrected_alpha
        return corrected_alpha, is_significant

    @staticmethod
    def benjamini_hochberg(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray:
        """
        Benjamini-Hochberg: controls the False Discovery Rate (FDR).

        Less conservative than Bonferroni: accepts that some fraction of
        "discoveries" will be false.
        """
        n = len(p_values)
        sorted_idx = np.argsort(p_values)
        sorted_p = p_values[sorted_idx]

        # Reject all hypotheses up to the largest k with p_(k) <= (k / n) * alpha
        is_significant = np.zeros(n, dtype=bool)
        max_k = 0
        for i in range(n):
            if sorted_p[i] <= ((i + 1) / n) * alpha:
                max_k = i + 1
        is_significant[sorted_idx[:max_k]] = True
        return is_significant

    @staticmethod
    def holm(p_values: np.ndarray, alpha: float = 0.05) -> np.ndarray:
        """
        Holm's step-down procedure.

        Controls FWER, uniformly more powerful than Bonferroni.
        """
        n = len(p_values)
        sorted_idx = np.argsort(p_values)
        sorted_p = p_values[sorted_idx]

        is_significant = np.zeros(n, dtype=bool)
        for i in range(n):
            threshold = alpha / (n - i)
            if sorted_p[i] <= threshold:
                is_significant[sorted_idx[i]] = True
            else:
                break
        return is_significant
class SequentialABTest:
    """
    Sequential A/B testing with valid early stopping.

    Problem: peeking at results and stopping as soon as p < 0.05 inflates the
    Type I error rate.
    Solution: use sequential boundaries (always-valid p-values).

    Based on: Always Valid P-values (Johari et al., 2017)
    """

    def __init__(self,
                 config: ExperimentConfig,
                 spending_function: str = 'obrien_fleming'):
        self.config = config
        self.spending_function = spending_function

        self.observations = []
        self.cumsum_a = 0
        self.cumsum_b = 0
        self.cumsum_sq_a = 0
        self.cumsum_sq_b = 0
        self.n_a = 0
        self.n_b = 0

    def update(self, group: str, value: float):
        """Add one observation and test for significance"""
        if group == 'A':
            self.cumsum_a += value
            self.cumsum_sq_a += value ** 2
            self.n_a += 1
        else:
            self.cumsum_b += value
            self.cumsum_sq_b += value ** 2
            self.n_b += 1

        self.observations.append({'group': group, 'value': value})

        # Compute an always-valid p-value
        return self._compute_always_valid_p()

    def _compute_always_valid_p(self) -> Dict:
        """Compute an always-valid p-value for early stopping"""
        if self.n_a < 2 or self.n_b < 2:
            return {'n': len(self.observations), 'p_value': 1.0, 'can_stop': False}

        # Sample means
        mean_a = self.cumsum_a / self.n_a
        mean_b = self.cumsum_b / self.n_b

        # Sample variances (from running sums)
        var_a = (self.cumsum_sq_a - self.n_a * mean_a**2) / (self.n_a - 1)
        var_b = (self.cumsum_sq_b - self.n_b * mean_b**2) / (self.n_b - 1)

        # Standard error of the difference in means
        se = np.sqrt(var_a / self.n_a + var_b / self.n_b)

        # Z-statistic
        z = (mean_b - mean_a) / (se + 1e-10)

        # Always-valid adjustment: a p-value that stays valid under continuous monitoring
        n_eff = min(self.n_a, self.n_b)

        # Crude mixture-boundary approximation: inflate the fixed-sample p-value by log(n)
        raw_p = 2 * (1 - stats.norm.cdf(abs(z)))
        adjusted_p = min(raw_p * np.log(max(n_eff, np.e)), 1.0)

        # Can we stop early?
        can_stop = adjusted_p < self.config.alpha

        return {
            'n': len(self.observations),
            'n_a': self.n_a,
            'n_b': self.n_b,
            'mean_a': mean_a,
            'mean_b': mean_b,
            'z_statistic': z,
            'raw_p_value': raw_p,
            'adjusted_p_value': adjusted_p,
            'can_stop': can_stop,
            'recommendation': 'STOP' if can_stop else 'CONTINUE'
        }
if __name__ == '__main__':
    print("=" * 70)
    print(" A/B TESTING FRAMEWORK FOR STRATEGIES")
    print("=" * 70)

    np.random.seed(42)

    # Configuration
    config = ExperimentConfig(
        strategy_a_name='Baseline_Momentum',
        strategy_b_name='ML_Alpha_v3',
        alpha=0.05,
        power=0.80,
        min_detectable_effect=0.05,  # Detect a 0.05 Sharpe difference
        baseline_sharpe=1.0
    )

    # Power analysis
    required_n = config.required_samples()
    print(f"\n1. POWER ANALYSIS")
    print(f"   Required sample size per group: {required_n}")
    print(f"   (Detect Sharpe diff of {config.min_detectable_effect} with {config.power*100:.0f}% power)")

    # Run A/B test
    print(f"\n2. SIMULATED A/B TEST")
    test = ABTest(config, diversion_unit='day', stratify_by=['volatility_regime'])

    # Simulate 400 days
    n_days = 400

    # Strategy A: Sharpe = 0.8; Strategy B: Sharpe = 1.2 (better by 0.4)
    daily_vol = 0.15 / np.sqrt(252)

    for day in range(n_days):
        # Volatility regime (for stratification)
        regime = 'high' if np.random.rand() < 0.2 else 'normal'

        # Assign
        unit_id = f'day_{day:04d}'
        group = test.assign(unit_id, {'volatility_regime': regime})

        # Simulate returns
        if group == 'A':
            # Baseline: mean = 0.8 * daily_vol
            ret = np.random.normal(0.8 * daily_vol, daily_vol)
        else:
            # Better: mean = 1.2 * daily_vol
            ret = np.random.normal(1.2 * daily_vol, daily_vol)

        # Guardrails
        guardrails = {
            'max_drawdown': abs(np.random.exponential(0.02)),
            'daily_vol': abs(np.random.normal(daily_vol, daily_vol * 0.3))
        }

        test.record_result(unit_id, group, ret, guardrails)

    # Analysis
    analysis = test.analyze(test_type='t_test')
    print(f"\n3. STATISTICAL RESULTS")
    print(f"   Group A (n={analysis['n_a']}): mean={analysis['mean_a']:.6f}")
    print(f"   Group B (n={analysis['n_b']}): mean={analysis['mean_b']:.6f}")
    print(f"   Difference: {analysis['mean_b'] - analysis['mean_a']:+.6f}")
    print(f"   Cohen's d: {analysis['cohens_d']:.3f}")
    print(f"   P-value: {analysis['p_value']:.4f}")
    print(f"   Significant: {'✓ YES' if analysis['significant'] else '✗ NO'}")
    print(f"   RECOMMENDATION: {analysis['recommendation']}")

    # Guardrails
    guardrail_check = test.guardrail_check()
    print(f"\n4. GUARDRAIL CHECK")
    print(f"   Safe: {'✓ YES' if guardrail_check['is_safe'] else '✗ VIOLATIONS'}")

    # Multiple comparison correction
    print(f"\n5. MULTIPLE COMPARISON CORRECTION")
    p_values = np.array([analysis['p_value'], 0.03, 0.08, 0.001, 0.12, 0.04])
    bh_sig = MultipleComparisonCorrection.benjamini_hochberg(p_values)
    print(f"   Raw significant: {np.sum(p_values < 0.05)}/{len(p_values)}")
    print(f"   BH-FDR significant: {np.sum(bh_sig)}/{len(p_values)}")

    # Full report
    print(f"\n6. FULL REPORT")
    print(test.summary_report())

    # Sequential test
    print(f"7. SEQUENTIAL TESTING")
    seq_test = SequentialABTest(config)
    for _ in range(200):
        group = 'A' if np.random.rand() < 0.5 else 'B'
        value = np.random.normal(0.8 * daily_vol if group == 'A' else 1.2 * daily_vol, daily_vol)
        result = seq_test.update(group, value)
        if result['can_stop']:
            print(f"   Sequential test STOPPED at n={result['n']}")
            print(f"   Adjusted p-value: {result['adjusted_p_value']:.4f}")
            break

    print(f"\n KEY TAKEAWAYS:")
    print(f"   - Always A/B test before deploying")
    print(f"   - Multiple comparison correction prevents false discoveries")
    print(f"   - Guardrail metrics prevent hidden risk increases")
    print(f"   - Sequential testing enables early stopping (with valid p-values)")
    print(f"   - Power analysis ensures tests aren't underpowered")
    print(f"   - This is how firms like Jane Street validate strategy changes")