# Commit 5b2a612 (Suhasdev): Add debug logging to diagnose GEPA candidate
# generation issue
"""
Main GepaOptimizer class - the heart of the optimization system
"""
import time
import logging
from typing import Any, Dict, List, Optional, Union
import asyncio
import io
import sys
from contextlib import redirect_stdout, redirect_stderr
import gepa
from ..utils.api_keys import APIKeyManager
from .result import ResultProcessor
from ..data.converters import UniversalConverter
from ..models.result import OptimizationResult, OptimizedResult
from ..models.config import OptimizationConfig, ModelConfig
from ..utils.helpers import sanitize_prompt
from ..utils.exceptions import GepaDependencyError, InvalidInputError, DatasetError, GepaOptimizerError
logger = logging.getLogger(__name__)
class GepaOptimizer:
"""
Main class for prompt optimization using GEPA
This is the primary interface that users interact with.
Provides both simple and advanced optimization capabilities.
"""
def __init__(self, config: Optional[OptimizationConfig] = None,
adapter_type: str = "universal",
custom_adapter: Optional[Any] = None,
llm_model_name: Optional[str] = None,
metric_weights: Optional[Dict[str, float]] = None,
**kwargs):
"""
Initialize the optimizer
Args:
config: Optimization configuration (required)
adapter_type: Type of adapter to use ("universal" only - fully configurable)
custom_adapter: Custom adapter instance (overrides adapter_type)
llm_model_name: [Deprecated] Use config.model instead. Will be removed in future versions.
metric_weights: [Deprecated] Not used - evaluator handles metrics. Will be removed in future versions.
**kwargs: Additional parameters for universal adapter (llm_client, evaluator, etc.)
Raises:
ValueError: If required configuration is missing
GepaDependencyError: If GEPA library is not available
"""
if config is None:
raise ValueError("config parameter is required. Use OptimizationConfig to configure the optimizer.")
# Initialize logger first
self.logger = logging.getLogger(__name__)
self.config = config
self.converter = UniversalConverter(data_split_config=config.data_split)
self.api_manager = APIKeyManager()
self.result_processor = ResultProcessor()
# Initialize adapter based on configuration
if custom_adapter:
# User provided custom adapter
from .base_adapter import BaseGepaAdapter
if not isinstance(custom_adapter, BaseGepaAdapter):
raise TypeError("custom_adapter must be an instance of BaseGepaAdapter")
self.adapter = custom_adapter
self.logger.info("Using user-provided custom adapter")
elif adapter_type == "universal":
# Universal adapter requires user to provide components
llm_client = kwargs.get('llm_client')
evaluator = kwargs.get('evaluator')
if not llm_client or not evaluator:
raise ValueError(
"llm_client and evaluator are required for universal adapter. "
"Example: GepaOptimizer(config=config, adapter_type='universal', "
"llm_client=llm_client, evaluator=evaluator)"
)
from .universal_adapter import UniversalGepaAdapter
self.adapter = UniversalGepaAdapter(
llm_client=llm_client,
evaluator=evaluator,
data_converter=kwargs.get('data_converter')
)
self.logger.info("Using universal adapter")
else:
raise ValueError(
f"Unknown adapter_type: {adapter_type}. "
f"Only 'universal' is supported. "
f"Provide llm_client and evaluator when using universal adapter."
)
# Keep backward compatibility
self.custom_adapter = self.adapter
# Log model configuration
model_info = self.adapter.get_performance_stats()
self.logger.info(f"Initialized adapter: {model_info}")
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Validate GEPA availability
if gepa is None:
raise GepaDependencyError("GEPA library is not available. Please install it with: pip install gepa")
async def train(self,
seed_prompt: str,
dataset: Union[List[Any], str, Dict, Any],
**kwargs) -> OptimizedResult:
"""
Main training method for prompt optimization
Args:
seed_prompt: Initial prompt to optimize
dataset: Training data in any format
**kwargs: Additional parameters that can override config
Returns:
OptimizedResult: Optimization result with improved prompt
Raises:
InvalidInputError: For invalid input parameters
DatasetError: For issues with dataset processing
GepaOptimizerError: For optimization failures
"""
start_time = time.time()
session_id = f"opt_{int(start_time)}_{id(self)}"
try:
self.logger.info(f"Starting optimization session: {session_id}")
self.logger.info(f"Using model: {self.config.model.model_name} (provider: {self.config.model.provider})")
# Debug logging removed - not needed in production
# 🔥 FIX E: Reset Pareto logger at start of each optimization run
from ..utils.pareto_logger import reset_pareto_logger
reset_pareto_logger()
self.logger.info("✅ Reset Pareto logger for new optimization run")
# Update config with any overrides from kwargs
self._update_config_from_kwargs(kwargs)
# Step 1: Validate inputs
self._validate_inputs(seed_prompt)
# Step 2: Convert dataset to GEPA format with 3-way split
# 🔥 FIX: Support pre-split datasets (user-provided train/val/test)
if isinstance(dataset, dict) and all(k in dataset for k in ['train', 'val', 'test']):
# User provided pre-split dataset - use it directly
self.logger.info("✅ Detected pre-split dataset - using user's split (no re-splitting)")
trainset_raw = dataset.get('train', [])
valset_raw = dataset.get('val', [])
testset_raw = dataset.get('test', [])
# Still need to standardize the format (convert to GEPA format)
trainset = self.converter._standardize(trainset_raw)
valset = self.converter._standardize(valset_raw)
testset = self.converter._standardize(testset_raw) if testset_raw else []
self.logger.info(
f"Using pre-split dataset: {len(trainset)} train (Dfeedback), "
f"{len(valset)} val (Dpareto), {len(testset)} test (held-out)"
)
else:
# Standard path: convert and split automatically
self.logger.info("Converting dataset to GEPA format with 3-way split...")
trainset, valset, testset = self.converter.convert(
dataset,
split_config=self.config.data_split
)
# Log split with adaptive strategy info
split_strategy = self.config.data_split.small_dataset_strategy
strategy_note = ""
if split_strategy == 'adaptive':
total_size = len(trainset) + len(valset) + len(testset)
train_ratio, val_ratio, test_ratio = self.config.data_split.get_adaptive_ratios(total_size)
strategy_note = f" (adaptive: {train_ratio*100:.0f}%/{val_ratio*100:.0f}%/{test_ratio*100:.0f}% ratios)"
self.logger.info(
f"Dataset split{strategy_note}: {len(trainset)} train (Dfeedback), "
f"{len(valset)} val (Dpareto), {len(testset)} test (held-out)"
)
if not trainset:
raise DatasetError("Dataset appears to be empty after conversion")
# Step 3: Create seed candidate
seed_candidate = self._create_seed_candidate(seed_prompt)
# 🔥 CRITICAL: Set valset info in adapter BEFORE baseline evaluation
# This ensures adapter correctly detects 'dpareto' dataset type
# Use direct assignment (don't rely on hasattr) to ensure attributes are set
try:
self.adapter._valset_size = len(valset) if valset else 0
self.logger.info(f"✅ Set valset_size in adapter: {len(valset) if valset else 0} for Dpareto detection")
except AttributeError:
self.logger.warning("⚠️ Could not set _valset_size in adapter - attribute not supported")
try:
self.adapter._valset = valset
self.logger.info(f"✅ Stored valset in adapter ({len(valset) if valset else 0} samples)")
except AttributeError:
self.logger.warning("⚠️ Could not set _valset in adapter - attribute not supported")
# Step 3.5: Calculate baseline score on VALIDATION set (not test set)
# This ensures fair comparison since optimization uses validation set for Pareto selection
baseline_val_score = None
if valset:
self.logger.info("📊 Evaluating seed prompt on validation set for baseline...")
# Set baseline flag so adapter knows this is baseline, not optimization
# Use direct assignment to ensure the flag is set
try:
self.adapter._is_baseline_evaluation = True
self.logger.info("✅ Set baseline evaluation flag in adapter")
except AttributeError:
self.logger.warning("⚠️ Could not set _is_baseline_evaluation in adapter")
try:
# Evaluate on validation set (same as what GEPA will use for Pareto selection)
eval_result = self.adapter.evaluate(
batch=valset,
candidate=seed_candidate,
capture_traces=False
)
baseline_val_score = sum(eval_result.scores) / len(eval_result.scores) if eval_result.scores else 0.0
self.logger.info(f"📊 Baseline validation score: {baseline_val_score:.4f} (on {len(valset)} samples)")
# Store baseline in adapter for later use
if hasattr(self.adapter, '_baseline_score'):
self.adapter._baseline_score = baseline_val_score
# 🔥 CRITICAL FIX: Also set baseline in Pareto logger
# This ensures candidates can be properly evaluated against baseline
from ..utils.pareto_logger import get_pareto_logger
pareto_log = get_pareto_logger()
pareto_log.set_baseline(baseline_val_score)
self.logger.info(f"✅ Baseline set in Pareto logger: {baseline_val_score:.4f}")
except Exception as e:
self.logger.warning(f"Baseline evaluation failed: {e}")
import traceback
self.logger.debug(f"Baseline evaluation error: {traceback.format_exc()}")
finally:
try:
self.adapter._is_baseline_evaluation = False
self.logger.debug("✅ Reset baseline evaluation flag - optimization can begin")
except AttributeError:
pass # Ignore if attribute not supported
# Step 4: Run GEPA optimization
self.logger.info("Starting GEPA optimization...")
gepa_result, actual_iterations = await self._run_gepa_optimization(
adapter=self.adapter,
seed_candidate=seed_candidate,
trainset=trainset,
valset=valset,
**kwargs
)
# Step 5: Extract best candidate
best_candidate = self._extract_best_candidate(gepa_result)
# 🔥 CRITICAL: Extract optimized prompt from best_candidate
# This is the actual optimized prompt that GEPA found
self.logger.info(f"\n{'═'*80}")
self.logger.info(f"📝 EXTRACTING OPTIMIZED PROMPT FROM GEPA RESULT")
self.logger.info(f"{'═'*80}")
self.logger.info(f"best_candidate keys: {list(best_candidate.keys()) if isinstance(best_candidate, dict) else 'N/A'}")
optimized_prompt = best_candidate.get('system_prompt', seed_prompt)
if not optimized_prompt or optimized_prompt.strip() == '':
# Fallback: try other keys or use seed prompt
optimized_prompt = best_candidate.get('prompt', best_candidate.get('text', seed_prompt))
# Get fitness score if available
best_fitness = best_candidate.get('fitness') or self.adapter.get_best_score() if hasattr(self.adapter, 'get_best_score') else None
candidate_source = best_candidate.get('source', 'unknown')
self.logger.info(f"\n✅ EXTRACTED OPTIMIZED PROMPT:")
self.logger.info(f" Source: {candidate_source}")
if best_fitness is not None:
self.logger.info(f" Fitness: f={best_fitness:.4f}")
self.logger.info(f" Length: {len(optimized_prompt)} characters")
self.logger.info(f" Words: {len(optimized_prompt.split())} words")
self.logger.info(f"\n📝 FULL OPTIMIZED PROMPT TEXT:")
self.logger.info(f"{'─'*80}")
self.logger.info(optimized_prompt)
self.logger.info(f"{'─'*80}")
if optimized_prompt != seed_prompt:
self.logger.info(f"\n✅ SUCCESS: Prompt WAS OPTIMIZED!")
self.logger.info(f" Seed length: {len(seed_prompt)} chars")
self.logger.info(f" Optimized length: {len(optimized_prompt)} chars")
self.logger.info(f" Difference: {len(optimized_prompt) - len(seed_prompt):+d} chars")
if best_fitness is not None:
baseline_fitness = 0.5 # Default baseline, could be improved
improvement = best_fitness - baseline_fitness
improvement_pct = (improvement / baseline_fitness * 100) if baseline_fitness > 0 else 0
self.logger.info(f" Fitness: f={best_fitness:.4f} (improvement: {improvement:+.4f} ({improvement_pct:+.1f}%))")
else:
self.logger.warning(f"\n⚠️ WARNING: Optimized prompt is IDENTICAL to seed prompt")
self.logger.warning(f" This means GEPA didn't modify the prompt during optimization")
if best_fitness is not None:
self.logger.warning(f" Best fitness found: f={best_fitness:.4f}")
self.logger.warning(f" 💡 Check if LLEGO best candidate is being properly extracted")
self.logger.info(f"{'═'*80}\n")
# Step 5.5: Calculate improvement metrics (validation vs validation)
optimized_test_score = None
improvement_data = {}
# 🔥 FIX: Calculate improvement based on VALIDATION scores (fair comparison)
# Compare optimized VALIDATION score vs validation baseline (both on Dpareto)
# This ensures fair comparison - both evaluated on the same validation set
optimized_val_score = best_fitness # Best candidate's fitness is from validation set (Dpareto)
if baseline_val_score is not None and optimized_val_score is not None:
absolute_improvement = optimized_val_score - baseline_val_score
relative_improvement = (
(absolute_improvement / baseline_val_score * 100)
if baseline_val_score > 0 else 0
)
improvement_data = {
'baseline_val_score': baseline_val_score,
'optimized_val_score': optimized_val_score,
'absolute_improvement': absolute_improvement,
'relative_improvement_percent': relative_improvement
}
self.logger.info(
f"📈 Validation improvement: {relative_improvement:+.2f}% "
f"(baseline val: {baseline_val_score:.4f} → optimized val: {optimized_val_score:.4f})"
)
# Step 5.6: Evaluate optimized prompt on test set (if available) for final reporting
if testset and self.config.evaluate_on_test:
self.logger.info("📊 Evaluating optimized prompt on test set...")
# 🔥 CRITICAL FIX: Clear LLEGO candidate queue before test evaluation
# This prevents the LLEGO wrapper from intercepting test evaluation calls
# and returning wrong candidates instead of actually running the optimized prompt
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
if hasattr(self.adapter, 'llm_client') and isinstance(self.adapter.llm_client, LLEGOEnhancedLLMClient):
if hasattr(self.adapter.llm_client, '_adapter_generated_candidates'):
self.adapter.llm_client._adapter_generated_candidates = []
self.logger.info("✅ Cleared LLEGO candidate queue for clean test evaluation")
if hasattr(self.adapter.llm_client, '_candidate_queue'):
self.adapter.llm_client._candidate_queue = []
self.logger.info("✅ Cleared LLEGO hybrid candidate queue for clean test evaluation")
# Evaluate on test set for final reporting (but improvement is based on validation)
try:
optimized_test_score = self._evaluate_candidate_on_testset(
best_candidate,
testset
)
self.logger.info(f"📊 Optimized test score: {optimized_test_score:.4f}")
# Add test score to improvement_data for reference (but improvement is based on validation)
improvement_data['optimized_test_score'] = optimized_test_score
if baseline_val_score is not None:
test_vs_baseline = (
((optimized_test_score - baseline_val_score) / baseline_val_score * 100)
if baseline_val_score > 0 else 0
)
self.logger.info(
f"📊 Test set vs validation baseline: {test_vs_baseline:+.2f}% "
f"(baseline val: {baseline_val_score:.4f} → optimized test: {optimized_test_score:.4f})"
)
except Exception as e:
self.logger.warning(f"Test evaluation failed: {e}")
# Step 6: Process results
optimization_time = time.time() - start_time
processed_result = self.result_processor.process_full_result(
result=gepa_result,
original_prompt=seed_prompt,
optimization_time=optimization_time,
actual_iterations=actual_iterations,
test_metrics=improvement_data # Add test metrics
)
# Merge improvement data
final_improvement_data = {**processed_result.get('improvement_data', {}), **improvement_data}
# Step 7: Create result objects
# 🔥 CRITICAL: Use extracted optimized_prompt instead of processed_result
result = OptimizedResult(
original_prompt=seed_prompt,
optimized_prompt=optimized_prompt, # Use extracted prompt, not processed_result!
improvement_data=final_improvement_data,
optimization_time=optimization_time,
dataset_size=len(trainset) + len(valset) + len(testset),
total_iterations=processed_result.get('total_iterations', 0),
status=processed_result.get('status', 'completed'),
error_message=processed_result.get('error_message'),
detailed_result=OptimizationResult(
session_id=session_id,
original_prompt=seed_prompt,
optimized_prompt=optimized_prompt, # Use extracted prompt!
improvement_data=final_improvement_data,
optimization_time=optimization_time,
dataset_size=len(trainset) + len(valset) + len(testset),
total_iterations=processed_result.get('total_iterations', 0),
status=processed_result.get('status', 'completed'),
error_message=processed_result.get('error_message')
)
)
self.logger.info(f"✅ Optimization completed in {optimization_time:.2f}s")
return result
except Exception as e:
optimization_time = time.time() - start_time
error_msg = f"Optimization failed: {str(e)}"
self.logger.error(error_msg)
# Return failed result
return OptimizedResult(
original_prompt=seed_prompt,
optimized_prompt=seed_prompt, # Return original on failure
improvement_data={'error': error_msg},
optimization_time=optimization_time,
dataset_size=0,
total_iterations=0,
status='failed',
error_message=error_msg
)
def _update_config_from_kwargs(self, kwargs: Dict[str, Any]) -> None:
"""Update configuration with runtime overrides from kwargs."""
updated_params = []
for key, value in kwargs.items():
if hasattr(self.config, key):
setattr(self.config, key, value)
updated_params.append(f"{key}={value}")
else:
self.logger.warning(f"Unknown parameter '{key}' ignored")
if updated_params:
self.logger.info(f"Updated config parameters: {', '.join(updated_params)}")
def _validate_inputs(self, seed_prompt: str) -> None:
"""
Validate input parameters for optimization
Args:
seed_prompt: The seed prompt to validate
Raises:
InvalidInputError: If validation fails
"""
if not seed_prompt or not isinstance(seed_prompt, str):
raise InvalidInputError("Seed prompt must be a non-empty string")
if len(seed_prompt.strip()) < 10:
raise InvalidInputError("Seed prompt is too short (minimum 10 characters)")
# Validate model configuration
model_config = self.config.model
if not hasattr(model_config, 'model_name') or not model_config.model_name:
raise InvalidInputError("Model name is required")
reflection_config = self.config.reflection_model
if not hasattr(reflection_config, 'model_name') or not reflection_config.model_name:
raise InvalidInputError("Reflection model name is required")
def _clean_reflection_prompt(self, prompt: str, max_length: int = 50000) -> str:
"""
Clean reflection prompt by removing base64 images and truncating if too long.
🔥 CRITICAL: GEPA's reflective dataset includes base64 images which create
massive prompts (7MB+) that exceed token limits. This function:
1. Strips all base64 image data
2. Removes excessive detailed_scores entries
3. Truncates to reasonable size
4. Preserves essential feedback information
Args:
prompt: Original prompt from GEPA (may contain base64)
max_length: Maximum length after cleaning (default: 50K chars)
Returns:
Cleaned prompt without base64, within size limits
"""
import re
# Step 1: Remove base64 image strings (typically very long alphanumeric strings)
# Base64 images are usually 50K+ characters of A-Za-z0-9+/= pattern
# Look for very long base64-like sequences
base64_pattern = r'[A-Za-z0-9+/=]{5000,}' # Sequences of 5000+ base64 chars
cleaned = re.sub(base64_pattern, '[IMAGE_DATA_REMOVED]', prompt)
# Step 2: Remove detailed_scores sections that might contain base64 references
# These are usually in markdown format: "### detailed_scores\n...base64..."
detailed_scores_pattern = r'### detailed_scores[^\n]*\n[^#]*(?:image_base64|base64)[^\n]*(?:\n[^#]*)*'
cleaned = re.sub(detailed_scores_pattern, '### detailed_scores: [REMOVED_FOR_BREVITY]', cleaned, flags=re.IGNORECASE | re.MULTILINE)
# Step 3: Remove any remaining image_base64 references
cleaned = re.sub(r'image_base64[^\n]*', 'image_base64: [REMOVED]', cleaned, flags=re.IGNORECASE)
cleaned = re.sub(r'"[A-Za-z0-9+/=]{10000,}"', '[LARGE_DATA_STRING_REMOVED]', cleaned) # Very long strings likely base64
# Step 4: Truncate if still too long (keep the beginning which usually has the most important info)
if len(cleaned) > max_length:
# Keep first part (usually contains prompt and key feedback)
# Add truncation notice
truncated_size = len(cleaned) - max_length
cleaned = cleaned[:max_length] + f"\n\n[TRUNCATED {truncated_size} characters of detailed evaluation data]"
self.logger.warning(f"⚠️ Prompt truncated: {len(prompt)}{len(cleaned)} chars")
return cleaned
def _validate_models(self, task_lm, reflection_lm):
"""
Validate if specified models are supported.
Note: No hardcoded restrictions - the API provider will validate model existence.
This method is kept for potential future validation logic but doesn't restrict users.
"""
# No hardcoded model restrictions - users can specify any model
# The API provider will handle validation and return errors if model doesn't exist
self.logger.debug(f"Using task model: {task_lm}, reflection model: {reflection_lm}")
def _create_seed_candidate(self, seed_prompt: str) -> Dict[str, str]:
"""Create a seed candidate from the input prompt."""
sanitized_prompt = sanitize_prompt(seed_prompt)
return {'system_prompt': sanitized_prompt}
async def _run_gepa_optimization(self, adapter, seed_candidate: Any, trainset: List[Any], valset: List[Any], **kwargs) -> tuple: # Return tuple
"""
Run GEPA optimization with the given adapter and data
Args:
adapter: Custom adapter for GEPA
seed_candidate: Initial prompt candidate
trainset: Training dataset
valset: Validation dataset
**kwargs: Additional optimization parameters that can override config
Returns:
Dict with optimization results
Raises:
GepaOptimizerError: If optimization fails
Note:
The following parameters are required in the config:
- max_metric_calls: Maximum number of metric evaluations
- batch_size: Batch size for evaluation
- max_iterations: Maximum number of optimization iterations
"""
try:
# Get optimization parameters from config (these are required fields)
max_metric_calls = self.config.max_metric_calls
batch_size = self.config.batch_size
max_iterations = self.config.max_iterations
# Create reflection model client
from ..llms.vision_llm import VisionLLMClient
base_reflection_lm_client = VisionLLMClient(
provider=self.config.reflection_model.provider,
model_name=self.config.reflection_model.model_name,
api_key=self.config.reflection_model.api_key,
base_url=self.config.reflection_model.base_url,
temperature=self.config.reflection_model.temperature,
max_tokens=self.config.reflection_model.max_tokens,
top_p=self.config.reflection_model.top_p,
frequency_penalty=self.config.reflection_model.frequency_penalty,
presence_penalty=self.config.reflection_model.presence_penalty
)
# reflection_lm_client will be set below (may be wrapped with LLEGO)
reflection_lm_client = base_reflection_lm_client
# 🆕 LLEGO Integration: Create enhanced reflection callable
if self.config.use_llego_operators:
self.logger.info("🧬 LLEGO genetic operators ENABLED")
self.logger.info(f" α={self.config.alpha}, τ={self.config.tau}, ν={self.config.nu}")
self.logger.info(f" Crossover offspring: {self.config.n_crossover}, Mutation offspring: {self.config.n_mutation}")
# Import LLEGO operators
from ..operators.llego_operators import LLEGOIntegrationLayer, PromptCandidate
# Initialize LLEGO integration layer
llego = LLEGOIntegrationLayer(
alpha=self.config.alpha,
tau=self.config.tau,
nu=self.config.nu,
population_size=self.config.population_size,
n_crossover=self.config.n_crossover,
n_mutation=self.config.n_mutation
)
# Initialize with seed prompt
llego.initialize_population(
seed_prompt=seed_candidate.get('system_prompt', ''),
initial_fitness=0.5
)
# 🔥 HYBRID MODE FIX: Wrap reflection_lm_client with LLEGO for hybrid mode
# This ensures reflection calls go through LLEGO wrapper for candidate generation
if self.config.enable_gepa_reflection_with_llego:
self.logger.info("🔥 HYBRID MODE: Wrapping reflection_lm_client with LLEGO")
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
# Wrap reflection_lm_client with LLEGO so hybrid generation is triggered
reflection_lm_client = LLEGOEnhancedLLMClient(
base_llm=base_reflection_lm_client,
llego_layer=llego,
config=self.config, # Pass config for hybrid mode!
verbose=True
)
self.logger.info("✅ reflection_lm_client wrapped with LLEGO (hybrid mode enabled)")
# 🔥 CRITICAL: Store reflection_lm_client reference in adapter so it can set context
# This allows make_reflective_dataset to set reflection context on BOTH clients
if hasattr(adapter, 'reflection_lm_client'):
adapter.reflection_lm_client = reflection_lm_client
self.logger.info("✅ Stored reflection_lm_client reference in adapter")
else:
# Add reflection_lm_client attribute to adapter
adapter.reflection_lm_client = reflection_lm_client
self.logger.info("✅ Added reflection_lm_client attribute to adapter")
# 🔥 NEW: Also store config and reflection_lm_client for adapter-level generation
if hasattr(adapter, '_config'):
adapter._config = self.config
self.logger.info("✅ Stored config in adapter for hybrid mode")
else:
adapter._config = self.config
self.logger.info("✅ Added _config attribute to adapter")
if hasattr(adapter, '_reflection_lm_client'):
adapter._reflection_lm_client = reflection_lm_client
self.logger.info("✅ Stored _reflection_lm_client in adapter for hybrid mode")
else:
adapter._reflection_lm_client = reflection_lm_client
self.logger.info("✅ Added _reflection_lm_client attribute to adapter")
# 🔥 CRITICAL FIX: Ensure LLEGO layer is stored in adapter
# Without this, adapter.llego will be None and population updates are skipped!
if hasattr(adapter, 'llego'):
if adapter.llego is None:
adapter.llego = llego
self.logger.info("✅ CRITICAL: Set LLEGO layer in adapter (was None)")
else:
self.logger.debug("✅ LLEGO layer already set in adapter")
else:
# Add llego attribute if it doesn't exist
adapter.llego = llego
self.logger.info("✅ CRITICAL: Added LLEGO layer to adapter")
# 🔥 CRITICAL: Always set _reflection_lm_client in adapter (even without hybrid mode)
# This is required for propose_new_texts() to work
if not hasattr(adapter, '_reflection_lm_client') or adapter._reflection_lm_client is None:
adapter._reflection_lm_client = reflection_lm_client
self.logger.info("✅ Set _reflection_lm_client in adapter (required for propose_new_texts)")
# 🔥 HYBRID MODE FIX: Inject config into LLEGO wrapper for hybrid mode
# The adapter already has LLEGO wrapper, we just need to update its config
if self.config.enable_gepa_reflection_with_llego:
# HYBRID MODE: Update the LLEGO wrapper's config
self.logger.info("🔥 HYBRID MODE: Enabling hybrid candidate generation in LLEGO wrapper")
# Get the LLM client (may already be wrapped)
llm_client = self.adapter.llm_client
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
if isinstance(llm_client, LLEGOEnhancedLLMClient):
# Already wrapped, just update config
llm_client.config = self.config
self.logger.info("✅ Updated LLEGO wrapper with hybrid mode config")
else:
# Not wrapped yet, wrap it now with config
llego_wrapped_llm = LLEGOEnhancedLLMClient(
base_llm=llm_client,
llego_layer=llego,
config=self.config, # ← Pass config for hybrid mode!
verbose=True
)
# Update adapter's LLM client
self.adapter.llm_client = llego_wrapped_llm
self.logger.info("✅ Wrapped LLM client with LLEGO (hybrid mode enabled)")
adapter = self.adapter
else:
# LLEGO-ONLY MODE: Wrap adapter with LLEGO layer (no hybrid)
self.logger.info("🧬 LLEGO-ONLY MODE: Recreating adapter with LLEGO integration...")
if hasattr(self, 'adapter') and self.adapter:
from .universal_adapter import UniversalGepaAdapter
# Get original LLM client and evaluator from current adapter
original_llm = self.adapter.llm_client
# If it's already wrapped, unwrap it
if hasattr(original_llm, 'base_llm'):
original_llm = original_llm.base_llm
evaluator = self.adapter.evaluator
data_converter = self.adapter.data_converter
# Recreate adapter with LLEGO (no hybrid mode config)
from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
llego_wrapped_llm = LLEGOEnhancedLLMClient(
base_llm=original_llm,
llego_layer=llego,
config=None, # No hybrid mode
verbose=True
)
adapter = UniversalGepaAdapter(
llm_client=llego_wrapped_llm,
evaluator=evaluator,
data_converter=data_converter,
llego_layer=llego
)
self.logger.info("✅ Adapter recreated with LLEGO-enhanced LLM client")
else:
adapter = self.adapter
# Create LLEGO-enhanced reflection callable
# When hybrid mode is enabled, reflection_lm_client is wrapped with LLEGO
# The wrapper will automatically generate hybrid candidates when called
def reflection_lm_callable(prompt: str) -> str:
"""
Reflection callable that delegates to LLEGO-wrapped client.
In hybrid mode, the wrapper generates candidates from both GEPA and LLEGO.
🔥 CRITICAL: Clean the prompt to remove base64 images and truncate if too long.
"""
# 🔥 FIX: Clean prompt to remove base64 images and truncate excessive data
cleaned_prompt = self._clean_reflection_prompt(prompt)
self.logger.info(f"\n{'🔥'*40}")
self.logger.info(f"🔥 reflection_lm_callable CALLED (delegating to LLEGO wrapper)")
self.logger.info(f"🔥 Original prompt length: {len(prompt)} chars")
self.logger.info(f"🔥 Cleaned prompt length: {len(cleaned_prompt)} chars")
self.logger.info(f"🔥 Truncation: {len(prompt) - len(cleaned_prompt)} chars removed")
self.logger.info(f"🔥 First 200 chars (cleaned): {cleaned_prompt[:200]}...")
self.logger.info(f"{'🔥'*40}\n")
try:
# 🔥 CRITICAL: Set reflection context BEFORE generating
# This signals to the LLEGO wrapper that we're in reflection mode
if isinstance(reflection_lm_client, LLEGOEnhancedLLMClient):
reflection_lm_client.set_reflection_context(
current_prompt=cleaned_prompt, # Use cleaned prompt
feedback=None,
in_reflection=True # Enable reflection mode
)
self.logger.info("✅ Reflection context set on reflection_lm_client")
# 🔥 HYBRID MODE: If reflection_lm_client is wrapped with LLEGO,
# calling generate() will trigger hybrid candidate generation
# The wrapper handles queuing and returns candidates one by one
# 🔥 CRITICAL: System prompt must instruct LLM to generate improved prompt, not feedback
optimization_system_prompt = """You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues.
Core Requirements:
1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary)
2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening)
3. PRESERVE the core task domain and output format requirements
4. INTEGRATE improvements from feedback naturally into the prompt structure
5. MAINTAIN clarity, specificity, and actionability
Quality Standards:
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure the prompt is self-contained and unambiguous
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
Output the improved prompt directly and only the prompt."""
result = reflection_lm_client.generate(
system_prompt=optimization_system_prompt,
user_prompt=cleaned_prompt, # Use cleaned prompt (no base64, truncated)
image_base64=""
)
# Extract content from result
if isinstance(result, dict):
candidate = result.get("content", str(result))
source = result.get("source", "unknown")
self.logger.info(f"✅ Candidate from {source} (FULL TEXT):")
self.logger.info(f" '{candidate}'")
return candidate
else:
candidate = str(result)
self.logger.info(f"✅ Candidate generated (FULL TEXT):")
self.logger.info(f" '{candidate}'")
return candidate
except Exception as e:
self.logger.error(f"❌ Error in reflection_lm_callable: {e}")
import traceback
self.logger.error(traceback.format_exc())
# Fallback: return prompt as-is
return prompt
# Set up reflection context for LLEGO wrapper
if self.config.enable_gepa_reflection_with_llego and isinstance(reflection_lm_client, LLEGOEnhancedLLMClient):
# Store current prompt in reflection context for LLEGO operators
reflection_lm_client.set_reflection_context(
current_prompt=seed_candidate.get('system_prompt', ''),
feedback=None,
in_reflection=True
)
else:
# Standard GEPA reflection (no LLEGO)
adapter = self.adapter # Use the original adapter
# 🔥 CRITICAL: Always set _reflection_lm_client in adapter (even without LLEGO)
# This is required for propose_new_texts() to work
if not hasattr(adapter, '_reflection_lm_client') or adapter._reflection_lm_client is None:
adapter._reflection_lm_client = reflection_lm_client
self.logger.info("✅ Set _reflection_lm_client in adapter (required for propose_new_texts)")
# Define standard reflection callable (no LLEGO enhancement)
def reflection_lm_callable(prompt: str) -> str:
    """Plain-callable adapter around the reflection model, as GEPA expects.

    Receives a single string (current prompt plus evaluation feedback) and
    returns an improved prompt. On any failure the input is returned
    unchanged so the optimization loop can continue.
    """
    try:
        # 🔥 CRITICAL: the system prompt must steer the LLM toward emitting an
        # improved prompt, not an analysis of the feedback.
        improvement_instructions = """You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues.
Core Requirements:
1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary)
2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening)
3. PRESERVE the core task domain and output format requirements
4. INTEGRATE improvements from feedback naturally into the prompt structure
5. MAINTAIN clarity, specificity, and actionability
Quality Standards:
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure the prompt is self-contained and unambiguous
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
Output the improved prompt directly and only the prompt."""
        # Reflection is text-only, so no image payload is supplied.
        response = reflection_lm_client.generate(
            system_prompt=improvement_instructions,
            user_prompt=prompt,
            image_base64=""
        )
        # generate() may hand back either a dict payload or a raw value.
        if isinstance(response, dict):
            return response.get("content", str(response))
        return str(response)
    except Exception as exc:
        self.logger.error(f"Reflection model error: {exc}")
        return prompt  # Fail soft: keep the original prompt on error
