Spaces:
Sleeping
Sleeping
"""
Configuration models for GEPA Optimizer
"""
import os
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Union, Tuple
@dataclass
class ModelConfig:
    """Configuration for any LLM provider."""

    provider: str  # Required: "openai", "anthropic", "huggingface", "vllm", etc.
    model_name: str  # Required: actual model name
    api_key: str  # Required: API key for the provider
    base_url: Optional[str] = None  # Optional: custom endpoint URL
    temperature: float = 0.7
    max_tokens: int = 2048
    top_p: float = 1.0
    frequency_penalty: float = 0.0
    presence_penalty: float = 0.0

    def __post_init__(self):
        """Validate required fields after initialization.

        Raises:
            ValueError: If provider, model_name, or api_key is empty.
        """
        if not self.provider:
            raise ValueError("Provider is required (e.g., 'openai', 'anthropic', 'huggingface')")
        if not self.model_name:
            raise ValueError("Model name is required (e.g., 'gpt-4', 'claude-3-opus')")
        if not self.api_key:
            raise ValueError(f"API key is required for {self.provider} provider")

    @classmethod
    def from_string(cls, model_string: str) -> 'ModelConfig':
        """Create ModelConfig from string like 'openai/gpt-4' or 'gpt-4'.

        The API key is looked up in the environment for the parsed provider.

        Raises:
            ValueError: If no API key is found for the provider.
        """
        if "/" in model_string:
            provider, model_name = model_string.split("/", 1)
        else:
            # Default to OpenAI if no provider specified
            provider = "openai"
            model_name = model_string
        # Get API key from environment
        api_key = cls._get_api_key_for_provider(provider)
        if not api_key:
            raise ValueError(
                f"No API key found for {provider}. Please set {provider.upper()}_API_KEY environment variable"
            )
        return cls(
            provider=provider,
            model_name=model_name,
            api_key=api_key
        )

    @classmethod
    def from_dict(cls, config_dict: dict) -> 'ModelConfig':
        """Create ModelConfig from dictionary."""
        return cls(**config_dict)

    def to_dict(self) -> dict:
        """Convert ModelConfig to dictionary."""
        return {
            'provider': self.provider,
            'model_name': self.model_name,
            'api_key': self.api_key,
            'base_url': self.base_url,
            'temperature': self.temperature,
            'max_tokens': self.max_tokens,
            'top_p': self.top_p,
            'frequency_penalty': self.frequency_penalty,
            'presence_penalty': self.presence_penalty
        }

    @staticmethod
    def _get_api_key_for_provider(provider: str) -> Optional[str]:
        """Get API key for provider from environment variables.

        Known providers use an explicit env-var mapping; unknown providers
        fall back to the generic ``{PROVIDER}_API_KEY`` pattern.
        """
        env_var_map = {
            "openai": "OPENAI_API_KEY",
            "anthropic": "ANTHROPIC_API_KEY",
            "huggingface": "HUGGINGFACE_API_KEY",
            "cohere": "COHERE_API_KEY",
            "ai21": "AI21_API_KEY",
            "together": "TOGETHER_API_KEY",
            "replicate": "REPLICATE_API_TOKEN",
            "groq": "GROQ_API_KEY",
            "ollama": "OLLAMA_API_KEY"
        }
        env_var = env_var_map.get(provider.lower())
        if env_var:
            return os.getenv(env_var)
        # Fallback: try generic pattern
        return os.getenv(f"{provider.upper()}_API_KEY")
