import numpy as np import scipy.stats as stats from typing import Dict, List, Union, Callable, Any, Optional from tinytroupe.experimentation import logger class StatisticalTester: """ A class to perform statistical tests on experiment results. To do so, a control is defined, and then one or more treatments are compared to the control. The class supports various statistical tests, including t-tests, Mann-Whitney U tests, and ANOVA. The user can specify the type of test to run, the significance level, and the specific metrics to analyze. The results of the tests are returned in a structured format. """ def __init__(self, control_experiment_data: Dict[str, list], treatments_experiment_data: Dict[str, Dict[str, list]], results_key:str = None): """ Initialize with experiment results. Args: control_experiment_data (dict): Dictionary containing control experiment results with keys as metric names and values as lists of values. e.g.,{"control_exp": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4], ...}} treatments_experiment_data (dict): Dictionary containing experiment results with keys as experiment IDs and values as dicts of metric names to lists of values. e.g., {"exp1": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}, "exp2": {"metric1": [0.5, 0.6], "metric2": [0.7, 0.8]}, ...} """ # if results_key is provided, use it to extract the relevant data from the control and treatment data # e.g., {"exp1": {"results": {"metric1": [0.1, 0.2], "metric2": [0.3, 0.4]}} if results_key: control_experiment_data = {k: v[results_key] for k, v in control_experiment_data.items()} treatments_experiment_data = {k: v[results_key] for k, v in treatments_experiment_data.items()} self.control_experiment_data = control_experiment_data self.treatments_experiment_data = treatments_experiment_data # Validate input data self._validate_input_data() def _validate_input_data(self): """Validate the input data formats and structure.""" # Check that control and treatments are dictionaries if not isinstance(self.control_experiment_data, dict): raise TypeError("Control experiment data must be a dictionary") if not isinstance(self.treatments_experiment_data, dict): raise TypeError("Treatments experiment data must be a dictionary") # Check that control has at least one experiment if not self.control_experiment_data: raise ValueError("Control experiment data cannot be empty") # Check only one control if len(self.control_experiment_data) > 1: raise ValueError("Only one control experiment is allowed") # Validate control experiment structure for control_id, control_metrics in self.control_experiment_data.items(): if not isinstance(control_metrics, dict): raise TypeError(f"Metrics for control experiment '{control_id}' must be a dictionary") # Check that the metrics dictionary is not empty if not control_metrics: raise ValueError(f"Control experiment '{control_id}' has no metrics") # Validate that metric values are lists for metric, values in control_metrics.items(): if not isinstance(values, list): raise TypeError(f"Values for metric '{metric}' in control experiment '{control_id}' must be a list") # Check treatments have at least one experiment if not self.treatments_experiment_data: raise ValueError("Treatments experiment data cannot be empty") # Validate treatment experiment structure for treatment_id, treatment_data in self.treatments_experiment_data.items(): if not isinstance(treatment_data, dict): raise TypeError(f"Data for treatment '{treatment_id}' must be a dictionary") # Check that the metrics dictionary is not empty if not treatment_data: raise ValueError(f"Treatment '{treatment_id}' has no metrics") # Get all control metrics for overlap checking all_control_metrics = set() for control_metrics in self.control_experiment_data.values(): all_control_metrics.update(control_metrics.keys()) # Check if there's any overlap between control and treatment metrics common_metrics = all_control_metrics.intersection(set(treatment_data.keys())) if not common_metrics: logger.warning(f"Treatment '{treatment_id}' has no metrics in common with any control experiment") # Check that treatment metrics are lists for metric, values in treatment_data.items(): if not isinstance(values, list): raise TypeError(f"Values for metric '{metric}' in treatment '{treatment_id}' must be a list") def run_test(self, test_type: str="welch_t_test", alpha: float = 0.05, **kwargs) -> Dict[str, Dict[str, Any]]: """ Run the specified statistical test on the control and treatments data. Args: test_type (str): Type of statistical test to run. Options: 't_test', 'welch_t_test', 'mann_whitney', 'anova', 'chi_square', 'ks_test' alpha (float): Significance level, defaults to 0.05 **kwargs: Additional arguments for specific test types. Returns: dict: Dictionary containing the results of the statistical tests for each treatment (vs the one control). Each key is the treatment ID and each value is a dictionary with test results. """ supported_tests = { 't_test': self._run_t_test, 'welch_t_test': self._run_welch_t_test, 'mann_whitney': self._run_mann_whitney, 'anova': self._run_anova, 'chi_square': self._run_chi_square, 'ks_test': self._run_ks_test } if test_type not in supported_tests: raise ValueError(f"Unsupported test type: {test_type}. Supported types: {list(supported_tests.keys())}") results = {} for control_id, control_data in self.control_experiment_data.items(): # get all metrics from control data metrics = set() metrics.update(control_data.keys()) for treatment_id, treatment_data in self.treatments_experiment_data.items(): results[treatment_id] = {} for metric in metrics: # Skip metrics not in treatment data if metric not in treatment_data: logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'") continue control_values = control_data[metric] treatment_values = treatment_data[metric] # Skip if either control or treatment has no values if len(control_values) == 0 or len(treatment_values) == 0: logger.warning(f"Skipping metric '{metric}' for treatment '{treatment_id}' due to empty values") continue # Run the selected test and convert to JSON serializable types test_result = supported_tests[test_type](control_values, treatment_values, alpha, **kwargs) results[treatment_id][metric] = convert_to_serializable(test_result) return results def _run_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """Run Student's t-test (equal variance assumed).""" # Convert to numpy arrays for calculations control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Calculate basic statistics control_mean = np.mean(control) treatment_mean = np.mean(treatment) mean_diff = treatment_mean - control_mean # Run the t-test t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=True) # Calculate confidence interval control_std = np.std(control, ddof=1) treatment_std = np.std(treatment, ddof=1) pooled_std = np.sqrt(((len(control) - 1) * control_std**2 + (len(treatment) - 1) * treatment_std**2) / (len(control) + len(treatment) - 2)) se = pooled_std * np.sqrt(1/len(control) + 1/len(treatment)) critical_value = stats.t.ppf(1 - alpha/2, len(control) + len(treatment) - 2) margin_error = critical_value * se ci_lower = mean_diff - margin_error ci_upper = mean_diff + margin_error # Determine if the result is significant significant = p_value < alpha return { 'test_type': 'Student t-test (equal variance)', 'control_mean': control_mean, 'treatment_mean': treatment_mean, 'mean_difference': mean_diff, 'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'), 't_statistic': t_stat, 'p_value': p_value, 'confidence_interval': (ci_lower, ci_upper), 'confidence_level': 1 - alpha, 'significant': significant, 'control_sample_size': len(control), 'treatment_sample_size': len(treatment), 'control_std': control_std, 'treatment_std': treatment_std, 'effect_size': cohen_d(control, treatment) } def _run_welch_t_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """Run Welch's t-test (unequal variance).""" # Convert to numpy arrays for calculations control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Calculate basic statistics control_mean = np.mean(control) treatment_mean = np.mean(treatment) mean_diff = treatment_mean - control_mean # Run Welch's t-test t_stat, p_value = stats.ttest_ind(control, treatment, equal_var=False) # Calculate confidence interval (for Welch's t-test) control_var = np.var(control, ddof=1) treatment_var = np.var(treatment, ddof=1) # Calculate effective degrees of freedom (Welch-Satterthwaite equation) v_num = (control_var/len(control) + treatment_var/len(treatment))**2 v_denom = (control_var/len(control))**2/(len(control)-1) + (treatment_var/len(treatment))**2/(len(treatment)-1) df = v_num / v_denom if v_denom > 0 else float('inf') se = np.sqrt(control_var/len(control) + treatment_var/len(treatment)) critical_value = stats.t.ppf(1 - alpha/2, df) margin_error = critical_value * se ci_lower = mean_diff - margin_error ci_upper = mean_diff + margin_error control_std = np.std(control, ddof=1) treatment_std = np.std(treatment, ddof=1) # Determine if the result is significant significant = p_value < alpha return { 'test_type': 'Welch t-test (unequal variance)', 'control_mean': control_mean, 'treatment_mean': treatment_mean, 'mean_difference': mean_diff, 'percent_change': (mean_diff / control_mean * 100) if control_mean != 0 else float('inf'), 't_statistic': t_stat, 'p_value': p_value, 'confidence_interval': (ci_lower, ci_upper), 'confidence_level': 1 - alpha, 'significant': significant, 'degrees_of_freedom': df, 'control_sample_size': len(control), 'treatment_sample_size': len(treatment), 'control_std': control_std, 'treatment_std': treatment_std, 'effect_size': cohen_d(control, treatment) } def _run_mann_whitney(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """Run Mann-Whitney U test (non-parametric test).""" # Convert to numpy arrays control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Calculate basic statistics control_median = np.median(control) treatment_median = np.median(treatment) median_diff = treatment_median - control_median # Run the Mann-Whitney U test u_stat, p_value = stats.mannwhitneyu(control, treatment, alternative='two-sided') # Calculate common language effect size # (probability that a randomly selected value from treatment is greater than control) count = 0 for tc in treatment: for cc in control: if tc > cc: count += 1 cles = count / (len(treatment) * len(control)) # Calculate approximate confidence interval using bootstrap try: from scipy.stats import bootstrap def median_diff_func(x, y): return np.median(x) - np.median(y) res = bootstrap((control, treatment), median_diff_func, confidence_level=1-alpha, n_resamples=1000, random_state=42) ci_lower, ci_upper = res.confidence_interval except ImportError: # If bootstrap is not available, return None for confidence interval ci_lower, ci_upper = None, None logger.warning("SciPy bootstrap not available, skipping confidence interval calculation") # Determine if the result is significant significant = p_value < alpha return { 'test_type': 'Mann-Whitney U test', 'control_median': control_median, 'treatment_median': treatment_median, 'median_difference': median_diff, 'percent_change': (median_diff / control_median * 100) if control_median != 0 else float('inf'), 'u_statistic': u_stat, 'p_value': p_value, 'confidence_interval': (ci_lower, ci_upper) if ci_lower is not None else None, 'confidence_level': 1 - alpha, 'significant': significant, 'control_sample_size': len(control), 'treatment_sample_size': len(treatment), 'effect_size': cles } def _run_anova(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """Run one-way ANOVA test.""" # For ANOVA, we typically need multiple groups, but we can still run it with just two # Convert to numpy arrays control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Run one-way ANOVA f_stat, p_value = stats.f_oneway(control, treatment) # Calculate effect size (eta-squared) total_values = np.concatenate([control, treatment]) grand_mean = np.mean(total_values) ss_total = np.sum((total_values - grand_mean) ** 2) ss_between = (len(control) * (np.mean(control) - grand_mean) ** 2 + len(treatment) * (np.mean(treatment) - grand_mean) ** 2) eta_squared = ss_between / ss_total if ss_total > 0 else 0 # Determine if the result is significant significant = p_value < alpha return { 'test_type': 'One-way ANOVA', 'f_statistic': f_stat, 'p_value': p_value, 'significant': significant, 'control_sample_size': len(control), 'treatment_sample_size': len(treatment), 'effect_size': eta_squared, 'effect_size_type': 'eta_squared' } def _run_chi_square(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """Run Chi-square test for categorical data.""" # For chi-square, we assume the values represent counts in different categories # Convert to numpy arrays control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Check if the arrays are the same length (same number of categories) if len(control) != len(treatment): raise ValueError("Control and treatment must have the same number of categories for chi-square test") # Run chi-square test contingency_table = np.vstack([control, treatment]) chi2_stat, p_value, dof, expected = stats.chi2_contingency(contingency_table) # Calculate Cramer's V as effect size n = np.sum(contingency_table) min_dim = min(contingency_table.shape) - 1 cramers_v = np.sqrt(chi2_stat / (n * min_dim)) if n * min_dim > 0 else 0 # Determine if the result is significant significant = p_value < alpha return { 'test_type': 'Chi-square test', 'chi2_statistic': chi2_stat, 'p_value': p_value, 'degrees_of_freedom': dof, 'significant': significant, 'effect_size': cramers_v, 'effect_size_type': 'cramers_v' } def check_assumptions(self, metric: str) -> Dict[str, Dict[str, Any]]: """ Check statistical assumptions for the given metric across all treatments. Args: metric (str): The metric to check assumptions for. Returns: dict: Dictionary with results of assumption checks for each treatment. """ if metric not in self.control_experiment_data: raise ValueError(f"Metric '{metric}' not found in control data") results = {} control_values = np.array(self.control_experiment_data[metric], dtype=float) # Check normality of control control_shapiro = stats.shapiro(control_values) control_normality = { 'test': 'Shapiro-Wilk', 'statistic': control_shapiro[0], 'p_value': control_shapiro[1], 'normal': control_shapiro[1] >= 0.05 } for treatment_id, treatment_data in self.treatments_experiment_data.items(): if metric not in treatment_data: logger.warning(f"Metric '{metric}' not found in treatment '{treatment_id}'") continue treatment_values = np.array(treatment_data[metric], dtype=float) # Check normality of treatment treatment_shapiro = stats.shapiro(treatment_values) treatment_normality = { 'test': 'Shapiro-Wilk', 'statistic': treatment_shapiro[0], 'p_value': treatment_shapiro[1], 'normal': treatment_shapiro[1] >= 0.05 } # Check homogeneity of variance levene_test = stats.levene(control_values, treatment_values) variance_homogeneity = { 'test': 'Levene', 'statistic': levene_test[0], 'p_value': levene_test[1], 'equal_variance': levene_test[1] >= 0.05 } # Store results and convert to JSON serializable types results[treatment_id] = convert_to_serializable({ 'control_normality': control_normality, 'treatment_normality': treatment_normality, 'variance_homogeneity': variance_homogeneity, 'recommended_test': self._recommend_test(control_normality['normal'], treatment_normality['normal'], variance_homogeneity['equal_variance']) }) return results def _recommend_test(self, control_normal: bool, treatment_normal: bool, equal_variance: bool) -> str: """Recommend a statistical test based on assumption checks.""" if control_normal and treatment_normal: if equal_variance: return 't_test' else: return 'welch_t_test' else: return 'mann_whitney' def _run_ks_test(self, control_values: list, treatment_values: list, alpha: float, **kwargs) -> Dict[str, Any]: """ Run Kolmogorov-Smirnov test to compare distributions. This test compares the empirical cumulative distribution functions (ECDFs) of two samples to determine if they come from the same distribution. It's particularly useful for: - Categorical responses (e.g., "Yes"/"No"/"Maybe") when converted to ordinal values - Continuous data where you want to compare entire distributions, not just means - Detecting differences in distribution shape, spread, or location """ # Convert to numpy arrays control = np.array(control_values, dtype=float) treatment = np.array(treatment_values, dtype=float) # Calculate basic statistics control_median = np.median(control) treatment_median = np.median(treatment) control_mean = np.mean(control) treatment_mean = np.mean(treatment) # Run the Kolmogorov-Smirnov test ks_stat, p_value = stats.ks_2samp(control, treatment) # Calculate distribution characteristics control_std = np.std(control, ddof=1) treatment_std = np.std(treatment, ddof=1) # Calculate effect size using the KS statistic itself as a measure # KS statistic ranges from 0 (identical distributions) to 1 (completely different) effect_size = ks_stat # Additional distribution comparison metrics # Calculate overlap coefficient (area under the minimum of two PDFs) try: # Create histograms for overlap calculation combined_range = np.linspace( min(np.min(control), np.min(treatment)), max(np.max(control), np.max(treatment)), 50 ) control_hist, _ = np.histogram(control, bins=combined_range, density=True) treatment_hist, _ = np.histogram(treatment, bins=combined_range, density=True) # Calculate overlap (intersection over union-like metric) overlap = np.sum(np.minimum(control_hist, treatment_hist)) / np.sum(np.maximum(control_hist, treatment_hist)) overlap = overlap if not np.isnan(overlap) else 0.0 except: overlap = None # Calculate percentile differences for additional insights percentiles = [25, 50, 75, 90, 95] percentile_diffs = {} for p in percentiles: control_p = np.percentile(control, p) treatment_p = np.percentile(treatment, p) percentile_diffs[f"p{p}_diff"] = treatment_p - control_p # Determine significance significant = p_value < alpha return { 'test_type': 'Kolmogorov-Smirnov test', 'control_mean': control_mean, 'treatment_mean': treatment_mean, 'control_median': control_median, 'treatment_median': treatment_median, 'control_std': control_std, 'treatment_std': treatment_std, 'ks_statistic': ks_stat, 'p_value': p_value, 'significant': significant, 'control_sample_size': len(control), 'treatment_sample_size': len(treatment), 'effect_size': effect_size, 'overlap_coefficient': overlap, 'percentile_differences': percentile_diffs, 'interpretation': self._interpret_ks_result(ks_stat, significant), 'confidence_level': 1 - alpha } def _interpret_ks_result(self, ks_stat: float, significant: bool) -> str: """Provide interpretation of KS test results.""" if not significant: return "No significant difference between distributions" if ks_stat < 0.1: return "Very small difference between distributions" elif ks_stat < 0.25: return "Small difference between distributions" elif ks_stat < 0.5: return "Moderate difference between distributions" else: return "Large difference between distributions" def cohen_d(x: Union[list, np.ndarray], y: Union[list, np.ndarray]) -> float: """ Calculate Cohen's d effect size for two samples. Args: x: First sample y: Second sample Returns: float: Cohen's d effect size """ nx = len(x) ny = len(y) # Convert to numpy arrays x = np.array(x, dtype=float) y = np.array(y, dtype=float) # Calculate means mx = np.mean(x) my = np.mean(y) # Calculate standard deviations sx = np.std(x, ddof=1) sy = np.std(y, ddof=1) # Pooled standard deviation pooled_sd = np.sqrt(((nx - 1) * sx**2 + (ny - 1) * sy**2) / (nx + ny - 2)) # Cohen's d return (my - mx) / pooled_sd if pooled_sd > 0 else 0 def convert_to_serializable(obj): """ Convert NumPy types to native Python types recursively to ensure JSON serialization works. Args: obj: Any object that might contain NumPy types Returns: Object with NumPy types converted to Python native types """ if isinstance(obj, np.ndarray): return obj.tolist() elif isinstance(obj, (np.number, np.bool_)): return obj.item() elif isinstance(obj, dict): return {k: convert_to_serializable(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_to_serializable(i) for i in obj] elif isinstance(obj, tuple): return tuple(convert_to_serializable(i) for i in obj) else: return obj