self.logger.info(
f"Starting GEPA optimization with {max_iterations} iterations, "
f"batch size {batch_size}, max metric calls: {max_metric_calls}"
)
self.logger.info(
f"GEPA parameters: candidate_selection_strategy=pareto, "
f"reflection_minibatch_size={batch_size}, "
f"skip_perfect_score=False, "
f"module_selector=round_robin"
)
# Prepare optimization parameters with ONLY valid GEPA parameters
# Note: 'adapter' variable is set above (either LLEGO-enhanced or standard)
# 🔥 FIX: ALWAYS pass reflection_lm_callable to GEPA (required for generating new candidates!)
# Previously this was only passed when use_llego_operators=True, which broke standard reflection
reflection_lm_passed = reflection_lm_callable # Always pass reflection callable
self.logger.info(f"✅ reflection_lm_callable passed to GEPA (LLEGO={self.config.use_llego_operators})")
# Debug logging removed - not needed in production
gepa_params = {
'adapter': adapter, # Use the adapter created above (with or without LLEGO)
'seed_candidate': seed_candidate,
'trainset': trainset,
'valset': valset,
'max_metric_calls': max_metric_calls,
# NOTE: GEPA does NOT have num_iterations - it uses max_metric_calls to control iterations
# 🔥 CRITICAL: When using an adapter, GEPA expects:
# - adapter.make_reflective_dataset() to create feedback data
# - GEPA's internal proposer to generate candidates from that data
# - task_lm and reflection_lm must be None (GEPA will use model from adapter)
'task_lm': None, # Don't pass - adapter handles this
'reflection_lm': reflection_lm_passed, # Pass LLEGO-enhanced reflection (may be ignored!)
# Valid GEPA parameters based on actual library
'candidate_selection_strategy': 'pareto', # Use Pareto selection
'skip_perfect_score': False, # Don't skip perfect scores
'reflection_minibatch_size': batch_size, # Use batch size for reflection
'perfect_score': 1.0, # Perfect score threshold
'module_selector': 'round_robin', # Cycle through components
'display_progress_bar': self.config.verbose, # Show progress if verbose
'raise_on_exception': True, # Raise exceptions for debugging
}
# 🔥 CRITICAL FIX: Filter kwargs to only include valid GEPA parameters
# GEPA does NOT accept num_iterations, max_iterations, or other non-GEPA params
VALID_GEPA_PARAMS = {
'seed_candidate', 'trainset', 'valset', 'adapter', 'task_lm', 'reflection_lm',
'candidate_selection_strategy', 'skip_perfect_score', 'batch_sampler',
'reflection_minibatch_size', 'perfect_score', 'reflection_prompt_template',
'module_selector', 'use_merge', 'max_merge_invocations', 'merge_val_overlap_floor',
'max_metric_calls', 'stop_callbacks', 'logger', 'run_dir', 'use_wandb',
'wandb_api_key', 'wandb_init_kwargs', 'use_mlflow', 'mlflow_tracking_uri',
'mlflow_experiment_name', 'track_best_outputs', 'display_progress_bar',
'use_cloudpickle', 'seed', 'raise_on_exception', 'val_evaluation_policy'
}
# Only add valid kwargs that aren't already in gepa_params
for key, value in kwargs.items():
if key in VALID_GEPA_PARAMS and key not in gepa_params:
gepa_params[key] = value
elif key not in VALID_GEPA_PARAMS:
self.logger.debug(f"⚠️ Filtering out invalid GEPA parameter: {key}")
# Debug logging removed - not needed in production
# 🎯 NEW: Capture GEPA's internal logging for pareto front information
gepa_output = io.StringIO()
# Log iteration start
from ..utils.clean_logger import get_clean_logger
clean_log = get_clean_logger()
clean_log.log_iteration_start(1, seed_prompt=seed_candidate.get('system_prompt', ''))
# 🔥 CRITICAL: Pass valset size to adapter for better dataset type detection
if hasattr(adapter, '_valset_size'):
adapter._valset_size = len(valset)
self.logger.debug(f"✅ Set valset_size in adapter: {len(valset)} for Dpareto detection")
# 🔥 CRITICAL FIX: Store valset in adapter so we can evaluate generated candidates on it
# This ensures generated candidates are evaluated on Dpareto for Pareto selection
if hasattr(adapter, '_valset'):
adapter._valset = valset
self.logger.debug(f"✅ Stored valset in adapter ({len(valset)} samples) for Dpareto evaluation of generated candidates")
else:
# Add _valset attribute if it doesn't exist
adapter._valset = valset
self.logger.debug(f"✅ Added _valset attribute to adapter ({len(valset)} samples)")
# Run GEPA optimization (synchronous call wrapped in async)
result = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self._run_gepa_with_logging(gepa_params, gepa_output)
)
# 🎯 NEW: Process and log pareto front information, extract iteration count
gepa_logs = gepa_output.getvalue()
actual_iterations = self._log_pareto_front_info(gepa_logs) # Get iteration count
return result, actual_iterations # Return both result and iteration count
except Exception as e:
# Try to extract partial results before failing
self.logger.warning(f"GEPA optimization failed: {e}")
# Check if we have any cached results from the adapter
best_candidate = adapter.get_best_candidate()
best_score = adapter.get_best_score()
if best_candidate and best_score > 0:
self.logger.info(f"🎯 Using cached best result with score: {best_score:.4f}")
# Create a mock GEPA result with the best candidate found
return {
'best_candidate': best_candidate,
'best_score': best_score,
'partial_result': True,
'error': f'GEPA failed but returning best result found: {str(e)}'
}
else:
# If no cached results, re-raise the error
raise GepaOptimizerError(f"GEPA optimization failed: {str(e)}")
def _run_gepa_with_logging(self, gepa_params: Dict[str, Any], output_buffer: io.StringIO) -> Any:
    """Invoke gepa.optimize() with stdout/stderr captured into *output_buffer*.

    The captured text (first 50 lines) is echoed into this optimizer's own
    logger so GEPA's internal progress is visible in one log stream.

    Args:
        gepa_params: Keyword arguments forwarded verbatim to gepa.optimize().
        output_buffer: Buffer that receives everything GEPA prints.

    Returns:
        Whatever gepa.optimize() returns.
    """
    log = self.logger
    # Announce the expected GEPA control flow so a stalled run is easier to diagnose.
    log.info("🔄 Calling gepa.optimize() - GEPA should now:")
    log.info(" 1. Evaluate seed on validation set")
    log.info(" 2. For each iteration: evaluate on training minibatch (capture_traces=True)")
    log.info(" 3. Call make_reflective_dataset() with trajectories")
    log.info(" 4. Call propose_new_texts() or reflection_lm to generate new candidates")
    log.info(" 5. Evaluate new candidates and update Pareto front")
    # Redirect both streams so GEPA's print() output and warnings land in the buffer.
    with redirect_stdout(output_buffer), redirect_stderr(output_buffer):
        outcome = gepa.optimize(**gepa_params)
    # Echo at most the first 50 non-empty captured lines for debugging.
    captured = output_buffer.getvalue()
    if captured:
        log.info("📋 GEPA Output (captured):")
        for entry in captured.split('\n')[:50]:
            if entry.strip():
                log.info(f" GEPA: {entry}")
    return outcome
