#!/usr/bin/env python3
"""
Enhanced Experimentation Framework

Comprehensive platform for running controlled experiments and accelerating innovation
"""

import asyncio
import logging
import random
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any

logger = logging.getLogger(__name__)


class ExperimentStatus(Enum):
    PROPOSED = "proposed"
    APPROVED = "approved"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class ExperimentType(Enum):
    A_B_TEST = "a_b_test"
    MULTIVARIATE = "multivariate"
    FEATURE_FLAG = "feature_flag"
    CANARY_DEPLOYMENT = "canary_deployment"
    CHAOS_ENGINEERING = "chaos_engineering"
    PERFORMANCE_TEST = "performance_test"


@dataclass
class ExperimentVariant:
    """A/B test variant"""

    variant_id: str
    name: str
    configuration: dict[str, Any]
    traffic_percentage: float
    metrics: dict[str, float] = field(default_factory=dict)


@dataclass
class Experiment:
    """Enhanced experiment representation"""

    experiment_id: str
    title: str
    hypothesis: str
    type: ExperimentType
    variants: list[ExperimentVariant]
    success_criteria: list[str]
    status: ExperimentStatus
    owner: str
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    results: dict[str, Any] | None = None
    lessons_learned: list[str] = field(default_factory=list)
    statistical_significance: float | None = None
    confidence_interval: tuple[float, float] | None = None


class EnhancedExperimentationPlatform:
    """Comprehensive experimentation platform with advanced features"""

    def __init__(self):
        self.experiments: dict[str, Experiment] = {}
        self._background_tasks: list[asyncio.Task] = []
        self.experiment_templates: dict[str, dict] = {}
        self.metrics_collectors: dict[str, Callable] = {}
        self.auto_scaling_enabled = True
        self.confidence_threshold = 0.95
        self.min_sample_size = 1000
        self._initialize_experiment_templates()
        self._setup_metrics_collection()

    def _initialize_experiment_templates(self):
        """Initialize experiment templates for rapid experimentation"""
        self.experiment_templates = {
            "a_b_feature_rollout": {
                "type": ExperimentType.A_B_TEST,
                "description": "Standard A/B test for feature rollout",
                "variants": [
                    {"name": "control", "percentage": 50},
                    {"name": "treatment", "percentage": 50},
                ],
                "metrics": ["conversion_rate", "user_engagement", "error_rate"],
                "duration_days": 14,
                "min_sample_size": 5000,
            },
            "canary_deployment": {
                "type": ExperimentType.CANARY_DEPLOYMENT,
                "description": "Gradual rollout with automated rollback",
                "variants": [
                    {"name": "baseline", "percentage": 95},
                    {"name": "canary", "percentage": 5},
                ],
                "metrics": ["response_time", "error_rate", "resource_usage"],
                "duration_days": 7,
                "auto_rollback_triggers": ["error_rate > 0.05", "response_time > 2.0"],
            },
            "chaos_experiment": {
                "type": ExperimentType.CHAOS_ENGINEERING,
                "description": "Controlled failure injection testing",
                "variants": [{"name": "baseline", "percentage": 100}],
                "metrics": ["system_stability", "recovery_time", "error_handling"],
                "duration_minutes": 30,
                "failure_types": ["cpu_stress", "memory_pressure", "network_partition"],
            },
        }

    def _setup_metrics_collection(self):
        """Set up automated metrics collection"""
        self.metrics_collectors = {
            "conversion_rate": self._collect_conversion_metrics,
            "user_engagement": self._collect_engagement_metrics,
            "error_rate": self._collect_error_metrics,
            "response_time": self._collect_performance_metrics,
            "resource_usage": self._collect_resource_metrics,
            "system_stability": self._collect_stability_metrics,
        }
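    # Illustrative sketch (not part of the original module): the built-in collectors
    # above return random placeholder values, but any async callable with the
    # signature (variant_id: str) -> float can be plugged into metrics_collectors.
    # The helper below shows that extension point; the helper itself is an assumption,
    # not existing API.
    def register_metrics_collector(
        self, metric_name: str, collector: Callable
    ) -> None:
        """Register an additional async metrics collector (illustrative helper)."""
        # The collector is awaited once per variant inside _monitor_experiment.
        self.metrics_collectors[metric_name] = collector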
    async def create_experiment_from_template(
        self, template_name: str, title: str, hypothesis: str, owner: str
    ) -> Experiment:
        """Rapid experiment creation from template"""
        if template_name not in self.experiment_templates:
            raise ValueError(f"Template {template_name} not found")

        template = self.experiment_templates[template_name]
        experiment_id = f"exp_{int(time.time())}_{template_name}"

        # Create variants from template
        variants = []
        for i, variant_config in enumerate(template["variants"]):
            variant = ExperimentVariant(
                variant_id=f"{experiment_id}_v{i}",
                name=variant_config["name"],
                configuration={},
                traffic_percentage=variant_config["percentage"],
            )
            variants.append(variant)

        experiment = Experiment(
            experiment_id=experiment_id,
            title=title,
            hypothesis=hypothesis,
            type=template["type"],
            variants=variants,
            success_criteria=template.get("success_criteria", []),
            status=ExperimentStatus.PROPOSED,
            owner=owner,
            created_at=datetime.now(),
        )

        self.experiments[experiment_id] = experiment
        logger.info(f"Created experiment {experiment_id} from template {template_name}")
        return experiment

    async def start_experiment(self, experiment_id: str) -> bool:
        """Start experiment with automated setup"""
        if experiment_id not in self.experiments:
            return False

        experiment = self.experiments[experiment_id]

        # Auto-approve for rapid experimentation
        if experiment.status == ExperimentStatus.PROPOSED:
            experiment.status = ExperimentStatus.APPROVED

        experiment.status = ExperimentStatus.RUNNING
        experiment.started_at = datetime.now()

        # Set up automated monitoring
        monitor_task = asyncio.create_task(self._monitor_experiment(experiment_id))
        self._background_tasks.append(monitor_task)

        logger.info(f"Started experiment: {experiment_id}")
        return True

    async def _monitor_experiment(self, experiment_id: str):
        """Automated experiment monitoring and analysis"""
        experiment = self.experiments[experiment_id]

        while experiment.status == ExperimentStatus.RUNNING:
            await asyncio.sleep(300)  # Check every 5 minutes

            # Collect metrics for all variants. experiment.results is None until the
            # experiment completes, so fall back to every registered collector when
            # no target metrics have been specified.
            target_metrics = (experiment.results or {}).get(
                "target_metrics", list(self.metrics_collectors)
            )
            for variant in experiment.variants:
                for metric_name in self.metrics_collectors:
                    if metric_name in target_metrics:
                        try:
                            value = await self.metrics_collectors[metric_name](
                                variant.variant_id
                            )
                            variant.metrics[metric_name] = value
                        except Exception as e:
                            logger.error(
                                f"Failed to collect {metric_name} for {variant.variant_id}: {e}"
                            )

            # Check for statistical significance
            if self._check_statistical_significance(experiment):
                await self._complete_experiment(
                    experiment_id, "Statistical significance achieved"
                )
                break

            # Check auto-rollback conditions for canary deployments
            if experiment.type == ExperimentType.CANARY_DEPLOYMENT:
                if self._should_rollback(experiment):
                    await self._rollback_experiment(
                        experiment_id, "Auto-rollback triggered"
                    )
                    break
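    # Illustrative sketch (assumption, not the module's original logic): the
    # simplified significance check below relies on a random draw, and its own
    # comment points to proper statistical tests for production use. A minimal
    # two-proportion z-test would look like this; it assumes per-variant
    # success/trial counts, which this module does not currently track.
    @staticmethod
    def _two_proportion_z_score(
        successes_a: int, trials_a: int, successes_b: int, trials_b: int
    ) -> float:
        """Z-score comparing two conversion proportions (illustrative helper)."""
        import math

        if trials_a == 0 or trials_b == 0:
            return 0.0
        p_a = successes_a / trials_a
        p_b = successes_b / trials_b
        # Pooled proportion under the null hypothesis of equal conversion rates
        pooled = (successes_a + successes_b) / (trials_a + trials_b)
        se = math.sqrt(pooled * (1 - pooled) * (1 / trials_a + 1 / trials_b))
        if se == 0.0:
            return 0.0
        # |z| > 1.96 corresponds to roughly 95% confidence for a two-sided test
        return (p_b - p_a) / se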
    def _check_statistical_significance(self, experiment: Experiment) -> bool:
        """Check if experiment results are statistically significant"""
        if len(experiment.variants) < 2:
            return False

        # Simplified statistical significance check
        # In production, use proper statistical tests (t-test, chi-square, etc.)
        control_variant = next(
            (v for v in experiment.variants if v.name == "control"),
            experiment.variants[0],
        )
        treatment_variant = next(
            (v for v in experiment.variants if v.name != control_variant.name), None
        )

        if not treatment_variant:
            return False

        # Check if we have minimum sample size. Summing metric values is only a
        # placeholder proxy here, since per-variant sample counts are not tracked
        # in this simplified module.
        if sum(control_variant.metrics.values()) < self.min_sample_size:
            return False

        # Calculate effect size and confidence
        # This is a simplified implementation
        effect_size = abs(
            treatment_variant.metrics.get("conversion_rate", 0)
            - control_variant.metrics.get("conversion_rate", 0)
        )

        # Assume significance if effect size > 5% and confidence > threshold
        return effect_size > 0.05 and random.random() > (1 - self.confidence_threshold)

    def _should_rollback(self, experiment: Experiment) -> bool:
        """Check if canary deployment should be rolled back"""
        canary_variant = next(
            (v for v in experiment.variants if v.name == "canary"), None
        )

        if not canary_variant:
            return False

        # Check rollback triggers
        error_rate = canary_variant.metrics.get("error_rate", 0)
        response_time = canary_variant.metrics.get("response_time", 0)

        return error_rate > 0.05 or response_time > 2.0

    async def _complete_experiment(self, experiment_id: str, reason: str):
        """Complete experiment and analyze results"""
        experiment = self.experiments[experiment_id]
        experiment.status = ExperimentStatus.COMPLETED
        experiment.completed_at = datetime.now()

        # Analyze results
        results = self._analyze_experiment_results(experiment)
        experiment.results = results
        experiment.statistical_significance = results.get("significance", 0.0)

        logger.info(f"Completed experiment {experiment_id}: {reason}")

    async def _rollback_experiment(self, experiment_id: str, reason: str):
        """Rollback failed canary deployment"""
        experiment = self.experiments[experiment_id]
        experiment.status = ExperimentStatus.FAILED
        experiment.completed_at = datetime.now()
        experiment.lessons_learned.append(f"Auto-rollback: {reason}")

        logger.warning(f"Rolled back experiment {experiment_id}: {reason}")

    def _analyze_experiment_results(self, experiment: Experiment) -> dict[str, Any]:
        """Analyze experiment results and generate insights"""
        analysis = {
            "winner": None,
            "effect_size": 0.0,
            "significance": 0.0,
            "insights": [],
            "recommendations": [],
        }

        if len(experiment.variants) >= 2:
            # Find best performing variant
            best_variant = max(
                experiment.variants, key=lambda v: v.metrics.get("conversion_rate", 0)
            )
            analysis["winner"] = best_variant.name

            # Calculate effect size (simplified)
            control = next(
                (v for v in experiment.variants if v.name == "control"),
                experiment.variants[0],
            )
            analysis["effect_size"] = abs(
                best_variant.metrics.get("conversion_rate", 0)
                - control.metrics.get("conversion_rate", 0)
            )

            # Generate insights
            if analysis["effect_size"] > 0.1:
                analysis["insights"].append("Strong positive effect detected")
                analysis["recommendations"].append("Roll out winning variant to 100%")
            elif analysis["effect_size"] > 0.05:
                analysis["insights"].append("Moderate effect detected")
                analysis["recommendations"].append("Consider gradual rollout")
            else:
                analysis["insights"].append("No significant effect detected")
                analysis["recommendations"].append(
                    "Reconsider hypothesis or test different variants"
                )

        return analysis
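    # Illustrative sketch (assumption): the Experiment dataclass defines a
    # confidence_interval field that the simplified analysis above never populates.
    # A fuller analysis could fill it with a Wald interval for the difference in
    # conversion rates, again assuming per-variant success/trial counts that this
    # module does not currently record.
    @staticmethod
    def _difference_confidence_interval(
        successes_a: int,
        trials_a: int,
        successes_b: int,
        trials_b: int,
        z: float = 1.96,
    ) -> tuple[float, float]:
        """Approximate 95% Wald interval for p_b - p_a (illustrative helper)."""
        p_a = successes_a / trials_a
        p_b = successes_b / trials_b
        diff = p_b - p_a
        # Standard error of the difference between two independent proportions
        se = ((p_a * (1 - p_a)) / trials_a + (p_b * (1 - p_b)) / trials_b) ** 0.5
        return (diff - z * se, diff + z * se)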
"""Collect user engagement metrics""" return random.uniform(0.1, 0.5) async def _collect_error_metrics(self, variant_id: str) -> float: """Collect error rate metrics""" return random.uniform(0.001, 0.01) async def _collect_performance_metrics(self, variant_id: str) -> float: """Collect response time metrics""" return random.uniform(0.5, 2.5) async def _collect_resource_metrics(self, variant_id: str) -> float: """Collect resource usage metrics""" return random.uniform(0.3, 0.9) async def _collect_stability_metrics(self, variant_id: str) -> float: """Collect system stability metrics""" return random.uniform(0.85, 0.99) async def get_experiment_status(self, experiment_id: str) -> dict[str, Any]: """Get comprehensive experiment status""" if experiment_id not in self.experiments: return {"error": "Experiment not found"} experiment = self.experiments[experiment_id] return { "experiment_id": experiment.experiment_id, "title": experiment.title, "status": experiment.status.value, "progress": self._calculate_progress(experiment), "variants": [ { "name": v.name, "traffic_percentage": v.traffic_percentage, "metrics": v.metrics, } for v in experiment.variants ], "results": experiment.results, "statistical_significance": experiment.statistical_significance, } def _calculate_progress(self, experiment: Experiment) -> float: """Calculate experiment progress (0.0 to 1.0)""" if experiment.status != ExperimentStatus.RUNNING: return 1.0 if experiment.status == ExperimentStatus.COMPLETED else 0.0 # Simplified progress calculation based on sample size total_samples = sum(sum(v.metrics.values()) for v in experiment.variants) target_samples = self.min_sample_size * len(experiment.variants) return min(1.0, total_samples / target_samples) # Global instance enhanced_experimentation_platform = EnhancedExperimentationPlatform()