@dataclass
class DataSplitConfig:
    """Configuration for dataset splitting into train/val/test sets

    🔥 ADAPTIVE SPLITTING: Automatically adjusts ratios based on dataset size for optimal results.
    - Small datasets (< 15): Prioritizes validation set (70/25/5) for reliable candidate ranking
    - Medium datasets (15-50): Balanced split (60/20/20)
    - Large datasets (50+): More training data (70/15/15)
    """

    # Split ratios (must sum to 1.0) - used as defaults, but adaptive strategy overrides for small datasets
    train_ratio: float = 0.6  # 60% for training (Dfeedback - reflection examples)
    val_ratio: float = 0.2  # 20% for validation (Dpareto - Pareto selection)
    test_ratio: float = 0.2  # 20% for test (held-out final evaluation)
    # Minimum samples per split
    min_train_samples: int = 3
    min_val_samples: int = 3  # 🔥 INCREASED from 2 to 3 for more reliable validation scores
    min_test_samples: int = 1  # 🔥 REDUCED from 2 to 1 (test set less critical, only used once)
    # Strategy for handling small datasets
    small_dataset_strategy: str = 'adaptive'  # 🔥 DEFAULT: 'adaptive', 'duplicate_val', 'no_test', 'error'

    def __post_init__(self):
        """Validate split configuration"""
        total = self.train_ratio + self.val_ratio + self.test_ratio
        if not (0.99 <= total <= 1.01):  # Allow small floating point errors
            raise ValueError(
                f"Split ratios must sum to 1.0, got {total:.3f} "
                f"(train={self.train_ratio}, val={self.val_ratio}, test={self.test_ratio})"
            )
        if self.train_ratio <= 0 or self.val_ratio <= 0 or self.test_ratio < 0:
            raise ValueError("Split ratios must be positive (test_ratio can be 0 to disable)")
        if self.small_dataset_strategy not in {'adaptive', 'duplicate_val', 'no_test', 'error'}:
            raise ValueError(
                f"Invalid small_dataset_strategy: {self.small_dataset_strategy}. "
                f"Must be 'adaptive', 'duplicate_val', 'no_test', or 'error'"
            )

    def get_adaptive_ratios(self, dataset_size: int) -> Tuple[float, float, float]:
        """
        🔥 NEW: Get adaptive split ratios based on dataset size.

        For prompt optimization:
        - Small datasets (< 15): Prioritize validation (70/25/5) for reliable candidate ranking
        - Medium (15-50): Balanced (60/20/20)
        - Large (50+): More training (70/15/15)

        Args:
            dataset_size: Total number of samples in dataset

        Returns:
            Tuple of (train_ratio, val_ratio, test_ratio)
        """
        if dataset_size < 15:
            # Small dataset: Prioritize validation for reliable candidate ranking
            # Validation set is CRITICAL - used for every candidate evaluation
            return (0.70, 0.25, 0.05)  # 70% train, 25% val, 5% test
        elif dataset_size < 50:
            # Medium dataset: Balanced split
            return (0.60, 0.20, 0.20)  # 60% train, 20% val, 20% test
        else:
            # Large dataset: More training data, can reduce validation/test
            return (0.70, 0.15, 0.15)  # 70% train, 15% val, 15% test

    def get_split_indices(self, dataset_size: int) -> Tuple[int, int, int, int]:
        """
        Calculate split indices for a dataset with adaptive ratios.

        🔥 ADAPTIVE SPLITTING: Automatically adjusts ratios based on dataset size.
        This ensures optimal allocation:
        - Small datasets: More validation samples for reliable ranking
        - Medium datasets: Balanced split
        - Large datasets: More training data

        Args:
            dataset_size: Total number of samples in dataset

        Returns:
            Tuple of (train_end, val_end, test_end, dataset_size) indices

        Raises:
            ValueError: If dataset is too small for configured splits
        """
        # 🔥 NEW: Use adaptive ratios if strategy is 'adaptive'
        if self.small_dataset_strategy == 'adaptive':
            train_ratio, val_ratio, test_ratio = self.get_adaptive_ratios(dataset_size)
        else:
            train_ratio, val_ratio, test_ratio = self.train_ratio, self.val_ratio, self.test_ratio
        if dataset_size < self.min_train_samples + self.min_val_samples:
            if self.small_dataset_strategy == 'error':
                raise ValueError(
                    f"Dataset too small ({dataset_size} samples). "
                    f"Need at least {self.min_train_samples + self.min_val_samples} samples."
                )
        # Calculate ideal split points with adaptive ratios
        train_end = max(self.min_train_samples, int(dataset_size * train_ratio))
        val_end = train_end + max(self.min_val_samples, int(dataset_size * val_ratio))
        # Adjust for small datasets
        if val_end >= dataset_size:
            if self.small_dataset_strategy in {'adaptive', 'duplicate_val'}:
                # Ensure minimum validation samples, use remainder for test
                val_end = min(dataset_size, train_end + self.min_val_samples)
                test_end = dataset_size
            elif self.small_dataset_strategy == 'no_test':
                # No test set for small datasets
                val_end = dataset_size
                test_end = dataset_size
            else:  # error
                raise ValueError(
                    f"Dataset too small ({dataset_size} samples) for train/val/test split. "
                    f"Need at least {self.min_train_samples + self.min_val_samples + self.min_test_samples} samples."
                )
        else:
            test_end = dataset_size
        return train_end, val_end, test_end, dataset_size