def _log_pareto_front_info(self, gepa_logs: str) -> int: # Return int instead of None
"""Extract and log pareto front information from GEPA logs. Returns max iteration count."""
lines = gepa_logs.split('\n')
current_iteration = 0
max_iteration = 0 # Track max iteration
for line in lines:
# Look for iteration information
if 'iteration' in line.lower():
# Try to extract iteration number
import re
iteration_match = re.search(r'iteration\s+(\d+)', line.lower())
if iteration_match:
current_iteration = int(iteration_match.group(1))
max_iteration = max(max_iteration, current_iteration) # Track max
# Log iteration change
from ..utils.clean_logger import get_clean_logger
clean_log = get_clean_logger()
if current_iteration > clean_log.current_iteration:
clean_log.current_iteration = current_iteration
# Look for pareto front information
if 'pareto front' in line.lower() or 'new program' in line.lower():
self.logger.info(f"GEPA Pareto Update: {line.strip()}")
elif 'iteration' in line.lower() and ('score' in line.lower() or 'program' in line.lower()):
self.logger.debug(f"{line.strip()}")
elif 'best' in line.lower() and 'score' in line.lower():
self.logger.info(f"{line.strip()}")
# Look for evaluation information
if 'evaluating' in line.lower() and 'candidate' in line.lower():
self.logger.debug(f"{line.strip()}")
self.logger.info(f"GEPA Optimization Complete: {max_iteration} iterations")
# Debug logging removed - not needed in production
return max_iteration # Return the max iteration count
def _extract_best_candidate(self, gepa_result: Any) -> Dict[str, str]:
    """
    Extract the best candidate from GEPA Pareto front (single source of truth).

    GEPA Pareto front is the single source of truth because:
    - All candidates (GEPA reflection, LLEGO crossover, LLEGO mutation) are evaluated on Dpareto
    - All non-dominated candidates are added to GEPA Pareto front
    - Therefore, the best candidate MUST be in GEPA Pareto front

    Args:
        gepa_result: Raw result from gepa.optimize() (used only as fallback edge case)

    Returns:
        Best candidate dictionary with prompt components from GEPA Pareto front.
        Always contains at least 'system_prompt'; on the primary and fallback
        paths it also carries 'fitness', 'source', 'candidate_type', 'notation'.
    """
    try:
        # Visual banner so this phase is easy to find in long optimization logs.
        self.logger.info(f"\n{'═'*80}")
        self.logger.info(f"🔍 EXTRACTING BEST CANDIDATE FROM GEPA PARETO FRONT")
        self.logger.info(f"{'═'*80}")
        # ========================================================================
        # PRIMARY: Get best candidate from GEPA Pareto front (single source of truth)
        # ========================================================================
        # NOTE(review): if this lazy import fails, the outer except returns an
        # empty prompt rather than raising — confirm that is the intended policy.
        from ..utils.pareto_logger import get_pareto_logger
        pareto_log = get_pareto_logger()
        if pareto_log.pareto_front:
            try:
                # Get best candidate from GEPA Pareto front (highest score = best)
                gepa_pareto_best = max(pareto_log.pareto_front, key=lambda x: x['score'])
                gepa_pareto_fitness = gepa_pareto_best['score']
                gepa_pareto_prompt = gepa_pareto_best['prompt']
                # Optional metadata; defaults mirror a seed candidate ('S').
                gepa_pareto_type = gepa_pareto_best.get('type', 'unknown')
                gepa_pareto_notation = gepa_pareto_best.get('notation', 'S')
                best_candidate = {
                    'system_prompt': gepa_pareto_prompt,
                    'fitness': gepa_pareto_fitness,
                    'source': 'gepa_pareto_front',
                    'candidate_type': gepa_pareto_type,
                    'notation': gepa_pareto_notation
                }
                self.logger.info(f"✅ SELECTED: Best candidate from GEPA Pareto front")
                self.logger.info(f" Notation: {gepa_pareto_notation}")
                self.logger.info(f" Fitness: f({gepa_pareto_notation})={gepa_pareto_fitness:.4f}")
                self.logger.info(f" Type: {gepa_pareto_type}")
                self.logger.info(f" Prompt length: {len(gepa_pareto_prompt)} chars")
                self.logger.info(f" 💡 GEPA Pareto front is single source of truth (all candidates evaluated on Dpareto)")
                return best_candidate
            except Exception as e:
                # Don't fail here — fall through to the gepa_result fallback below.
                self.logger.error(f"❌ Failed to extract from GEPA Pareto front: {e}")
                import traceback
                self.logger.error(traceback.format_exc())
        # ========================================================================
        # EDGE CASE FALLBACK: Pareto front empty (shouldn't happen, but handle gracefully)
        # ========================================================================
        self.logger.warning(f"⚠️ GEPA Pareto front is empty - using gepa_result as fallback")
        self.logger.warning(f" This should not happen if all candidates are evaluated on Dpareto")
        # Try to extract from gepa_result (last resort)
        if hasattr(gepa_result, 'best_candidate'):
            gepa_candidate = gepa_result.best_candidate
            # best_candidate may be a component dict or a plain prompt value.
            gepa_prompt = gepa_candidate.get('system_prompt') if isinstance(gepa_candidate, dict) else str(gepa_candidate)
            gepa_fitness = getattr(gepa_result, 'best_score', None)
            if gepa_prompt:
                self.logger.info(f"✅ Using gepa_result.best_candidate as fallback")
                return {
                    'system_prompt': gepa_prompt,
                    'fitness': float(gepa_fitness) if gepa_fitness is not None else None,
                    'source': 'gepa_result_fallback',
                    'candidate_type': 'unknown',
                    'notation': 'S'
                }
        # Last resort: return empty prompt
        self.logger.error(f"❌ No candidates found anywhere - returning empty prompt")
        return {'system_prompt': ''}
    except Exception as e:
        # Defensive catch-all: extraction must never crash the overall run.
        self.logger.error(f"❌ Error extracting best candidate: {e}")
        import traceback
        self.logger.error(traceback.format_exc())
        return {'system_prompt': ''}
