"""
Configuration models for GEPA Optimizer
"""
import os
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Union, Tuple
@dataclass
class ModelConfig:
"""Configuration for any LLM provider"""
provider: str # Required: "openai", "anthropic", "huggingface", "vllm", etc.
model_name: str # Required: actual model name
api_key: str # Required: API key for the provider
base_url: Optional[str] = None # Optional: custom endpoint URL
temperature: float = 0.7
max_tokens: int = 2048
top_p: float = 1.0
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
def __post_init__(self):
"""Validate required fields after initialization"""
if not self.provider:
raise ValueError("Provider is required (e.g., 'openai', 'anthropic', 'huggingface')")
if not self.model_name:
raise ValueError("Model name is required (e.g., 'gpt-4', 'claude-3-opus')")
if not self.api_key:
raise ValueError(f"API key is required for {self.provider} provider")
@classmethod
def from_string(cls, model_string: str) -> 'ModelConfig':
"""Create ModelConfig from string like 'openai/gpt-4' or 'gpt-4'"""
if "/" in model_string:
provider, model_name = model_string.split("/", 1)
else:
# Default to OpenAI if no provider specified
provider = "openai"
model_name = model_string
# Get API key from environment
api_key = cls._get_api_key_for_provider(provider)
if not api_key:
            raise ValueError(
                f"No API key found for {provider}. Please set the {provider.upper()}_API_KEY environment variable"
            )
return cls(
provider=provider,
model_name=model_name,
api_key=api_key
)
@classmethod
def from_dict(cls, config_dict: dict) -> 'ModelConfig':
"""Create ModelConfig from dictionary"""
return cls(**config_dict)
def to_dict(self) -> dict:
"""Convert ModelConfig to dictionary"""
return {
'provider': self.provider,
'model_name': self.model_name,
'api_key': self.api_key,
'base_url': self.base_url,
'temperature': self.temperature,
'max_tokens': self.max_tokens,
'top_p': self.top_p,
'frequency_penalty': self.frequency_penalty,
'presence_penalty': self.presence_penalty
}
@staticmethod
def _get_api_key_for_provider(provider: str) -> Optional[str]:
"""Get API key for provider from environment variables"""
env_var_map = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"huggingface": "HUGGINGFACE_API_KEY",
"cohere": "COHERE_API_KEY",
"ai21": "AI21_API_KEY",
"together": "TOGETHER_API_KEY",
"replicate": "REPLICATE_API_TOKEN",
"groq": "GROQ_API_KEY",
"ollama": "OLLAMA_API_KEY"
}
env_var = env_var_map.get(provider.lower())
if env_var:
return os.getenv(env_var)
# Fallback: try generic pattern
return os.getenv(f"{provider.upper()}_API_KEY")
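# Illustrative usage of ModelConfig (a sketch, kept as comments so nothing runs at
# import time). It assumes the matching API key, e.g. OPENAI_API_KEY, is already
# exported in the environment; the model names are placeholders.
#
#     cfg = ModelConfig.from_string("openai/gpt-4")   # "provider/model" form
#     cfg = ModelConfig.from_string("gpt-4")          # no prefix -> defaults to "openai"
#     cfg = ModelConfig.from_dict(cfg.to_dict())      # round-trips through a plain dict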
@dataclass
class DataSplitConfig:
"""Configuration for dataset splitting into train/val/test sets
🔥 ADAPTIVE SPLITTING: Automatically adjusts ratios based on dataset size for optimal results.
- Small datasets (< 15): Prioritizes validation set (70/25/5) for reliable candidate ranking
- Medium datasets (15-50): Balanced split (60/20/20)
- Large datasets (50+): More training data (70/15/15)
"""
    # Split ratios (must sum to 1.0) - used as defaults; with small_dataset_strategy='adaptive'
    # (the default), ratios are instead derived from the dataset size via get_adaptive_ratios()
train_ratio: float = 0.6 # 60% for training (Dfeedback - reflection examples)
val_ratio: float = 0.2 # 20% for validation (Dpareto - Pareto selection)
test_ratio: float = 0.2 # 20% for test (held-out final evaluation)
# Minimum samples per split
min_train_samples: int = 3
min_val_samples: int = 3 # 🔥 INCREASED from 2 to 3 for more reliable validation scores
min_test_samples: int = 1 # 🔥 REDUCED from 2 to 1 (test set less critical, only used once)
# Strategy for handling small datasets
small_dataset_strategy: str = 'adaptive' # 🔥 DEFAULT: 'adaptive', 'duplicate_val', 'no_test', 'error'
def __post_init__(self):
"""Validate split configuration"""
total = self.train_ratio + self.val_ratio + self.test_ratio
if not (0.99 <= total <= 1.01): # Allow small floating point errors
raise ValueError(
f"Split ratios must sum to 1.0, got {total:.3f} "
f"(train={self.train_ratio}, val={self.val_ratio}, test={self.test_ratio})"
)
        if self.train_ratio <= 0 or self.val_ratio <= 0 or self.test_ratio < 0:
            raise ValueError("train_ratio and val_ratio must be positive; test_ratio must be >= 0 (0 disables the test split)")
if self.small_dataset_strategy not in {'adaptive', 'duplicate_val', 'no_test', 'error'}:
raise ValueError(
f"Invalid small_dataset_strategy: {self.small_dataset_strategy}. "
f"Must be 'adaptive', 'duplicate_val', 'no_test', or 'error'"
)
def get_adaptive_ratios(self, dataset_size: int) -> Tuple[float, float, float]:
"""
🔥 NEW: Get adaptive split ratios based on dataset size.
For prompt optimization:
- Small datasets (< 15): Prioritize validation (70/25/5) for reliable candidate ranking
- Medium (15-50): Balanced (60/20/20)
- Large (50+): More training (70/15/15)
Args:
dataset_size: Total number of samples in dataset
Returns:
Tuple of (train_ratio, val_ratio, test_ratio)
"""
if dataset_size < 15:
# Small dataset: Prioritize validation for reliable candidate ranking
# Validation set is CRITICAL - used for every candidate evaluation
return (0.70, 0.25, 0.05) # 70% train, 25% val, 5% test
elif dataset_size < 50:
# Medium dataset: Balanced split
return (0.60, 0.20, 0.20) # 60% train, 20% val, 20% test
else:
# Large dataset: More training data, can reduce validation/test
return (0.70, 0.15, 0.15) # 70% train, 15% val, 15% test
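    # Worked example (illustrative only), reading the thresholds above:
    #   get_adaptive_ratios(10)  -> (0.70, 0.25, 0.05)   # small:  favour validation
    #   get_adaptive_ratios(30)  -> (0.60, 0.20, 0.20)   # medium: balanced
    #   get_adaptive_ratios(100) -> (0.70, 0.15, 0.15)   # large:  favour training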
def get_split_indices(self, dataset_size: int) -> Tuple[int, int, int, int]:
"""
Calculate split indices for a dataset with adaptive ratios.
🔥 ADAPTIVE SPLITTING: Automatically adjusts ratios based on dataset size.
This ensures optimal allocation:
- Small datasets: More validation samples for reliable ranking
- Medium datasets: Balanced split
- Large datasets: More training data
Args:
dataset_size: Total number of samples in dataset
        Returns:
            Tuple of (train_end, val_end, test_end, dataset_size); the first three are
            exclusive slice boundaries for the train/val/test splits
Raises:
ValueError: If dataset is too small for configured splits
"""
# 🔥 NEW: Use adaptive ratios if strategy is 'adaptive'
if self.small_dataset_strategy == 'adaptive':
train_ratio, val_ratio, test_ratio = self.get_adaptive_ratios(dataset_size)
else:
train_ratio, val_ratio, test_ratio = self.train_ratio, self.val_ratio, self.test_ratio
        if dataset_size < self.min_train_samples + self.min_val_samples:
            if self.small_dataset_strategy == 'error':
                raise ValueError(
                    f"Dataset too small ({dataset_size} samples). "
                    f"Need at least {self.min_train_samples + self.min_val_samples} samples."
                )
            # Other strategies fall through: the adjustment below shrinks the
            # validation/test splits rather than raising
# Calculate ideal split points with adaptive ratios
train_end = max(self.min_train_samples, int(dataset_size * train_ratio))
val_end = train_end + max(self.min_val_samples, int(dataset_size * val_ratio))
# Adjust for small datasets
if val_end >= dataset_size:
if self.small_dataset_strategy in {'adaptive', 'duplicate_val'}:
# Ensure minimum validation samples, use remainder for test
val_end = min(dataset_size, train_end + self.min_val_samples)
test_end = dataset_size
elif self.small_dataset_strategy == 'no_test':
# No test set for small datasets
val_end = dataset_size
test_end = dataset_size
else: # error
raise ValueError(
f"Dataset too small ({dataset_size} samples) for train/val/test split. "
f"Need at least {self.min_train_samples + self.min_val_samples + self.min_test_samples} samples."
)
else:
test_end = dataset_size
return train_end, val_end, test_end, dataset_size
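# Sketch of how the indices from get_split_indices are typically consumed (kept as
# comments; `data` stands for any indexable sequence of samples and is an assumption
# here, not something this module defines):
#
#     split = DataSplitConfig()                         # 'adaptive' strategy by default
#     train_end, val_end, test_end, n = split.get_split_indices(len(data))
#     train_set = data[:train_end]
#     val_set   = data[train_end:val_end]
#     test_set  = data[val_end:test_end]                # may be empty for tiny datasets
#
# With the defaults and 20 samples this yields train_end=12, val_end=16, test_end=20,
# i.e. a 12/4/4 train/val/test split.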
@dataclass
class OptimizationConfig:
"""Configuration class for GEPA optimization process"""
# Core models - REQUIRED by user
model: Union[str, ModelConfig] # No default - user must specify
reflection_model: Union[str, ModelConfig] # No default - user must specify
# Optimization parameters - REQUIRED by user
max_iterations: int # No default - user decides their budget
max_metric_calls: int # No default - user sets their budget
batch_size: int # No default - user decides based on memory
# Dataset splitting configuration
data_split: DataSplitConfig = field(default_factory=DataSplitConfig)
# Reflection settings (separate from evaluation batch_size)
reflection_examples: int = 3 # Number of examples for each reflection (small!)
# Optional optimization settings with sensible fallbacks
early_stopping: bool = True
learning_rate: float = 0.01
# Multi-objective optimization
multi_objective: bool = False
objectives: List[str] = field(default_factory=lambda: ["accuracy"])
# Advanced settings
custom_metrics: Optional[Dict[str, Any]] = None
use_cache: bool = True
parallel_evaluation: bool = False
# Backwards compatibility (deprecated)
train_split_ratio: Optional[float] = None # Use data_split instead
min_dataset_size: int = 2
# Cost and budget - user controlled
max_cost_usd: Optional[float] = None
timeout_seconds: Optional[int] = None
# GEPA-specific optimization parameters (based on actual GEPA library)
candidate_selection_strategy: str = 'pareto' # Use Pareto selection strategy
skip_perfect_score: bool = False # Don't skip perfect scores (set to True for early stopping)
reflection_minibatch_size: Optional[int] = None # Will use reflection_examples if None
perfect_score: float = 1.0 # Perfect score threshold
module_selector: str = 'round_robin' # Component selection strategy
verbose: bool = True # Enable detailed GEPA logging
# Test set evaluation
evaluate_on_test: bool = True # Evaluate final prompt on held-out test set
# 🆕 LLEGO Genetic Operator Parameters (Optional - for faster convergence)
# Based on ICLR 2025 paper: "Decision Tree Induction Through LLMs via Semantically-Aware Evolution"
# Optimized for small datasets (6-10 samples)
use_llego_operators: bool = False # Enable LLEGO genetic operators
# 🔥 HYBRID MODE: Combine GEPA Reflection + LLEGO Operators
# When both enabled, candidates are generated from BOTH sources for maximum diversity
enable_gepa_reflection_with_llego: bool = False # Enable hybrid GEPA+LLEGO mode
num_gepa_reflection_candidates: int = 3 # Number of GEPA reflection candidates per iteration (default: 3 for better exploration, range: 2-5)
# Fitness-guided crossover parameters (FIX #3: Conservative alpha)
alpha: float = 0.05 # FIX #3: Fitness extrapolation (0.05 = 5% above best parent, realistic for prompt optimization)
n_crossover: int = 2 # Number of offspring from crossover per iteration
# Diversity-guided mutation parameters
tau: float = 8.0 # Diversity temperature (8.0 = moderate diversity, balanced exploration/exploitation)
nu: int = 3 # Parent arity (3 parents optimal for small populations ~6 samples)
n_mutation: int = 2 # Number of offspring from mutation per iteration (total 4 offspring with crossover)
# Population management (for genetic operators)
population_size: int = 8 # Size of prompt population (small but diverse for 6-sample dataset)
# 🆕 LLM-as-Judge configuration (Phase 2)
use_llm_as_judge: bool = True # Enable LLM-as-Judge feedback for detailed, actionable analysis
llm_as_judge_threshold: float = 0.8 # Use LLM-as-Judge for scores below this threshold
llm_as_judge_model: Optional[ModelConfig] = None # Optional: use different model (defaults to reflection_model)
# 🆕 Logging configuration (Phase 3)
log_level: str = "INFO" # Logging level: "DEBUG", "INFO", "WARNING", "ERROR"
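    # Illustrative hybrid-mode configuration (comments only, since construction reads
    # API keys from the environment). enable_gepa_reflection_with_llego requires
    # use_llego_operators=True, otherwise __post_init__ raises a ValueError:
    #
    #     config = OptimizationConfig(
    #         model="openai/gpt-4-turbo",
    #         reflection_model="openai/gpt-4-turbo",
    #         max_iterations=20, max_metric_calls=150, batch_size=4,
    #         use_llego_operators=True,
    #         enable_gepa_reflection_with_llego=True,
    #     )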
def __post_init__(self):
"""Validate and process configuration after initialization"""
        # Handle backwards compatibility for train_split_ratio (the old default of 0.8
        # is treated as "not set", leaving data_split untouched)
        if self.train_split_ratio is not None and self.train_split_ratio != 0.8:
import warnings
warnings.warn(
"train_split_ratio is deprecated. Use data_split=DataSplitConfig(...) instead. "
"Converting to 3-way split with your ratio.",
DeprecationWarning,
stacklevel=2
)
# Convert 2-way split to 3-way: use train_ratio, split remainder between val/test
remainder = 1.0 - self.train_split_ratio
self.data_split = DataSplitConfig(
train_ratio=self.train_split_ratio,
val_ratio=remainder * 0.5,
test_ratio=remainder * 0.5
)
# Convert string models to ModelConfig objects
self.model = self._parse_model_config(self.model, "model")
self.reflection_model = self._parse_model_config(self.reflection_model, "reflection_model")
# Set reflection_minibatch_size default
if self.reflection_minibatch_size is None:
self.reflection_minibatch_size = self.reflection_examples
# Validate required parameters
self._validate_required_params()
# Validate ranges
self._validate_ranges()
def _parse_model_config(self, model: Union[str, ModelConfig], field_name: str) -> ModelConfig:
"""Parse string model specification into ModelConfig"""
if isinstance(model, ModelConfig):
return model
if isinstance(model, str):
# Parse "provider/model-name" format
if "/" in model:
provider, model_name = model.split("/", 1)
else:
# Default to openai if no provider specified
provider = "openai"
model_name = model
# Try to get API key from environment
api_key = self._get_api_key_for_provider(provider)
if not api_key:
                raise ValueError(
                    f"No API key found for {provider}. Please set the {provider.upper()}_API_KEY "
                    f"environment variable or provide a ModelConfig with api_key for {field_name}"
                )
return ModelConfig(
provider=provider,
model_name=model_name,
api_key=api_key
)
raise ValueError(f"{field_name} must be either a string or ModelConfig object")
def _get_api_key_for_provider(self, provider: str) -> Optional[str]:
"""Get API key for provider from environment variables"""
return ModelConfig._get_api_key_for_provider(provider)
def _validate_required_params(self):
"""Validate that all required parameters are provided"""
required_fields = {
"max_iterations": self.max_iterations,
"max_metric_calls": self.max_metric_calls,
"batch_size": self.batch_size,
}
for field_name, value in required_fields.items():
if value is None:
raise ValueError(f"{field_name} is required and must be specified by user")
def _validate_ranges(self):
"""Validate parameter ranges"""
if self.max_iterations <= 0:
raise ValueError("max_iterations must be positive")
if self.max_metric_calls <= 0:
raise ValueError("max_metric_calls must be positive")
if self.batch_size <= 0:
raise ValueError("batch_size must be positive")
if self.reflection_examples <= 0 or self.reflection_examples > 10:
raise ValueError("reflection_examples must be between 1 and 10 (recommended: 2-5)")
if self.reflection_minibatch_size <= 0:
raise ValueError("reflection_minibatch_size must be positive")
if hasattr(self.model, 'max_tokens') and self.model.max_tokens <= 0:
raise ValueError("model.max_tokens must be a positive integer")
# Validate hybrid mode parameters
if self.enable_gepa_reflection_with_llego and not self.use_llego_operators:
raise ValueError("enable_gepa_reflection_with_llego requires use_llego_operators=True")
if self.num_gepa_reflection_candidates <= 0 or self.num_gepa_reflection_candidates > 5:
raise ValueError("num_gepa_reflection_candidates must be between 1 and 5 (recommended: 3 for balanced exploration)")
        # Validate and normalize log_level (validation is case-insensitive, so store uppercase)
        valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR"]
        if self.log_level.upper() not in valid_log_levels:
            raise ValueError(f"log_level must be one of {valid_log_levels}, got: {self.log_level}")
        self.log_level = self.log_level.upper()
def validate_api_connectivity(self) -> Dict[str, bool]:
"""Test API connectivity for both models"""
results = {}
for model_name, model_config in [("model", self.model), ("reflection_model", self.reflection_model)]:
try:
# This would be implemented to actually test the API
# For now, just check if we have the required info
if model_config.api_key and model_config.provider and model_config.model_name:
results[model_name] = True
else:
results[model_name] = False
except Exception:
results[model_name] = False
return results
def get_estimated_cost(self) -> Dict[str, Any]:
"""Estimate cost based on configuration"""
# This would calculate estimated costs based on:
# - max_metric_calls
# - model pricing
# - expected tokens per call
return {
"max_calls": self.max_metric_calls,
"estimated_cost_range": "To be calculated based on provider pricing",
"cost_factors": {
"model_calls": self.max_metric_calls,
"reflection_calls": self.max_iterations,
"batch_size": self.batch_size
}
}
@classmethod
def create_example_config(cls, provider: str = "openai") -> str:
"""Generate example configuration code for users"""
examples = {
"openai": '''
# Example OpenAI Configuration
config = OptimizationConfig(
model="openai/gpt-4-turbo", # or ModelConfig(...)
reflection_model="openai/gpt-4-turbo",
max_iterations=50, # Your choice based on budget
max_metric_calls=300, # Your choice based on budget
batch_size=8, # Your choice based on memory
early_stopping=True,
learning_rate=0.01
)
''',
"anthropic": '''
# Example Anthropic Configuration
config = OptimizationConfig(
model=ModelConfig(
provider="anthropic",
model_name="claude-3-opus-20240229",
api_key="your-anthropic-key",
temperature=0.7
),
reflection_model="anthropic/claude-3-sonnet-20240229",
max_iterations=30,
max_metric_calls=200,
batch_size=4
)
''',
"mixed": '''
# Example Mixed Providers Configuration
config = OptimizationConfig(
model="openai/gpt-4-turbo", # Main model
reflection_model="anthropic/claude-3-opus", # Reflection model
max_iterations=25,
max_metric_calls=250,
batch_size=6,
max_cost_usd=100.0, # Budget limit
timeout_seconds=3600 # 1 hour limit
)
'''
}
return examples.get(provider, examples["openai"])
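# Minimal smoke demo (a sketch): printing the bundled example snippets needs no API
# keys, whereas actually constructing an OptimizationConfig from "provider/model"
# strings reads the corresponding key from the environment.
if __name__ == "__main__":
    for provider in ("openai", "anthropic", "mixed"):
        print(f"--- {provider} example ---")
        print(OptimizationConfig.create_example_config(provider))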