@dataclass
class OptimizationConfig:
    """Configuration class for GEPA optimization process"""

    # Core models - REQUIRED by user
    model: Union[str, ModelConfig]  # No default - user must specify
    reflection_model: Union[str, ModelConfig]  # No default - user must specify
    # Optimization parameters - REQUIRED by user
    max_iterations: int  # No default - user decides their budget
    max_metric_calls: int  # No default - user sets their budget
    batch_size: int  # No default - user decides based on memory
    # Dataset splitting configuration
    data_split: DataSplitConfig = field(default_factory=DataSplitConfig)
    # Reflection settings (separate from evaluation batch_size)
    reflection_examples: int = 3  # Number of examples for each reflection (small!)
    # Optional optimization settings with sensible fallbacks
    early_stopping: bool = True
    learning_rate: float = 0.01
    # Multi-objective optimization
    multi_objective: bool = False
    objectives: List[str] = field(default_factory=lambda: ["accuracy"])
    # Advanced settings
    custom_metrics: Optional[Dict[str, Any]] = None
    use_cache: bool = True
    parallel_evaluation: bool = False
    # Backwards compatibility (deprecated)
    train_split_ratio: Optional[float] = None  # Use data_split instead
    min_dataset_size: int = 2
    # Cost and budget - user controlled
    max_cost_usd: Optional[float] = None
    timeout_seconds: Optional[int] = None
    # GEPA-specific optimization parameters (based on actual GEPA library)
    candidate_selection_strategy: str = 'pareto'  # Use Pareto selection strategy
    skip_perfect_score: bool = False  # Don't skip perfect scores (set to True for early stopping)
    reflection_minibatch_size: Optional[int] = None  # Will use reflection_examples if None
    perfect_score: float = 1.0  # Perfect score threshold
    module_selector: str = 'round_robin'  # Component selection strategy
    verbose: bool = True  # Enable detailed GEPA logging
    # Test set evaluation
    evaluate_on_test: bool = True  # Evaluate final prompt on held-out test set
    # 🆕 LLEGO Genetic Operator Parameters (Optional - for faster convergence)
    # Based on ICLR 2025 paper: "Decision Tree Induction Through LLMs via Semantically-Aware Evolution"
    # Optimized for small datasets (6-10 samples)
    use_llego_operators: bool = False  # Enable LLEGO genetic operators
    # 🔥 HYBRID MODE: Combine GEPA Reflection + LLEGO Operators
    # When both enabled, candidates are generated from BOTH sources for maximum diversity
    enable_gepa_reflection_with_llego: bool = False  # Enable hybrid GEPA+LLEGO mode
    num_gepa_reflection_candidates: int = 3  # Number of GEPA reflection candidates per iteration (default: 3 for better exploration, range: 2-5)
    # Fitness-guided crossover parameters (FIX #3: Conservative alpha)
    alpha: float = 0.05  # FIX #3: Fitness extrapolation (0.05 = 5% above best parent, realistic for prompt optimization)
    n_crossover: int = 2  # Number of offspring from crossover per iteration
    # Diversity-guided mutation parameters
    tau: float = 8.0  # Diversity temperature (8.0 = moderate diversity, balanced exploration/exploitation)
    nu: int = 3  # Parent arity (3 parents optimal for small populations ~6 samples)
    n_mutation: int = 2  # Number of offspring from mutation per iteration (total 4 offspring with crossover)
    # Population management (for genetic operators)
    population_size: int = 8  # Size of prompt population (small but diverse for 6-sample dataset)
    # 🆕 LLM-as-Judge configuration (Phase 2)
    use_llm_as_judge: bool = True  # Enable LLM-as-Judge feedback for detailed, actionable analysis
    llm_as_judge_threshold: float = 0.8  # Use LLM-as-Judge for scores below this threshold
    llm_as_judge_model: Optional[ModelConfig] = None  # Optional: use different model (defaults to reflection_model)
    # 🆕 Logging configuration (Phase 3)
    log_level: str = "INFO"  # Logging level: "DEBUG", "INFO", "WARNING", "ERROR"

    def __post_init__(self):
        """Validate and process configuration after initialization"""
        # Handle backwards compatibility for train_split_ratio
        if self.train_split_ratio is not None and self.train_split_ratio != 0.8:
            import warnings
            warnings.warn(
                "train_split_ratio is deprecated. Use data_split=DataSplitConfig(...) instead. "
                "Converting to 3-way split with your ratio.",
                DeprecationWarning,
                stacklevel=2
            )
            # Convert 2-way split to 3-way: use train_ratio, split remainder between val/test
            remainder = 1.0 - self.train_split_ratio
            self.data_split = DataSplitConfig(
                train_ratio=self.train_split_ratio,
                val_ratio=remainder * 0.5,
                test_ratio=remainder * 0.5
            )
        # Convert string models to ModelConfig objects
        self.model = self._parse_model_config(self.model, "model")
        self.reflection_model = self._parse_model_config(self.reflection_model, "reflection_model")
        # Set reflection_minibatch_size default
        if self.reflection_minibatch_size is None:
            self.reflection_minibatch_size = self.reflection_examples
        # Validate required parameters
        self._validate_required_params()
        # Validate ranges
        self._validate_ranges()

    def _parse_model_config(self, model: Union[str, ModelConfig], field_name: str) -> ModelConfig:
        """Parse string model specification into ModelConfig"""
        if isinstance(model, ModelConfig):
            return model
        if isinstance(model, str):
            # Parse "provider/model-name" format
            if "/" in model:
                provider, model_name = model.split("/", 1)
            else:
                # Default to openai if no provider specified
                provider = "openai"
                model_name = model
            # Try to get API key from environment
            api_key = self._get_api_key_for_provider(provider)
            if not api_key:
                raise ValueError(
                    f"No API key found for {provider}. Please set environment variable "
                    f"or provide ModelConfig with api_key for {field_name}"
                )
            return ModelConfig(
                provider=provider,
                model_name=model_name,
                api_key=api_key
            )
        raise ValueError(f"{field_name} must be either a string or ModelConfig object")

    def _get_api_key_for_provider(self, provider: str) -> Optional[str]:
        """Get API key for provider from environment variables"""
        # Delegate to ModelConfig so the env-var mapping lives in one place
        return ModelConfig._get_api_key_for_provider(provider)

    def _validate_required_params(self):
        """Validate that all required parameters are provided"""
        required_fields = {
            "max_iterations": self.max_iterations,
            "max_metric_calls": self.max_metric_calls,
            "batch_size": self.batch_size,
        }
        for field_name, value in required_fields.items():
            if value is None:
                raise ValueError(f"{field_name} is required and must be specified by user")

    def _validate_ranges(self):
        """Validate parameter ranges"""
        if self.max_iterations <= 0:
            raise ValueError("max_iterations must be positive")
        if self.max_metric_calls <= 0:
            raise ValueError("max_metric_calls must be positive")
        if self.batch_size <= 0:
            raise ValueError("batch_size must be positive")
        if self.reflection_examples <= 0 or self.reflection_examples > 10:
            raise ValueError("reflection_examples must be between 1 and 10 (recommended: 2-5)")
        if self.reflection_minibatch_size <= 0:
            raise ValueError("reflection_minibatch_size must be positive")
        if hasattr(self.model, 'max_tokens') and self.model.max_tokens <= 0:
            raise ValueError("model.max_tokens must be a positive integer")
        # Validate hybrid mode parameters
        if self.enable_gepa_reflection_with_llego and not self.use_llego_operators:
            raise ValueError("enable_gepa_reflection_with_llego requires use_llego_operators=True")
        if self.num_gepa_reflection_candidates <= 0 or self.num_gepa_reflection_candidates > 5:
            raise ValueError("num_gepa_reflection_candidates must be between 1 and 5 (recommended: 3 for balanced exploration)")
        # Validate log_level
        valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR"]
        if self.log_level.upper() not in valid_log_levels:
            raise ValueError(f"log_level must be one of {valid_log_levels}, got: {self.log_level}")

    def validate_api_connectivity(self) -> Dict[str, bool]:
        """Test API connectivity for both models"""
        results = {}
        for model_name, model_config in [("model", self.model), ("reflection_model", self.reflection_model)]:
            try:
                # This would be implemented to actually test the API
                # For now, just check if we have the required info
                if model_config.api_key and model_config.provider and model_config.model_name:
                    results[model_name] = True
                else:
                    results[model_name] = False
            except Exception:
                results[model_name] = False
        return results

    def get_estimated_cost(self) -> Dict[str, Any]:
        """Estimate cost based on configuration"""
        # This would calculate estimated costs based on:
        # - max_metric_calls
        # - model pricing
        # - expected tokens per call
        return {
            "max_calls": self.max_metric_calls,
            "estimated_cost_range": "To be calculated based on provider pricing",
            "cost_factors": {
                "model_calls": self.max_metric_calls,
                "reflection_calls": self.max_iterations,
                "batch_size": self.batch_size
            }
        }

    @classmethod
    def create_example_config(cls, provider: str = "openai") -> str:
        """Generate example configuration code for users"""
        examples = {
            "openai": '''
# Example OpenAI Configuration
config = OptimizationConfig(
    model="openai/gpt-4-turbo",  # or ModelConfig(...)
    reflection_model="openai/gpt-4-turbo",
    max_iterations=50,  # Your choice based on budget
    max_metric_calls=300,  # Your choice based on budget
    batch_size=8,  # Your choice based on memory
    early_stopping=True,
    learning_rate=0.01
)
''',
            "anthropic": '''
# Example Anthropic Configuration
config = OptimizationConfig(
    model=ModelConfig(
        provider="anthropic",
        model_name="claude-3-opus-20240229",
        api_key="your-anthropic-key",
        temperature=0.7
    ),
    reflection_model="anthropic/claude-3-sonnet-20240229",
    max_iterations=30,
    max_metric_calls=200,
    batch_size=4
)
''',
            "mixed": '''
# Example Mixed Providers Configuration
config = OptimizationConfig(
    model="openai/gpt-4-turbo",  # Main model
    reflection_model="anthropic/claude-3-opus",  # Reflection model
    max_iterations=25,
    max_metric_calls=250,
    batch_size=6,
    max_cost_usd=100.0,  # Budget limit
    timeout_seconds=3600  # 1 hour limit
)
'''
        }
        return examples.get(provider, examples["openai"])