def _evaluate_candidate_on_testset(
    self,
    candidate: Dict[str, str],
    testset: List[Dict]
) -> float:
    """
    Evaluate a candidate prompt on the held-out test set.

    Args:
        candidate: Prompt candidate to evaluate
        testset: Test dataset (not used during optimization)

    Returns:
        Average composite score on test set

    Raises:
        TestSetEvaluationError: If evaluation fails
    """
    from ..utils.exceptions import TestSetEvaluationError
    try:
        # Evaluate using the adapter (same as GEPA does internally)
        eval_result = self.adapter.evaluate(
            batch=testset,
            candidate=candidate,
            capture_traces=False  # Don't need detailed traces for test
        )
        if not eval_result.scores:
            raise TestSetEvaluationError("No scores returned from test evaluation")
        # Calculate average score
        avg_score = sum(eval_result.scores) / len(eval_result.scores)
        self.logger.debug(
            f"Test set evaluation: {len(eval_result.scores)} samples, "
            f"scores: {eval_result.scores}, avg: {avg_score:.4f}"
        )
        return avg_score
    except TestSetEvaluationError:
        # FIX: already the right exception type — re-raise as-is instead of
        # double-wrapping it with a second "Failed to evaluate..." message.
        raise
    except Exception as e:
        # FIX: chain the cause (`from e`) so the original traceback is
        # preserved for debugging (previously it was discarded).
        raise TestSetEvaluationError(f"Failed to evaluate on test set: {str(e)}") from e
def optimize_sync(self,
                  model: str,
                  seed_prompt: str,
                  dataset: Any,
                  reflection_lm: str,
                  max_metric_calls: int = 150,
                  **kwargs) -> OptimizedResult:
    """
    Synchronous version of the optimization method

    Args:
        model: Target model to optimize for
        seed_prompt: Initial prompt to optimize
        dataset: Training data in any format
        reflection_lm: Model for reflection
        max_metric_calls: Budget for optimization attempts
        **kwargs: Additional optimization parameters

    Returns:
        OptimizedResult: Optimization result

    Note:
        Must not be called from a thread that already has a running event
        loop (asyncio.run raises RuntimeError in that case).
    """
    # FIX: use asyncio.run() instead of manual new_event_loop()/close().
    # The old code left the closed loop installed as the thread's current
    # event loop and never shut down async generators; asyncio.run()
    # handles the full loop lifecycle correctly.
    return asyncio.run(
        self.train(model, seed_prompt, dataset, reflection_lm, max_metric_calls, **kwargs)
    )
# Convenience function for quick optimization
def optimize_prompt(
    model: Union[str, ModelConfig],
    seed_prompt: str,
    dataset: Any,
    reflection_model: Optional[Union[str, ModelConfig]] = None,
    **kwargs
) -> OptimizedResult:
    """
    Convenience function for quick prompt optimization without creating optimizer instance

    Args:
        model: Target model configuration
        seed_prompt: Initial prompt to optimize
        dataset: Training data
        reflection_model: Model for reflection (optional; defaults to `model`)
        **kwargs: Additional optimization parameters (e.g. max_iterations,
            max_metric_calls, batch_size) used for the config and forwarded
            to train()

    Returns:
        OptimizedResult: Optimization result
    """
    # Reflection defaults to the target model itself when not specified
    if reflection_model is None:
        reflection_model = model
    config = OptimizationConfig(
        model=model,
        reflection_model=reflection_model,
        max_iterations=kwargs.get('max_iterations', 10),
        max_metric_calls=kwargs.get('max_metric_calls', 50),
        batch_size=kwargs.get('batch_size', 4)
    )
    optimizer = GepaOptimizer(config=config)
    # BUG FIX: train() expects (model, seed_prompt, dataset, reflection_lm, ...)
    # — see optimize_sync above. The previous call train(seed_prompt, dataset,
    # **kwargs) shifted every positional argument one slot (seed_prompt landed
    # in `model`, dataset in `seed_prompt`, and reflection_lm was missing).
    return asyncio.run(
        optimizer.train(model, seed_prompt, dataset, reflection_model, **kwargs)
    )