Spaces:
Sleeping
Sleeping
| """ | |
| Universal GEPA adapter for user-defined metrics and LLM clients. | |
| """ | |
| from .base_adapter import BaseGepaAdapter | |
| from ..data.converters import UniversalConverter | |
| from typing import Any, Dict, List, Optional | |
| import logging | |
| import re | |
| from gepa.core.adapter import EvaluationBatch | |
| logger = logging.getLogger(__name__) | |
class UniversalGepaAdapter(BaseGepaAdapter):
    """
    Universal GEPA adapter that works with any LLM client and evaluator.

    This adapter uses the existing UniversalConverter for data processing
    and delegates LLM generation and evaluation to user-provided components.

    Features:
    - Optimized multi-variation JSON generation (66% cost reduction)
    - Robust parsing with multiple fallback strategies
    - Automatic fallback to sequential generation if JSON parsing fails
    """

    # Fallback system prompt for sequential generation (when JSON parsing fails).
    # NOTE: this text is sent verbatim to the reflection LLM at runtime — it is
    # behavior, not documentation, so it must not be reworded.
    _FALLBACK_SYSTEM_PROMPT = """You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues.
Core Requirements:
1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary)
2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening)
3. PRESERVE the core task domain and output format requirements
4. INTEGRATE improvements from feedback naturally into the prompt structure
5. MAINTAIN clarity, specificity, and actionability
Quality Standards:
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure the prompt is self-contained and unambiguous
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
Output the improved prompt directly and only the prompt."""
def __init__(self, llm_client, evaluator, data_converter=None, llego_layer=None):
    """
    Initialize universal adapter.

    Args:
        llm_client: User-provided LLM client (must inherit from BaseLLMClient)
        evaluator: User-provided evaluator (must inherit from BaseEvaluator)
        data_converter: Optional custom data converter (uses UniversalConverter by default)
        llego_layer: Optional LLEGO integration layer for genetic operations
    """
    # Keep a handle to the LLEGO layer before any client wrapping happens.
    self.llego = llego_layer

    # With LLEGO enabled, the LLM client must be wrapped so genetic operations
    # can hook generation calls. The optimizer may already have wrapped it
    # (possibly with a config), so wrap only when necessary.
    if llego_layer is not None:
        from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
        if not isinstance(llm_client, LLEGOEnhancedLLMClient):
            # Wrap before calling super().__init__; the config is filled in
            # later by the optimizer when hybrid mode is enabled.
            llm_client = LLEGOEnhancedLLMClient(llm_client, llego_layer, config=None, verbose=True)
        elif hasattr(llm_client, 'config') and llm_client.config is None:
            # Already wrapped; the optimizer will supply the config later.
            pass

    # Parent constructor (sets up self.logger, among other things).
    super().__init__(llm_client, evaluator)

    # Data processing is delegated to a converter (UniversalConverter by default).
    self.data_converter = data_converter or UniversalConverter()

    # --- optimization-state tracking ---
    self._is_baseline_evaluation = False  # distinguishes baseline vs optimization runs
    self._last_candidate = None           # last evaluated prompt text (change detection)
    self._gepa_iteration = 0              # actual GEPA iteration (not evaluation count)
    self._evaluation_count = 0            # number of distinct candidates seen

    # --- current evaluation context ---
    self._current_evaluation_type = None  # 'seed', 'gepa_reflection', 'llego_crossover', 'llego_mutation'
    self._current_dataset_type = None     # 'dfeedback' or 'dpareto'
    self._baseline_score = None           # baseline score kept for later comparison

    # prompt_text -> source_type, in case GEPA does not pass a 'source' field
    self._candidate_sources = {}

    # Validation-set bookkeeping (both populated by the optimizer).
    self._valset_size = None  # helps dataset-type detection
    self._valset = None       # actual valset used for Dpareto evaluation

    # Candidates already scored on Dpareto, to avoid double evaluation.
    # Maps normalized prompt text -> (fitness_score, candidate_type, timestamp).
    self._dpareto_evaluated_candidates = {}

    # --- hybrid-mode storage ---
    self._generated_candidates = []            # candidates produced in hybrid mode
    self._candidate_generation_active = False  # True while generating candidates
    self._config = None                        # set by optimizer when hybrid mode is enabled
    self._reflection_lm_client = None          # set by optimizer

    # --- format awareness ---
    self._detected_format = None         # populated from expected outputs
    self._format_detection_done = False  # detection runs only once

    # Announce initialization; add the LLEGO note only when it is active.
    model_info = llm_client.get_model_info()
    self.logger.info(f"π Initialized Universal adapter with {model_info}")
    if llego_layer is not None:
        self.logger.info(f"𧬠LLEGO integration ENABLED - LLM client is wrapped for genetic operations")
| def _clean_llm_output(self, output: str) -> str: | |
| """ | |
| π₯ CRITICAL: Clean LLM output before evaluation. | |
| LLMs often wrap JSON/structured output in markdown code blocks. | |
| This causes evaluation to fail because the evaluator sees: | |
| "```json\n{\"key\": \"value\"}\n```" | |
| Instead of: | |
| "{\"key\": \"value\"}" | |
| This method extracts the clean content for fair comparison. | |
| """ | |
| if not output or not isinstance(output, str): | |
| return output | |
| cleaned = output.strip() | |
| # Remove markdown code blocks (```json ... ``` or ``` ... ```) | |
| code_block_match = re.search(r'```(?:json|JSON)?\s*([\s\S]*?)\s*```', cleaned) | |
| if code_block_match: | |
| extracted = code_block_match.group(1).strip() | |
| # Only use extracted if it looks like valid content | |
| if extracted and (extracted.startswith('{') or extracted.startswith('[') or len(extracted) > 10): | |
| self.logger.debug(f"π¦ Cleaned markdown code block from LLM output") | |
| return extracted | |
| # Remove leading/trailing markdown artifacts | |
| # Handle cases like "Here is the JSON:\n```json\n...\n```" | |
| if '```' in cleaned: | |
| # Try to extract content between first ``` and last ``` | |
| parts = cleaned.split('```') | |
| if len(parts) >= 3: | |
| # Content is in the middle part(s) | |
| middle_content = parts[1] | |
| # Remove language tag if present (e.g., "json\n") | |
| middle_content = re.sub(r'^(?:json|JSON|python|text)\s*\n?', '', middle_content).strip() | |
| if middle_content: | |
| return middle_content | |
| return cleaned | |
| def _detect_and_cache_format(self, batch: List[Dict[str, Any]]) -> None: | |
| """ | |
| Detect output format from expected outputs and cache for future use. | |
| This enables format-aware prompting and feedback generation. | |
| """ | |
| try: | |
| from ..utils.format_detection import detect_output_format | |
| # Extract expected outputs from batch | |
| expected_outputs = [] | |
| for item in batch: | |
| # Try to extract output directly, or standardize if needed | |
| output = None | |
| if isinstance(item, dict): | |
| # Try common output field names first | |
| output = item.get('output') or item.get('expected_output') or item.get('result') or item.get('answer') | |
| if not output: | |
| # Standardize using converter's private method (same as _evaluate_batch_mode) | |
| try: | |
| standardized = self.data_converter._standardize([item])[0] | |
| output = standardized.get('output') | |
| except Exception: | |
| pass | |
| if output and isinstance(output, str) and output.strip(): | |
| expected_outputs.append(output) | |
| if expected_outputs: | |
| self._detected_format = detect_output_format(expected_outputs) | |
| self.logger.info(f"π FORMAT DETECTED: {self._detected_format['format_type']}") | |
| self.logger.info(f" Spec: {self._detected_format['format_spec'][:100]}...") | |
| self.logger.info(f" Avg length: {self._detected_format['avg_length']} chars") | |
| # Debug logging removed - not needed in production | |
| else: | |
| self.logger.warning("β οΈ No expected outputs found for format detection") | |
| self._detected_format = None | |
| # Debug logging removed - not needed in production | |
| except Exception as e: | |
| self.logger.warning(f"β οΈ Format detection failed: {e}") | |
| self._detected_format = None | |
def evaluate(self, batch: List[Dict[str, Any]], candidate: Dict[str, str],
             capture_traces: bool = False) -> EvaluationBatch:
    """
    Evaluate candidates using user-provided LLM client and evaluator.

    This method automatically detects BatchLLMClient and uses batch processing
    for cost savings, or falls back to standard individual processing.
    It works with any data type supported by UniversalConverter.

    Args:
        batch: List of raw data items to evaluate against.
        candidate: Mapping holding the candidate; only 'system_prompt' is
            optimized (the user_prompt varies per tester and is not part of
            optimization). An optional 'source' entry labels provenance.
        capture_traces: When True this is a TRAINING MINIBATCH for GEPA
            reflection; per-sample trajectories are produced and the
            Dpareto cache is bypassed.

    Returns:
        EvaluationBatch with per-sample outputs, scores, and (when
        capture_traces) trajectories.

    π₯ CACHING: a prompt is evaluated ONLY ONCE on Dpareto (validation set).
    Subsequent evaluations return the cached result to save API calls and
    ensure consistency.
    """
    system_prompt = candidate.get('system_prompt', '')

    # π₯ FORMAT DETECTION: detect output format from expected outputs (once).
    if not self._format_detection_done and batch:
        self._detect_and_cache_format(batch)
        self._format_detection_done = True

    # Determine dataset type first (needed for the cache check below).
    batch_size_threshold = self._config.batch_size if hasattr(self, '_config') and self._config else 8

    # π₯ CRITICAL FIX: if capture_traces=True this is a TRAINING MINIBATCH.
    # GEPA calls with capture_traces=True when it needs trajectories for
    # reflective mutation. We must NEVER use the cache in this case,
    # otherwise trajectories=None breaks GEPA!
    if capture_traces:
        dataset_type = 'dfeedback'  # training minibatch - fresh evaluation with trajectories
        self.logger.info(f"π― evaluate() called with capture_traces=True - this is a TRAINING MINIBATCH")
        self.logger.info(f" Batch size: {len(batch)}, Will generate trajectories for reflection")
    # If _is_baseline_evaluation is True, we KNOW this is the validation set.
    elif hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation:
        dataset_type = 'dpareto'  # baseline is ALWAYS evaluated on validation set
        self.logger.debug(f"π― Forced dataset_type to 'dpareto' (baseline evaluation flag is True)")
    elif hasattr(self, '_valset_size') and self._valset_size is not None and len(batch) == self._valset_size:
        # π₯ FIX: use == not >= to avoid misclassifying training minibatches
        # as the validation set.
        dataset_type = 'dpareto'  # EXACT validation set size = Dpareto
    elif len(batch) > batch_size_threshold * 1.5:
        dataset_type = 'dpareto'  # much larger than a minibatch = likely full valset
    else:
        dataset_type = 'dfeedback'  # small batch = training minibatch for reflection

    # π₯ CRITICAL: check cache to avoid re-evaluating the same prompt on
    # Dpareto. Ensures the seed prompt is evaluated ONLY ONCE.
    # NOTE: only applies when capture_traces=False (validation evaluation).
    if dataset_type == 'dpareto' and not capture_traces:
        normalized_prompt = system_prompt.strip().strip('"\'')
        if normalized_prompt in self._dpareto_evaluated_candidates:
            existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt]
            self.logger.info(
                f"β»οΈ CACHE HIT: Prompt already evaluated on Dpareto "
                f"(score={existing_score:.4f}, type={existing_type}) - skipping re-evaluation"
            )
            # Return cached result - create EvaluationBatch with cached score.
            cached_outputs = [f"[CACHED: {existing_type}]"] * len(batch)
            cached_scores = [existing_score] * len(batch)
            # Still set the baseline if this is the seed and baseline unset.
            from ..utils.pareto_logger import get_pareto_logger
            pareto_log = get_pareto_logger()
            if existing_type == 'seed' and self._baseline_score is None:
                self._baseline_score = existing_score
                pareto_log.set_baseline(existing_score)
                self.logger.info(f"π Baseline score set from cache: {existing_score:.4f}")
            # Log to the Pareto logger (for tracking; no re-evaluation).
            pareto_log.log_candidate_evaluation(
                prompt=system_prompt,
                score=existing_score,
                candidate_type=existing_type,
                dataset_type='dpareto'
            )
            return EvaluationBatch(
                outputs=cached_outputs,
                scores=cached_scores,
                trajectories=None  # no traces for cached results
            )

    # Determine candidate type. Priority order:
    # 1. candidate dict 'source' field (from LLM wrapper)
    # 2. _candidate_sources mapping (from previous evaluations)
    # 3. _current_evaluation_type (from log_proposed_candidate)
    # 4. inference from context (seed, repeat, etc.)
    candidate_type = candidate.get('source')  # first try candidate dict
    if not candidate_type or candidate_type == 'unknown':
        candidate_type = self._candidate_sources.get(system_prompt)  # check mapping
    if not candidate_type or candidate_type == 'unknown':
        candidate_type = self._current_evaluation_type  # use stored type
    if not candidate_type or candidate_type == 'unknown':
        # Try to infer from prompt or metadata.
        if system_prompt == self._last_candidate:
            candidate_type = 'repeat'  # same prompt being re-evaluated
        elif self._evaluation_count == 0 or 'seed' in str(candidate.get('source', '')).lower():
            candidate_type = 'seed'  # explicitly mark as seed
            self.logger.debug("π± Detected seed prompt (Sβ)")
        else:
            candidate_type = 'unknown'  # truly unknown

    # Store source for future lookups (only the first valid type is kept).
    if candidate_type and candidate_type != 'unknown' and system_prompt not in self._candidate_sources:
        self._candidate_sources[system_prompt] = candidate_type
        self.logger.debug(f" π Stored candidate type: {candidate_type} for prompt (length: {len(system_prompt)})")

    # Dataset type was already determined above for the cache check - reuse it.
    # Count only genuinely new candidates (different from the last one).
    if self._last_candidate != system_prompt:
        self._evaluation_count += 1

    # π₯ CRITICAL: a baseline evaluation always refers to the seed prompt.
    if self._is_baseline_evaluation:
        candidate_type = 'seed'
        self.logger.debug(f"π± Baseline evaluation detected - setting candidate_type to 'seed'")

    self._current_evaluation_type = candidate_type
    self._current_dataset_type = dataset_type
    self._last_candidate = system_prompt

    # Minimal logging - just track what we're evaluating.
    if self._is_baseline_evaluation:
        self.logger.debug(f"Evaluating baseline (Sβ) on {dataset_type}")
    else:
        self.logger.debug(f"Evaluating candidate #{self._evaluation_count} ({candidate_type}) on {dataset_type}")

    # Detect and use batch mode if the client supports it.
    from ..llms.batch_llm import BatchLLMClient
    is_batch_mode = isinstance(self.llm_client, BatchLLMClient)
    if is_batch_mode:
        outputs, scores, trajectories = self._evaluate_batch_mode(
            batch, system_prompt, capture_traces
        )
    else:
        outputs, scores, trajectories = self._evaluate_standard_mode(
            batch, system_prompt, capture_traces
        )
    avg_score = sum(scores) / len(scores) if scores else 0.0

    # π₯ CRITICAL FIX: the baseline MUST come from the seed's first Dpareto
    # evaluation ONLY. Fair comparison requires seed and candidates to be
    # scored on the SAME dataset (Dpareto) with the SAME number of
    # datapoints; test-set evaluations are ignored for the baseline.
    from ..utils.pareto_logger import get_pareto_logger
    pareto_log = get_pareto_logger()
    # π₯ FIX: explicit-baseline evaluations also qualify to set the baseline.
    is_baseline_eval = hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation
    if self._baseline_score is None:
        # π₯ FIX B: set baseline on the FIRST Dpareto evaluation, regardless
        # of candidate type; also when explicitly marked as baseline.
        if self._current_dataset_type == 'dpareto' or is_baseline_eval:
            self._baseline_score = avg_score
            pareto_log.set_baseline(avg_score)
            self.logger.info(f"π Baseline score (Dpareto, {len(batch)} samples): {avg_score:.4f}")
            self.logger.info(f" β Baseline set from {'baseline evaluation' if is_baseline_eval else 'first Dpareto'} (type: {self._current_evaluation_type})")
    else:
        # π₯ SAFETY CHECK: keep the Pareto logger's baseline in sync when the
        # optimizer set the adapter baseline but the logger missed it.
        if (self._current_dataset_type == 'dpareto' or is_baseline_eval) and pareto_log.baseline_score is None:
            pareto_log.set_baseline(self._baseline_score)
            self.logger.info(f"β Synchronized baseline in Pareto logger: {self._baseline_score:.4f}")

    # Track Dpareto evaluations for the Pareto front.
    if self._current_dataset_type == 'dpareto':
        from ..utils.pareto_logger import get_pareto_logger
        pareto_log = get_pareto_logger()
        pareto_log.log_candidate_evaluation(
            prompt=system_prompt,
            score=avg_score,
            candidate_type=self._current_evaluation_type or 'unknown',
            dataset_type=self._current_dataset_type
        )
        # Remember this candidate so the cache above can skip re-evaluation.
        normalized_prompt = system_prompt.strip().strip('"\'')
        if normalized_prompt not in self._dpareto_evaluated_candidates:
            self._dpareto_evaluated_candidates[normalized_prompt] = (
                avg_score, self._current_evaluation_type or 'unknown', 'evaluated_by_gepa'
            )
    self.logger.debug(f"Evaluation complete: score={avg_score:.4f}")

    # π₯ CRITICAL: update _best_candidate/_best_score with AVERAGE fitness for
    # Dpareto evaluations, so the adapter tracks best average fitness rather
    # than per-sample scores. Only update on improvement.
    if self._current_dataset_type == 'dpareto':
        if self._best_score is None or avg_score > self._best_score:
            self._best_score = avg_score
            self._best_candidate = {
                'system_prompt': system_prompt,
                'fitness': avg_score,
                'source': self._current_evaluation_type or 'unknown'
            }
            self.logger.info(f"β Updated best candidate from Dpareto evaluation: f={avg_score:.4f} (type: {self._current_evaluation_type})")
    return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajectories)
| def _evaluate_batch_mode( | |
| self, | |
| batch: List[Dict], | |
| system_prompt: str, | |
| capture_traces: bool | |
| ) -> tuple: | |
| """ | |
| Batch mode evaluation - process all samples in one API call. | |
| This method prepares all requests, submits them as a batch job to Gemini, | |
| waits for completion, then evaluates all results. | |
| """ | |
| # Prepare all requests | |
| requests = [] | |
| standardized_items = [] | |
| for item in batch: | |
| standardized_item = self.data_converter._standardize([item])[0] | |
| standardized_items.append(standardized_item) | |
| request = { | |
| 'system_prompt': system_prompt, | |
| 'user_prompt': standardized_item['input'] | |
| } | |
| if standardized_item.get('image'): | |
| request['image_base64'] = standardized_item['image'] | |
| requests.append(request) | |
| # Submit batch job and get all results at once | |
| batch_results = self.llm_client.generate_batch(requests) | |
| # Process results | |
| outputs = [] | |
| scores = [] | |
| trajectories = [] if capture_traces else None | |
| for i, (llm_response, standardized_item) in enumerate(zip(batch_results, standardized_items)): | |
| # Extract content | |
| raw_output = llm_response.get("content", "") | |
| # π₯ CRITICAL: Clean markdown wrappers before evaluation | |
| predicted_output = self._clean_llm_output(raw_output) | |
| outputs.append(predicted_output) | |
| # Evaluate with cleaned output | |
| evaluation_results = self.evaluator.evaluate( | |
| predicted_output, | |
| standardized_item['output'] | |
| ) | |
| composite_score = evaluation_results.get("composite_score", 0.0) | |
| scores.append(composite_score) | |
| # Update tracking | |
| if composite_score > self._best_score: | |
| self._best_score = composite_score | |
| self._best_candidate = {'system_prompt': system_prompt} | |
| # Capture traces | |
| if capture_traces: | |
| trajectories.append({ | |
| 'input_data': standardized_item, | |
| 'predicted_output': predicted_output, | |
| 'evaluation_results': evaluation_results | |
| }) | |
| # Concise logging with element IDs and candidate notation | |
| predicted_element = evaluation_results.get('predicted_element', '?') | |
| expected_element = evaluation_results.get('expected_element', '?') | |
| status = "β " if composite_score == 1.0 else "β" | |
| # Add notation for candidate type | |
| notation_map = {'seed': 'Sβ', 'gepa_reflection': 'Sα΅£', 'llego_crossover': 'Oββ', 'llego_mutation': 'Oβα΅€β'} | |
| notation = notation_map.get(self._current_evaluation_type, 'S') | |
| self.logger.info(f" [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}") | |
| return outputs, scores, trajectories | |
| def _evaluate_standard_mode( | |
| self, | |
| batch: List[Dict], | |
| system_prompt: str, | |
| capture_traces: bool | |
| ) -> tuple: | |
| """ | |
| Standard mode evaluation - process samples individually (existing logic). | |
| This is the original implementation, preserved for backward compatibility | |
| and for use with non-batch LLM clients. | |
| """ | |
| outputs = [] | |
| scores = [] | |
| trajectories = [] if capture_traces else None | |
| for i, item in enumerate(batch): | |
| # Use existing data processing logic | |
| standardized_item = self.data_converter._standardize([item])[0] | |
| # Prepare generation parameters | |
| generation_params = { | |
| 'system_prompt': system_prompt, | |
| 'user_prompt': standardized_item['input'] | |
| } | |
| # Add image if present | |
| if standardized_item.get('image'): | |
| generation_params['image_base64'] = standardized_item['image'] | |
| # Generate response using user's LLM client | |
| llm_response = self.llm_client.generate(**generation_params) | |
| # Extract content | |
| if isinstance(llm_response, dict): | |
| raw_output = llm_response.get("content", "") | |
| else: | |
| raw_output = str(llm_response) | |
| # π₯ CRITICAL: Clean markdown wrappers before evaluation | |
| predicted_output = self._clean_llm_output(raw_output) | |
| outputs.append(predicted_output) | |
| # Evaluate using user's evaluator with cleaned output | |
| evaluation_results = self.evaluator.evaluate( | |
| predicted_output, | |
| standardized_item['output'] | |
| ) | |
| composite_score = evaluation_results.get("composite_score", 0.0) | |
| scores.append(composite_score) | |
| # Debug logging removed - not needed in production | |
| # Update performance tracking | |
| self._evaluation_count += 1 | |
| if composite_score > self._best_score: | |
| self._best_score = composite_score | |
| self._best_candidate = {'system_prompt': system_prompt} | |
| # Capture traces if requested | |
| if capture_traces: | |
| trajectories.append({ | |
| 'input_data': standardized_item, | |
| 'predicted_output': predicted_output, | |
| 'evaluation_results': evaluation_results | |
| }) | |
| # Concise logging with element IDs and candidate notation | |
| predicted_element = evaluation_results.get('predicted_element', '?') | |
| expected_element = evaluation_results.get('expected_element', '?') | |
| status = "β " if composite_score == 1.0 else "β" | |
| # Add notation for candidate type | |
| notation_map = {'seed': 'Sβ', 'gepa_reflection': 'Sα΅£', 'llego_crossover': 'Oββ', 'llego_mutation': 'Oβα΅€β'} | |
| notation = notation_map.get(self._current_evaluation_type, 'S') | |
| self.logger.info(f" [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}") | |
| return outputs, scores, trajectories | |
| def make_reflective_dataset(self, candidate: Dict[str, str], eval_batch: EvaluationBatch, | |
| components_to_update: List[str]) -> Dict[str, List[Dict[str, Any]]]: | |
| """ | |
| Create reflective dataset using user-provided evaluator. | |
| This method generates feedback based on the evaluation results | |
| from the user's custom evaluator. | |
| π₯ NEW: If hybrid mode is enabled, this method ALSO generates hybrid candidates | |
| (GEPA Reflection + LLEGO Operators) and stores them for GEPA to use. | |
| """ | |
| # π₯ DEBUG: Log that this method is being called (CRITICAL for debugging) | |
| self.logger.info(f"\n{'='*80}") | |
| self.logger.info(f"π₯ make_reflective_dataset() CALLED BY GEPA") | |
| self.logger.info(f"{'='*80}") | |
| self.logger.info(f" Candidate prompt: {candidate.get('system_prompt', '')[:100]}...") | |
| self.logger.info(f" Eval batch has trajectories: {eval_batch.trajectories is not None and len(eval_batch.trajectories) > 0 if eval_batch.trajectories else False}") | |
| self.logger.info(f" Eval batch scores: {eval_batch.scores if eval_batch.scores else 'None'}") | |
| self.logger.info(f" Components to update: {components_to_update}") | |
| reflective_dataset = {} | |
| system_prompt = candidate.get('system_prompt', '') | |
| # π₯ REMOVED: Verbose diagnostic checks - only log if hybrid mode is actually enabled | |
| hybrid_mode_enabled = (self._config and | |
| hasattr(self._config, 'enable_gepa_reflection_with_llego') and | |
| self._config.enable_gepa_reflection_with_llego and | |
| self._reflection_lm_client) | |
| if hybrid_mode_enabled: | |
| self.logger.debug(f"β Hybrid mode conditions met - will generate hybrid candidates") | |
| # ======================================================================== | |
| # π₯ CRITICAL FIX: Update LLEGO population with evaluated candidate | |
| # ======================================================================== | |
| # This is the MISSING LINK! After a candidate is evaluated, we need to add it | |
| # to the LLEGO population so it can be used for crossover/mutation. | |
| # Without this, the population only contains the seed, so Pareto front stays at 1! | |
| # | |
| # This is called for EVERY candidate that GEPA evaluates: | |
| # - Seed prompt (baseline) β added to population | |
| # - New candidate 1 (from reflection/crossover/mutation) β added to population | |
| # - New candidate 2 β added to population | |
| # - etc. | |
| if self.llego: | |
| # Calculate average fitness from evaluation scores | |
| if eval_batch.scores and len(eval_batch.scores) > 0: | |
| avg_fitness = sum(eval_batch.scores) / len(eval_batch.scores) | |
| else: | |
| # Fallback: extract from trajectories if scores not available | |
| scores = [t.get('evaluation_results', {}).get('composite_score', 0.0) | |
| for t in eval_batch.trajectories if 'evaluation_results' in t] | |
| avg_fitness = sum(scores) / len(scores) if scores else 0.0 | |
| self.logger.debug(f"Updating LLEGO population: fitness={avg_fitness:.4f}") | |
| # Create PromptCandidate from evaluated prompt | |
| from ..operators.llego_operators import PromptCandidate | |
| # Check if this candidate already exists in population (avoid duplicates) | |
| # π₯ FIX: Normalize prompts for comparison (strip whitespace, remove quotes) | |
| normalized_new_prompt = system_prompt.strip().strip('"\'') | |
| existing_prompts = {p.prompt.strip().strip('"\'') for p in self.llego.population} | |
| # Also check normalized versions | |
| if normalized_new_prompt not in existing_prompts: | |
| prompt_candidate = PromptCandidate( | |
| prompt=system_prompt, # Keep original prompt (not normalized) | |
| fitness=avg_fitness, | |
| metadata={ | |
| 'generation': self.llego.current_generation, | |
| 'operator': 'evaluated', | |
| 'prompt_length': len(system_prompt), | |
| 'word_count': len(system_prompt.split()), | |
| 'evaluation_samples': len(eval_batch.scores) if eval_batch.scores else 0, | |
| 'candidate_type': self._current_evaluation_type or 'unknown', # Store type for notation | |
| 'dataset_evaluated': self._current_dataset_type or 'unknown' | |
| } | |
| ) | |
| # Update population - this will add the candidate and keep top N by fitness | |
| population_before = len(self.llego.population) | |
| self.llego.update_population([prompt_candidate]) | |
| population_after = len(self.llego.population) | |
| self.logger.debug(f"Added to LLEGO population: fitness={avg_fitness:.4f}, size={population_after}") | |
| else: | |
| # Update fitness if candidate already exists (seed prompt, etc.) | |
| # π₯ FIX: Also normalize for comparison | |
| updated = False | |
| for p in self.llego.population: | |
| normalized_existing = p.prompt.strip().strip('"\'') | |
| if normalized_existing == normalized_new_prompt: | |
| old_fitness = p.fitness | |
| if avg_fitness > p.fitness: | |
| p.fitness = avg_fitness | |
| updated = True | |
| self.logger.debug(f"Updated fitness: {old_fitness:.4f} β {avg_fitness:.4f}") | |
| # Update candidate type if we have new information | |
| if self._current_evaluation_type and p.metadata: | |
| old_type = p.metadata.get('candidate_type', 'unknown') | |
| if self._current_evaluation_type != old_type: | |
| p.metadata['candidate_type'] = self._current_evaluation_type | |
| else: | |
| self.logger.debug(f"βΉοΈ Candidate already exists with better/equal fitness: {p.fitness:.4f} >= {avg_fitness:.4f}") | |
| break | |
| if not updated: | |
| self.logger.debug(f"Candidate already in population with higher fitness") | |
| else: | |
| self.logger.debug("LLEGO not initialized - skipping population update") | |
| # ======================================================================== | |
| # π₯ HYBRID MODE: Generate candidates at adapter level | |
| # ======================================================================== | |
| if (self._config and | |
| hasattr(self._config, 'enable_gepa_reflection_with_llego') and | |
| self._config.enable_gepa_reflection_with_llego and | |
| self._reflection_lm_client): | |
| self.logger.debug("Generating hybrid candidates") | |
| # Generate hybrid candidates FIRST | |
| generated_candidates = self._generate_hybrid_candidates_adapter_level( | |
| current_prompt=system_prompt, | |
| eval_batch=eval_batch, | |
| candidate=candidate | |
| ) | |
| # π₯ CRITICAL: Store generated candidates so we can inject them | |
| # _generate_hybrid_candidates_adapter_level now returns list of dicts with metadata | |
| if generated_candidates: | |
| candidate_dicts = [] | |
| for cand in generated_candidates: | |
| if isinstance(cand, dict) and 'prompt' in cand: | |
| # Already a dict with metadata (preferred format) | |
| candidate_dicts.append(cand) | |
| elif isinstance(cand, str): | |
| # Just a string - determine source based on position (fallback) | |
| # This shouldn't happen if _generate_hybrid_candidates_adapter_level is fixed | |
| self.logger.warning(f"β οΈ Received string candidate instead of dict - using fallback logic") | |
| if len(candidate_dicts) < self._config.num_gepa_reflection_candidates: | |
| source = 'gepa_reflection' | |
| elif len(candidate_dicts) < self._config.num_gepa_reflection_candidates + self._config.n_crossover: | |
| source = 'llego_crossover' | |
| else: | |
| source = 'llego_mutation' | |
| candidate_dicts.append({ | |
| 'prompt': cand, | |
| 'source': source, | |
| 'index': len(candidate_dicts) + 1 | |
| }) | |
| else: | |
| self.logger.warning(f"β οΈ Unknown candidate format: {type(cand)}") | |
| self._generated_candidates = candidate_dicts | |
| # Store candidate sources for tracking | |
| for cand_dict in candidate_dicts: | |
| if 'prompt' in cand_dict and 'source' in cand_dict: | |
| self._candidate_sources[cand_dict['prompt']] = cand_dict['source'] | |
| # π₯ CRITICAL: Inject into LLM client wrapper so it can return them when GEPA calls | |
| # This is the key mechanism: when GEPA calls adapter.llm_client.generate() for proposals, | |
| # our wrapper will detect it and return our pre-generated candidates | |
| if hasattr(self.llm_client, '_adapter_generated_candidates'): | |
| self.llm_client._adapter_generated_candidates = candidate_dicts.copy() | |
| self.logger.debug(f"Injected {len(candidate_dicts)} candidates") | |
| else: | |
| try: | |
| self.llm_client._adapter_generated_candidates = candidate_dicts.copy() | |
| except Exception as e: | |
| self.logger.error(f"Failed to inject candidates: {e}") | |
| # Evaluate generated candidates on Dpareto for fair comparison | |
| if hasattr(self, '_evaluating_generated_candidates'): | |
| pass # Skip to prevent recursion | |
| elif self._valset and len(self._valset) > 0: | |
| self._evaluating_generated_candidates = True | |
| self.logger.debug(f"Evaluating {len(candidate_dicts)} candidates on Dpareto ({len(self._valset)} samples)") | |
| # π₯ NEW: Collect all candidates with scores for batch update | |
| candidates_with_scores = [] | |
| for i, cand_dict in enumerate(candidate_dicts, 1): | |
| cand_prompt = cand_dict.get('prompt', '') | |
| cand_source = cand_dict.get('source', 'unknown') | |
| if not cand_prompt: | |
| continue | |
| # Normalize prompt for duplicate detection | |
| normalized_prompt = cand_prompt.strip().strip('"\'') | |
| # Check if already evaluated on Dpareto (avoid double evaluation) | |
| if normalized_prompt in self._dpareto_evaluated_candidates: | |
| existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt] | |
| # Still add to batch for Pareto update (with existing score) | |
| notation_map = { | |
| 'seed': 'Sβ', | |
| 'gepa_reflection': 'Sα΅£', | |
| 'llego_crossover': 'Oββ', | |
| 'llego_mutation': 'Oβα΅€β' | |
| } | |
| cand_notation = notation_map.get(cand_source, 'S') | |
| candidates_with_scores.append({ | |
| 'prompt': cand_prompt, | |
| 'score': existing_score, | |
| 'type': cand_source, | |
| 'notation': cand_notation | |
| }) | |
| continue | |
| # Evaluate this candidate on valset (Dpareto) | |
| try: | |
| # Set candidate type for proper logging | |
| self._current_evaluation_type = cand_source | |
| # π₯ CRITICAL: Temporarily disable individual Pareto updates | |
| # We'll do batch update after all evaluations | |
| from ..utils.pareto_logger import get_pareto_logger | |
| pareto_log = get_pareto_logger() | |
| original_log_method = pareto_log.log_candidate_evaluation | |
| # Temporarily replace to prevent individual updates | |
| def noop_log(*args, **kwargs): | |
| pass # Skip individual logging - we'll batch update later | |
| pareto_log.log_candidate_evaluation = noop_log | |
| # Evaluate on valset - THIS IS THE FAIR EVALUATION ON SAME DATASET | |
| valset_eval = self.evaluate( | |
| batch=self._valset, # Same valset as seed! | |
| candidate={'system_prompt': cand_prompt, 'source': cand_source}, | |
| capture_traces=True | |
| ) | |
| # Restore original method | |
| pareto_log.log_candidate_evaluation = original_log_method | |
| avg_score = sum(valset_eval.scores) / len(valset_eval.scores) if valset_eval.scores else 0.0 | |
| # Store evaluation result to avoid double evaluation | |
| self._dpareto_evaluated_candidates[normalized_prompt] = ( | |
| avg_score, | |
| cand_source, | |
| 'evaluated_in_make_reflective_dataset' | |
| ) | |
| self.logger.debug(f"Candidate {i} evaluated: score={avg_score:.4f}") | |
| # Generate notation | |
| notation_map = { | |
| 'seed': 'Sβ', | |
| 'gepa_reflection': 'Sα΅£', | |
| 'llego_crossover': 'Oββ', | |
| 'llego_mutation': 'Oβα΅€β' | |
| } | |
| cand_notation = notation_map.get(cand_source, 'S') | |
| # Add to batch for Pareto update | |
| candidates_with_scores.append({ | |
| 'prompt': cand_prompt, | |
| 'score': avg_score, | |
| 'type': cand_source, | |
| 'notation': cand_notation | |
| }) | |
| # π₯ CRITICAL: Explicitly add this candidate to LLEGO population with Dpareto fitness | |
| if self.llego: | |
| from ..operators.llego_operators import PromptCandidate | |
| # Check if already in population | |
| existing_in_pop = False | |
| for p in self.llego.population: | |
| if p.prompt.strip().strip('"\'') == normalized_prompt: | |
| # Update fitness if this Dpareto score is better | |
| if avg_score > p.fitness: | |
| old_fitness = p.fitness | |
| p.fitness = avg_score | |
| if p.metadata: | |
| p.metadata['candidate_type'] = cand_source | |
| p.metadata['dataset_evaluated'] = 'dpareto' | |
| self.logger.debug(f"Updated LLEGO fitness: {old_fitness:.4f} β {avg_score:.4f}") | |
| existing_in_pop = True | |
| break | |
| if not existing_in_pop: | |
| # Add new candidate to population | |
| prompt_candidate = PromptCandidate( | |
| prompt=cand_prompt, | |
| fitness=avg_score, | |
| metadata={ | |
| 'generation': self.llego.current_generation, | |
| 'operator': 'evaluated_on_dpareto', | |
| 'prompt_length': len(cand_prompt), | |
| 'word_count': len(cand_prompt.split()), | |
| 'evaluation_samples': len(valset_eval.scores) if valset_eval.scores else 0, | |
| 'candidate_type': cand_source, | |
| 'dataset_evaluated': 'dpareto' | |
| } | |
| ) | |
| self.llego.update_population([prompt_candidate]) | |
| except Exception as e: | |
| self.logger.error(f" β Error evaluating candidate #{i} on Dpareto: {e}") | |
| import traceback | |
| self.logger.error(traceback.format_exc()) | |
| # Batch Pareto front update | |
| if candidates_with_scores: | |
| from ..utils.pareto_logger import get_pareto_logger | |
| pareto_log = get_pareto_logger() | |
| added_candidates = pareto_log.batch_update_pareto_front(candidates_with_scores) | |
| # π₯ CRITICAL: Update queue with scores for best-candidate selection | |
| # Create a mapping of prompt -> score for quick lookup | |
| prompt_to_score = {c['prompt'].strip().strip('"\''): c['score'] for c in candidates_with_scores} | |
| # Update candidates in queue with their scores | |
| if hasattr(self.llm_client, '_adapter_generated_candidates'): | |
| updated_queue = [] | |
| for cand in self.llm_client._adapter_generated_candidates: | |
| if isinstance(cand, dict): | |
| cand_prompt = cand.get('prompt', '') | |
| normalized = cand_prompt.strip().strip('"\'') | |
| if normalized in prompt_to_score: | |
| # Update with score | |
| cand['score'] = prompt_to_score[normalized] | |
| updated_queue.append(cand) | |
| else: | |
| updated_queue.append(cand) | |
| else: | |
| updated_queue.append(cand) | |
| self.llm_client._adapter_generated_candidates = updated_queue | |
| self.logger.debug(f"Pareto update: {len(added_candidates)} added, front size={len(pareto_log.pareto_front)}") | |
| # Clear flag after evaluation complete | |
| self._evaluating_generated_candidates = False | |
| elif not hasattr(self, '_evaluating_generated_candidates'): | |
| self.logger.error("Valset not available - cannot evaluate generated candidates") | |
| # Signal LLEGO-enhanced client for reflection mode | |
| if self.llego and hasattr(self.llm_client, 'set_reflection_context'): | |
| self.llm_client.set_reflection_context( | |
| current_prompt=system_prompt, | |
| feedback=eval_batch, | |
| in_reflection=True | |
| ) | |
| # π₯ CRITICAL: Also set reflection context on reflection_lm_client if it exists | |
| # This ensures hybrid mode candidate generation is triggered when GEPA calls reflection_lm_callable | |
| if hasattr(self, 'reflection_lm_client') and self.reflection_lm_client: | |
| if hasattr(self.reflection_lm_client, 'set_reflection_context'): | |
| self.logger.info("π₯ CRITICAL: Setting reflection context on reflection_lm_client for hybrid mode") | |
| self.reflection_lm_client.set_reflection_context( | |
| current_prompt=system_prompt, | |
| feedback=eval_batch, | |
| in_reflection=True # This enables hybrid candidate generation! | |
| ) | |
| self._log_reflection_dataset_creation(candidate, eval_batch, components_to_update) | |
| # Inject generated candidates into reflective dataset | |
| suggested_prompts = [] | |
| if hasattr(self, '_generated_candidates') and self._generated_candidates: | |
| suggested_prompts = [c['prompt'] for c in self._generated_candidates if isinstance(c, dict) and 'prompt' in c] | |
| self.logger.debug(f"Injecting {len(suggested_prompts)} suggested prompts") | |
| for component in components_to_update: | |
| reflective_dataset[component] = [] | |
| for trace in eval_batch.trajectories: | |
| # Generate feedback based on evaluation results | |
| # π Phase 2: Pass trace and current_prompt for LLM-as-Judge | |
| feedback = self._generate_feedback( | |
| trace['evaluation_results'], | |
| trace=trace, | |
| current_prompt=system_prompt | |
| ) | |
| # Base reflection data | |
| # π₯ FIX: Strip image_base64 from input_data to prevent massive base64 strings in logs | |
| input_data_clean = trace['input_data'].copy() if isinstance(trace['input_data'], dict) else {} | |
| if 'image_base64' in input_data_clean: | |
| input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]" | |
| # π₯ FIX: Clean detailed_scores to remove any base64 references or large data | |
| detailed_scores_clean = {} | |
| if isinstance(trace['evaluation_results'], dict): | |
| for key, value in trace['evaluation_results'].items(): | |
| # Skip any values that look like base64 (very long strings) | |
| if isinstance(value, str) and len(value) > 1000: | |
| detailed_scores_clean[key] = f"[DATA_{len(value)}_chars]" | |
| else: | |
| detailed_scores_clean[key] = value | |
| else: | |
| detailed_scores_clean = trace['evaluation_results'] | |
| reflection_entry = { | |
| "current_prompt": system_prompt, | |
| "input_data": input_data_clean, # Use cleaned version without full base64 | |
| "predicted_output": trace['predicted_output'], | |
| "score": trace['evaluation_results'].get("composite_score", 0.0), | |
| "feedback": feedback, | |
| "detailed_scores": detailed_scores_clean # Cleaned scores without large data | |
| } | |
| # π₯ CRITICAL: Only optimize system_prompt, NOT user_prompt | |
| # The user_prompt contains the task description (command) and should NOT be modified | |
| if component == 'system_prompt' and suggested_prompts: | |
| # Add suggested improved prompts to the reflection entry | |
| # GEPA might use these if the structure supports it | |
| reflection_entry["suggested_improved_prompts"] = suggested_prompts | |
| reflection_entry["num_suggestions"] = len(suggested_prompts) | |
| # Also add the best suggested prompt as a direct suggestion | |
| if suggested_prompts: | |
| reflection_entry["suggested_prompt"] = suggested_prompts[0] # First candidate as primary suggestion | |
| reflection_entry["optimize_component"] = "system_prompt_only" # Mark that we only optimize system_prompt | |
| elif component != 'system_prompt': | |
| # For non-system_prompt components (like user_prompt), do NOT add suggestions | |
| # We only want to optimize system_prompt | |
| reflection_entry["optimize_component"] = "skip" # Mark to skip optimization | |
| self.logger.info(f"β οΈ Skipping optimization for component '{component}' - only optimizing system_prompt") | |
| reflective_dataset[component].append(reflection_entry) | |
| total_samples = sum(len(data) for data in reflective_dataset.values()) | |
| avg_score = sum(trace['score'] for data in reflective_dataset.values() for trace in data) / total_samples if total_samples > 0 else 0.0 | |
| self.logger.info(f"π Reflection dataset created - {total_samples} samples, avg score: {avg_score:.4f}") | |
| return reflective_dataset | |
    def _generate_feedback(
        self,
        evaluation_results: Dict[str, Any],
        trace: Optional[Dict[str, Any]] = None,
        current_prompt: Optional[str] = None
    ) -> str:
        """
        Generate feedback using hybrid approach:
        - LLM-as-Judge for low/medium scores (detailed, actionable)
        - Simple feedback for high scores (efficient)

        The judge path runs only when ALL of these hold: the judge is enabled
        in config, composite_score is below the configured threshold, a trace
        is available, and a reflection LM client exists. A judge failure is
        logged and control falls through to the simple heuristic feedback.

        Args:
            evaluation_results: Evaluation scores and extracted data
            trace: Full trace with input_data, predicted_output, etc. (optional)
            current_prompt: The current system prompt being optimized (optional)
        Returns:
            Feedback string focused on prompt improvement
        """
        composite_score = evaluation_results.get("composite_score", 0.0)
        # Check if LLM-as-Judge is enabled
        # (getattr with a default also tolerates self._config being None)
        use_llm_judge = getattr(self._config, 'use_llm_as_judge', True)
        threshold = getattr(self._config, 'llm_as_judge_threshold', 0.8)
        # π₯ FIX: Check both attribute names (inconsistency in codebase)
        reflection_lm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None)
        # Debug logging - use INFO so we can see what's happening
        self.logger.info(f"π Feedback generation: score={composite_score:.4f}, use_llm_judge={use_llm_judge}, threshold={threshold}, has_trace={trace is not None}, has_reflection_lm={reflection_lm is not None}")
        if trace:
            # Log truncated previews of predicted vs. expected output for debugging
            input_data = trace.get('input_data', {})
            predicted = trace.get('predicted_output', '')[:100] if trace.get('predicted_output') else 'N/A'
            expected = input_data.get('output', '')[:100] if input_data.get('output') else 'N/A'
            self.logger.info(f" Predicted preview: {predicted}...")
            self.logger.info(f" Expected preview: {expected}...")
        # Use LLM-as-Judge for scores needing improvement
        if use_llm_judge and composite_score < threshold and trace:
            if not reflection_lm:
                self.logger.warning("β οΈ LLM-as-Judge requested but reflection_lm_client not available - using simple feedback")
                self.logger.warning(f" Checked: _reflection_lm_client={getattr(self, '_reflection_lm_client', None) is not None}, reflection_lm_client={getattr(self, 'reflection_lm_client', None) is not None}")
            else:
                try:
                    self.logger.info(f"π€ Calling LLM-as-Judge for detailed feedback (score: {composite_score:.4f} < threshold: {threshold})")
                    feedback = self._llm_as_judge_feedback(
                        evaluation_results,
                        trace,
                        current_prompt
                    )
                    self.logger.info(f"β LLM-as-Judge returned feedback (length: {len(feedback)} chars)")
                    # NOTE: the judge path appends its own format feedback,
                    # so we return here and skip the format block below.
                    return feedback
                except Exception as e:
                    self.logger.error(f"β LLM-as-Judge failed: {e}, falling back to simple feedback")
                    import traceback
                    self.logger.error(traceback.format_exc())
                    # Fall through to simple feedback
        # Simple actionable feedback (for high scores or as fallback)
        # These branches only explain WHY the simple path was taken.
        if composite_score >= threshold:
            self.logger.debug(f"β Score {composite_score:.4f} >= threshold {threshold} - using simple feedback")
        elif not trace:
            self.logger.debug(f"β οΈ No trace provided - using simple feedback")
        elif not use_llm_judge:
            self.logger.debug(f"β οΈ LLM-as-Judge disabled - using simple feedback")
        feedback = self._simple_actionable_feedback(
            evaluation_results,
            trace,
            current_prompt
        )
        # π₯ ADD FORMAT FEEDBACK: Append format-specific feedback if available
        if self._detected_format and trace:
            from ..utils.format_detection import generate_format_feedback
            input_data = trace.get('input_data', {})
            format_feedback = generate_format_feedback(
                predicted_output=trace.get('predicted_output', ''),
                expected_output=input_data.get('output', ''),
                format_info=self._detected_format
            )
            if format_feedback:
                feedback += format_feedback
        return feedback
| def _llm_as_judge_feedback( | |
| self, | |
| evaluation_results: Dict[str, Any], | |
| trace: Dict[str, Any], | |
| current_prompt: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Generate detailed, actionable feedback using LLM-as-Judge. | |
| π₯ UNIVERSAL VERSION: Works for ANY task type (text, JSON, structured outputs). | |
| No UI-specific assumptions. Pure semantic and structural comparison. | |
| Args: | |
| evaluation_results: Evaluation scores and extracted data | |
| trace: Full trace with input_data, predicted_output, etc. | |
| current_prompt: The current system prompt being optimized | |
| Returns: | |
| Detailed feedback string focused on prompt improvement | |
| """ | |
| # Import universal judge prompt builder | |
| from ..utils.universal_judge_prompt import ( | |
| build_universal_judge_prompt, | |
| get_universal_judge_system_prompt, | |
| format_universal_judge_feedback, | |
| build_empty_output_feedback | |
| ) | |
| # Extract data from trace | |
| input_data = trace.get('input_data', {}) | |
| predicted_output = trace.get('predicted_output', '') or '' | |
| expected_output = input_data.get('output', '') or '' | |
| task_input = input_data.get('input', '') or '' | |
| # Get image if available (for multi-modal tasks) | |
| image_base64 = input_data.get('image', '') or input_data.get('image_base64', '') | |
| # Log what we're working with | |
| self.logger.info(f"π LLM-as-Judge input check:") | |
| self.logger.info(f" predicted_output length: {len(predicted_output)} chars") | |
| self.logger.info(f" expected_output length: {len(expected_output)} chars") | |
| self.logger.info(f" image available: {bool(image_base64)} (length: {len(image_base64) if image_base64 else 0} chars)") | |
| self.logger.info(f" predicted_output preview: {predicted_output[:200] if predicted_output else '[EMPTY]'}...") | |
| self.logger.info(f" expected_output preview: {expected_output[:200] if expected_output else '[EMPTY]'}...") | |
| # Handle empty predicted output specially | |
| if not predicted_output or not predicted_output.strip(): | |
| self.logger.warning(f"β οΈ Predicted output is empty - generating specialized feedback") | |
| return build_empty_output_feedback(task_input, expected_output, current_prompt) | |
| if not image_base64: | |
| self.logger.debug(f"βΉοΈ No image provided - text-only analysis") | |
| # Get the LLM for judging | |
| judge_llm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None) | |
| if not judge_llm: | |
| self.logger.error("β CRITICAL: No reflection_lm_client available for LLM-as-Judge!") | |
| raise ValueError("reflection_lm_client not available") | |
| # Build the universal judge prompt | |
| judge_prompt = build_universal_judge_prompt( | |
| task_input=task_input, | |
| predicted_output=predicted_output, | |
| expected_output=expected_output, | |
| current_prompt=current_prompt, | |
| evaluation_results=evaluation_results, | |
| image_base64=image_base64 | |
| ) | |
| # Get the universal system prompt | |
| system_prompt = get_universal_judge_system_prompt(has_image=bool(image_base64)) | |
| # Call LLM-as-Judge | |
| try: | |
| self.logger.info(f"π€ Calling Universal LLM-as-Judge for semantic analysis") | |
| result = judge_llm.generate( | |
| system_prompt=system_prompt, | |
| user_prompt=judge_prompt, | |
| image_base64=image_base64 if image_base64 else "" | |
| ) | |
| if isinstance(result, dict): | |
| judge_output = result.get('content', '') | |
| else: | |
| judge_output = str(result) | |
| # Format the feedback using the universal formatter | |
| score = evaluation_results.get('composite_score', 0.0) | |
| feedback = format_universal_judge_feedback( | |
| judge_output=judge_output, | |
| task_input=task_input, | |
| predicted_output=predicted_output, | |
| expected_output=expected_output, | |
| score=score | |
| ) | |
| # π₯ ADD FORMAT FEEDBACK: Append format-specific feedback | |
| if self._detected_format: | |
| from ..utils.format_detection import generate_format_feedback | |
| format_feedback = generate_format_feedback( | |
| predicted_output=predicted_output, | |
| expected_output=expected_output, | |
| format_info=self._detected_format | |
| ) | |
| if format_feedback: | |
| feedback += format_feedback | |
| # Also add format constraint for next iteration | |
| feedback += f"\n\n{self._detected_format['format_constraint']}" | |
| self.logger.info(f"β Universal LLM-as-Judge generated feedback") | |
| return feedback | |
| except Exception as e: | |
| self.logger.error(f"LLM-as-Judge failed: {e}") | |
| import traceback | |
| self.logger.error(traceback.format_exc()) | |
| # Fallback to simple feedback | |
| return self._simple_actionable_feedback(evaluation_results, trace, current_prompt) | |
| def _extract_reasoning_from_expected(self, expected_output: str) -> str: | |
| """Extract reasoning section from expected output.""" | |
| if not expected_output: | |
| return "" | |
| # Look for "Reason:" or "Reasoning:" section | |
| reason_patterns = [ | |
| r'Reason[:\s]+(.*?)(?:\n\n|\Z)', | |
| r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)', | |
| ] | |
| for pattern in reason_patterns: | |
| match = re.search(pattern, expected_output, re.IGNORECASE | re.DOTALL) | |
| if match: | |
| return match.group(1).strip()[:500] # Truncate to 500 chars | |
| return "" | |
| def _extract_reasoning_from_predicted(self, predicted_output: str) -> str: | |
| """Extract reasoning from predicted output if available.""" | |
| # Similar to _extract_reasoning_from_expected | |
| # Or return first 200 chars if no clear reasoning section | |
| if not predicted_output: | |
| return "" | |
| # Look for reasoning patterns | |
| reason_patterns = [ | |
| r'Reason[:\s]+(.*?)(?:\n\n|\Z)', | |
| r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)', | |
| ] | |
| for pattern in reason_patterns: | |
| match = re.search(pattern, predicted_output, re.IGNORECASE | re.DOTALL) | |
| if match: | |
| return match.group(1).strip()[:500] | |
| # If no reasoning found, return first 200 chars | |
| if len(predicted_output) > 200: | |
| return predicted_output[:200] + "..." | |
| return predicted_output | |
| def _simple_actionable_feedback( | |
| self, | |
| evaluation_results: Dict[str, Any], | |
| trace: Dict[str, Any] = None, | |
| current_prompt: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Simple feedback without LLM-as-Judge. | |
| π₯ UNIVERSAL VERSION: Works for any task type. | |
| """ | |
| composite_score = evaluation_results.get("composite_score", 0.0) | |
| semantic_sim = evaluation_results.get("semantic_similarity", 0.0) | |
| structural_sim = evaluation_results.get("structural_similarity", 0.0) | |
| feedback_parts = [] | |
| # Extract task context if available | |
| if trace: | |
| input_data = trace.get('input_data', {}) | |
| predicted = trace.get('predicted_output', '') | |
| expected = input_data.get('output', '') | |
| # Check for empty output | |
| if not predicted or not predicted.strip(): | |
| feedback_parts.append( | |
| "β CRITICAL: No output generated. " | |
| "Add explicit output instructions to the prompt." | |
| ) | |
| # Check for format mismatch | |
| elif structural_sim < 0.5: | |
| feedback_parts.append( | |
| f"β οΈ Format mismatch (structural similarity: {structural_sim:.0%}). " | |
| "Add output format instructions (e.g., 'Return as JSON with fields: ...')." | |
| ) | |
| # Check for semantic mismatch | |
| elif semantic_sim < 0.5: | |
| feedback_parts.append( | |
| f"β οΈ Semantic mismatch (similarity: {semantic_sim:.0%}). " | |
| "The output meaning differs from expected. Add clearer task instructions." | |
| ) | |
| # Score-based feedback | |
| if composite_score >= 0.9: | |
| feedback_parts.append("β Excellent match - prompt is working well.") | |
| elif composite_score >= 0.8: | |
| feedback_parts.append("β Good match - minor refinements possible.") | |
| elif composite_score >= 0.6: | |
| feedback_parts.append( | |
| f"β οΈ Partial match (score: {composite_score:.0%}). " | |
| "Consider adding examples or more specific field names to the prompt." | |
| ) | |
| elif composite_score >= 0.3: | |
| feedback_parts.append( | |
| f"β οΈ Low match (score: {composite_score:.0%}). " | |
| "The prompt needs clearer instructions about expected output format and content." | |
| ) | |
| else: | |
| feedback_parts.append( | |
| f"β Poor match (score: {composite_score:.0%}). " | |
| "Major revision required - add explicit output format, field names, and examples." | |
| ) | |
| return "\n".join(feedback_parts) if feedback_parts else f"Score: {composite_score:.0%}" | |
| def get_best_candidate(self) -> Optional[Dict[str, str]]: | |
| """ | |
| Get the best candidate from GEPA Pareto front. | |
| GEPA Pareto front is the single source of truth because: | |
| - All candidates (GEPA reflection, LLEGO crossover, LLEGO mutation) are evaluated on Dpareto | |
| - All non-dominated candidates are added to GEPA Pareto front | |
| - Therefore, the best candidate MUST be in GEPA Pareto front | |
| Returns: | |
| Best candidate dictionary from GEPA Pareto front, or None if empty | |
| """ | |
| # PRIMARY: Get best candidate from GEPA Pareto front (single source of truth) | |
| from ..utils.pareto_logger import get_pareto_logger | |
| pareto_log = get_pareto_logger() | |
| if pareto_log.pareto_front: | |
| try: | |
| # Get best candidate from GEPA Pareto front (highest score = best) | |
| gepa_best = max(pareto_log.pareto_front, key=lambda x: x['score']) | |
| gepa_fitness = gepa_best['score'] | |
| gepa_prompt = gepa_best['prompt'] | |
| gepa_type = gepa_best.get('type', 'unknown') | |
| gepa_notation = gepa_best.get('notation', 'S') | |
| self.logger.info(f"β Best candidate from GEPA Pareto front: {gepa_notation} with f({gepa_notation})={gepa_fitness:.4f}") | |
| self.logger.info(f" Type: {gepa_type}, Prompt length: {len(gepa_prompt)} chars") | |
| self.logger.info(f" π‘ GEPA Pareto front is single source of truth (all candidates evaluated on Dpareto)") | |
| return { | |
| 'system_prompt': gepa_prompt, | |
| 'fitness': gepa_fitness, | |
| 'source': 'gepa_pareto_front', | |
| 'candidate_type': gepa_type, | |
| 'notation': gepa_notation | |
| } | |
| except Exception as e: | |
| self.logger.error(f"β Failed to get best from GEPA Pareto front: {e}") | |
| import traceback | |
| self.logger.error(traceback.format_exc()) | |
| # EDGE CASE: Pareto front empty (shouldn't happen, but handle gracefully) | |
| self.logger.warning("β οΈ GEPA Pareto front is empty - no best candidate available") | |
| self.logger.warning(" This should not happen if all candidates are evaluated on Dpareto") | |
| return None | |
| def get_best_score(self) -> float: | |
| """Get the best score from GEPA Pareto front (single source of truth).""" | |
| from ..utils.pareto_logger import get_pareto_logger | |
| pareto_log = get_pareto_logger() | |
| if pareto_log.pareto_front: | |
| try: | |
| gepa_best_fitness = max(p['score'] for p in pareto_log.pareto_front) | |
| return gepa_best_fitness | |
| except Exception as e: | |
| self.logger.warning(f"β οΈ Failed to get best fitness from GEPA Pareto front: {e}") | |
| # Edge case: Pareto front empty - fallback to adapter's score | |
| return self._best_score | |
| def log_proposed_candidate(self, candidate: Dict[str, str], iteration: int = 0): | |
| """ | |
| Pretty print the new proposed candidate prompt. | |
| Args: | |
| candidate: The new candidate prompt from GEPA | |
| iteration: Current optimization iteration | |
| """ | |
| system_prompt = candidate.get('system_prompt', '') | |
| candidate_source = candidate.get('source', 'unknown') | |
| # Store source in adapter state so evaluate() can access it | |
| self._current_evaluation_type = candidate_source | |
| # Also store in mapping by prompt text for lookup | |
| if candidate_source != 'unknown' and system_prompt: | |
| self._candidate_sources[system_prompt] = candidate_source | |
| # Use clean logger for simpler output | |
| from ..utils.clean_logger import get_clean_logger | |
| clean_log = get_clean_logger() | |
| # Update iteration if needed | |
| if iteration > clean_log.current_iteration: | |
| clean_log.log_iteration_start(iteration, seed_prompt=None) | |
| # Don't log here - let evaluate() handle it with full context | |
    def _log_reflection_dataset_creation(self, candidate: Dict[str, str], eval_batch: EvaluationBatch,
                                         components_to_update: List[str]):
        """
        Pretty print the reflection dataset creation process.

        Logs the candidate prompt, an aggregate score summary, the list of
        components being updated, and - for up to 5 trajectories - per-sample
        feedback regenerated via _generate_feedback.

        Args:
            candidate: Current candidate being evaluated
            eval_batch: Evaluation results
            components_to_update: Components being updated
        """
        system_prompt = candidate.get('system_prompt', '')
        self.logger.info(f"π DEBUG: Inside _log_reflection_dataset_creation")
        self.logger.info(f"π DEBUG: system_prompt length: {len(system_prompt)}")
        self.logger.info(f"π DEBUG: eval_batch.scores: {eval_batch.scores}")
        self.logger.info(f"π DEBUG: eval_batch.trajectories: {len(eval_batch.trajectories) if eval_batch.trajectories else 0}")
        # Determine candidate notation (seed / reflection / crossover / mutation)
        notation_map = {'seed': 'Sβ', 'gepa_reflection': 'Sα΅£', 'llego_crossover': 'Oββ', 'llego_mutation': 'Oβα΅€β'}
        notation = notation_map.get(self._current_evaluation_type, 'S')
        # _evaluation_count may not be set yet on early calls; fall back to '?'
        cand_num = self._evaluation_count if hasattr(self, '_evaluation_count') else '?'
        cand_label = f"{notation}{cand_num}"
        # Use logger for the main output too
        self.logger.info("\n" + "="*80)
        self.logger.info("π REFLECTION DATASET CREATION")
        self.logger.info("="*80)
        self.logger.info(f"\nπ CURRENT PROMPT BEING ANALYZED: {cand_label}")
        self.logger.info(f" Candidate Type: {self._current_evaluation_type or 'unknown'}")
        self.logger.info("-" * 40)
        self.logger.info(f'"{system_prompt}"')
        self.logger.info("-" * 40)
        self.logger.info(f"\nπ EVALUATION SUMMARY:")
        self.logger.info("-" * 40)
        if eval_batch.scores:
            # Aggregate statistics over all evaluated samples
            avg_score = sum(eval_batch.scores) / len(eval_batch.scores)
            min_score = min(eval_batch.scores)
            max_score = max(eval_batch.scores)
            self.logger.info(f" β’ Average Score: {avg_score:.4f}")
            self.logger.info(f" β’ Min Score: {min_score:.4f}")
            self.logger.info(f" β’ Max Score: {max_score:.4f}")
            self.logger.info(f" β’ Total Samples: {len(eval_batch.scores)}")
        self.logger.info(f"\nπ― COMPONENTS TO UPDATE:")
        self.logger.info("-" * 40)
        for i, component in enumerate(components_to_update, 1):
            self.logger.info(f" {i}. {component}")
        if eval_batch.trajectories:
            self.logger.info(f"\nπ DETAILED ANALYSIS (FULL FEEDBACK - NO TRUNCATION):")
            self.logger.info("-" * 80)
            for i, trace in enumerate(eval_batch.trajectories[:5], 1):  # Show first 5 samples with FULL details
                evaluation_results = trace['evaluation_results']
                composite_score = evaluation_results.get("composite_score", 0.0)
                # Extract element IDs for concise logging
                predicted_element = evaluation_results.get('predicted_element', 'Unknown')
                expected_element = evaluation_results.get('expected_element', 'Unknown')
                # Concise, direct logging with candidate notation
                status_icon = "β " if composite_score == 1.0 else "β"
                # Add notation for candidate type
                notation_map = {'seed': 'Sβ', 'gepa_reflection': 'Sα΅£', 'llego_crossover': 'Oββ', 'llego_mutation': 'Oβα΅€β'}
                notation = notation_map.get(self._current_evaluation_type, 'S')
                self.logger.info(f" [{notation}] Sample {i}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status_icon}")
                # π₯ FIX: Pass trace and current_prompt to enable LLM-as-Judge!
                # NOTE: this regenerates feedback per sample for logging - it may
                # trigger additional LLM-as-Judge calls for low-scoring samples.
                feedback = self._generate_feedback(
                    evaluation_results,
                    trace=trace,  # Pass the full trace!
                    current_prompt=system_prompt  # Pass current prompt being analyzed!
                )
                self.logger.info(f" π¬ FEEDBACK (FULL):")
                self.logger.info(f" \"{feedback}\"")
            if len(eval_batch.trajectories) > 5:
                self.logger.info(f"\n ... and {len(eval_batch.trajectories) - 5} more samples (all logged similarly)")
        self.logger.info("="*80)
| def _extract_clean_prompt_from_reflection(self, reflection_output: str) -> str: | |
| """ | |
| π‘οΈ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite system prompt instructions. | |
| NOTE: The system prompt now explicitly instructs the LLM to output ONLY the prompt text. | |
| However, this extraction logic serves as a safety net in case the LLM still adds: | |
| "Based on the performance analysis... | |
| ### Recommendations... | |
| ### Revised Prompt Example: | |
| [THE ACTUAL PROMPT HERE] | |
| ### Conclusion..." | |
| This is now a defensive measure, not the primary mechanism. | |
| Args: | |
| reflection_output: Full reflection output (should be clean prompt, but may contain analysis) | |
| Returns: | |
| str: Clean, extracted prompt (or original if extraction fails or not needed) | |
| """ | |
| if not reflection_output or not isinstance(reflection_output, str): | |
| return reflection_output | |
| # Pattern 1: Look for "Revised Prompt Example:" or "### Revised Prompt Example:" | |
| patterns = [ | |
| r'(?:###\s*)?Revised\s+Prompt\s+(?:Example|:)?\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', | |
| r'(?:###\s*)?Revised\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', | |
| r'(?:###\s*)?Optimized\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', | |
| r'(?:###\s*)?New\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', | |
| r'(?:Here\s+is|Here\'s)\s+a?\s*refined?\s+(?:version\s+of\s+)?(?:the\s+)?prompt\s*[:\n](.*?)(?:\n###|\n##|\n---|\Z)', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, reflection_output, re.IGNORECASE | re.DOTALL) | |
| if match: | |
| extracted = match.group(1).strip() | |
| # Clean up common artifacts | |
| extracted = re.sub(r'^```(?:plaintext|markdown|text)?\s*\n', '', extracted, flags=re.MULTILINE) | |
| extracted = re.sub(r'\n```\s*$', '', extracted, flags=re.MULTILINE) | |
| extracted = extracted.strip() | |
| if len(extracted) > 50: # Reasonable minimum length for a prompt | |
| self.logger.debug(f"β Extracted clean prompt using pattern: {pattern[:50]}...") | |
| self.logger.debug(f" Original length: {len(reflection_output)} chars") | |
| self.logger.debug(f" Extracted length: {len(extracted)} chars") | |
| return extracted | |
| # Pattern 2: If output starts with a quote or prompt-like structure | |
| # Look for text that starts with "You are..." and is substantial | |
| if 'You are' in reflection_output: | |
| # Find the longest continuous block that starts with "You are" | |
| prompt_match = re.search(r'(You are[^#]*?)(?:\n###|\n##|###|##|Conclusion|\Z)', | |
| reflection_output, re.IGNORECASE | re.DOTALL) | |
| if prompt_match: | |
| extracted = prompt_match.group(1).strip() | |
| if len(extracted) > 50: | |
| self.logger.debug(f"β Extracted prompt starting with 'You are...'") | |
| return extracted | |
| # Pattern 3: If the reflection output is actually just a clean prompt (no analysis) | |
| # Check if it's relatively short and doesn't contain analysis keywords | |
| analysis_keywords = ['recommendation', 'suggestion', 'improvement', 'conclusion', | |
| 'optimization', 'analysis', 'feedback'] | |
| if (len(reflection_output) < 2000 and | |
| not any(keyword in reflection_output.lower() for keyword in analysis_keywords)): | |
| # Likely a clean prompt, return as-is | |
| self.logger.debug(f"β Reflection output appears to be a clean prompt (no analysis detected)") | |
| return reflection_output.strip() | |
| # Fallback: Return original (with warning) | |
| self.logger.warning(f"β οΈ Could not extract clean prompt from reflection output") | |
| self.logger.warning(f" Output length: {len(reflection_output)} chars") | |
| self.logger.warning(f" Output preview: {reflection_output[:200]}...") | |
| self.logger.warning(f" Returning original output (may contain analysis text)") | |
| return reflection_output.strip() | |
def _parse_json_variations(self, response_text: str, num_expected: int) -> List[str]:
    """
    π₯ OPTIMIZED: Parse N prompt variations from a JSON-formatted LLM response.

    Tries progressively looser strategies until one succeeds:
      1. JSON fenced in a markdown code block (```json ... ```).
      2. A JSON object containing a "variations" array found directly in the
         text (falling back to the widest {...} span if that snippet is bad).
      3. The widest {...} span after repairing trailing commas.
      4. Numbered-section parsing as a last resort.

    Args:
        response_text: LLM response expected to contain JSON with variations.
        num_expected: Expected number of variations.

    Returns:
        List[str]: Prompt variations, ordered by their "index" field.

    Raises:
        ValueError: If every strategy fails to yield valid variations.
    """
    import json
    import re
    if not response_text or not isinstance(response_text, str):
        raise ValueError("Empty or invalid response text")

    def _decode_and_extract(candidate_text: str) -> Optional[List[str]]:
        # Returns the variations on success; None when the text is not valid JSON.
        # Structural problems inside valid JSON still raise ValueError upstream.
        try:
            parsed = json.loads(candidate_text)
        except json.JSONDecodeError:
            return None
        return self._extract_variations_from_json(parsed, num_expected)

    # Strategy 1: JSON inside a markdown code fence.
    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
    if fenced:
        try:
            parsed = json.loads(fenced.group(1))
        except json.JSONDecodeError as e:
            self.logger.debug(f"JSON in code block invalid: {e}, trying repair...")
        else:
            return self._extract_variations_from_json(parsed, num_expected)

    # Strategy 2: bare JSON object mentioning "variations" directly in the text.
    direct = re.search(r'\{[^{}]*"variations"[^{}]*\[.*?\]\s*[^{}]*\}', response_text, re.DOTALL)
    if direct:
        outcome = _decode_and_extract(direct.group(0))
        if outcome is not None:
            return outcome
        # The targeted snippet was malformed; retry with the widest {...} span.
        widest = re.search(r'\{.*\}', response_text, re.DOTALL)
        if widest:
            outcome = _decode_and_extract(widest.group(0))
            if outcome is not None:
                return outcome

    # Strategy 3: repair trailing commas in the widest {...} span, then retry.
    widest = re.search(r'\{.*\}', response_text, re.DOTALL)
    if widest:
        repaired = re.sub(r',\s*]', ']', re.sub(r',\s*}', '}', widest.group(0)))
        outcome = _decode_and_extract(repaired)
        if outcome is not None:
            return outcome

    # Strategy 4: numbered-section fallback parser.
    self.logger.warning(f"JSON parsing failed, trying numbered section fallback...")
    try:
        return self._parse_numbered_section_variations(response_text, num_expected)
    except ValueError:
        pass
    # Every strategy failed.
    raise ValueError(f"Could not parse {num_expected} variations from response. Response preview: {response_text[:300]}...")
| def _extract_variations_from_json(self, data: Dict[str, Any], num_expected: int) -> List[str]: | |
| """Extract and validate variations from parsed JSON data.""" | |
| if not isinstance(data, dict): | |
| raise ValueError("JSON data is not a dictionary") | |
| variations_list = data.get('variations', []) | |
| if not isinstance(variations_list, list): | |
| raise ValueError("'variations' field is not a list") | |
| if len(variations_list) < num_expected: | |
| self.logger.warning(f"Expected {num_expected} variations, found {len(variations_list)} in JSON") | |
| # Extract and sort by index | |
| variations_with_index = [] | |
| for var in variations_list: | |
| if not isinstance(var, dict): | |
| continue | |
| index = var.get('index', 0) | |
| prompt = var.get('prompt', '') | |
| if prompt and isinstance(prompt, str): | |
| variations_with_index.append((index, prompt.strip())) | |
| # Sort by index | |
| variations_with_index.sort(key=lambda x: x[0]) | |
| # Extract just the prompts | |
| variations = [v[1] for v in variations_with_index] | |
| # Validate count | |
| if len(variations) < num_expected: | |
| self.logger.warning(f"Only {len(variations)} valid variations found, expected {num_expected}") | |
| # Pad with duplicates if needed (not ideal but better than failing) | |
| while len(variations) < num_expected: | |
| variations.append(variations[-1] if variations else "") | |
| # Take first N if we got more | |
| variations = variations[:num_expected] | |
| # Validate all variations are non-empty | |
| if not all(v for v in variations): | |
| raise ValueError(f"Some variations are empty after parsing") | |
| return variations | |
| def _parse_numbered_section_variations(self, response_text: str, num_expected: int) -> List[str]: | |
| """ | |
| Fallback parser: Extract variations from numbered sections. | |
| Format: --- VARIATION N --- or Variation N: or similar | |
| """ | |
| variations = [] | |
| # Pattern 1: --- VARIATION N --- | |
| pattern1 = r'---\s*VARIATION\s+(\d+)\s*---\s*\n(.*?)(?=\n---\s*VARIATION|\Z)' | |
| matches1 = re.findall(pattern1, response_text, re.DOTALL | re.IGNORECASE) | |
| # Pattern 2: Variation N: | |
| pattern2 = r'Variation\s+(\d+)\s*:?\s*\n(.*?)(?=\nVariation\s+\d+|$)' | |
| matches2 = re.findall(pattern2, response_text, re.DOTALL | re.IGNORECASE) | |
| # Pattern 3: Numbered list (1. 2. 3.) | |
| pattern3 = r'(\d+)\.\s*\n(.*?)(?=\n\d+\.|$)' | |
| matches3 = re.findall(pattern3, response_text, re.DOTALL) | |
| # Use the pattern with most matches | |
| matches = matches1 if len(matches1) >= num_expected else (matches2 if len(matches2) >= num_expected else matches3) | |
| if len(matches) >= num_expected: | |
| # Sort by index | |
| matches.sort(key=lambda x: int(x[0])) | |
| # Extract prompts | |
| variations = [match[1].strip() for match in matches[:num_expected]] | |
| if len(variations) != num_expected: | |
| raise ValueError(f"Numbered section parsing found {len(variations)} variations, expected {num_expected}") | |
| return variations | |
def _generate_hybrid_candidates_adapter_level(
    self,
    current_prompt: str,
    eval_batch: EvaluationBatch,
    candidate: Dict[str, str]
) -> List[str]:
    """
    π₯ ADAPTER-LEVEL HYBRID CANDIDATE GENERATION
    Generate candidates from BOTH GEPA reflection AND LLEGO operators
    when GEPA's adapter mode ignores the reflection_lm parameter.
    This method:
    1. Builds comprehensive feedback from evaluation results
    2. Generates GEPA reflection candidates
    3. Generates LLEGO crossover/mutation candidates
    4. Logs ALL candidates with FULL prompts (no truncation)
    5. Stores candidates for potential use
    Args:
        current_prompt: The current prompt being optimized
        eval_batch: Evaluation results with trajectories
        candidate: Current candidate dict
    Returns:
        List of generated candidate prompts
    """
    # NOTE(review): despite the annotation List[str], the method actually returns
    # a list of dicts ({'prompt', 'source', 'index'}) — see the final return —
    # and `candidate` is never read in the body. Confirm callers expect dicts.
    # Any failure anywhere falls through to the outer except and returns [].
    try:
        from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
        all_candidates = []
        gepa_count = 0
        # π₯ CRITICAL: Pass format info to LLM client before generating candidates
        if self._detected_format and self._reflection_lm_client:
            if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
                self._reflection_lm_client._detected_format = self._detected_format
                self.logger.info(f"π Passed format info to reflection LLM: {self._detected_format['format_type']}")
        self.logger.info(f"π₯ STEP 1: Building comprehensive feedback from evaluation")
        # π₯ REMOVED: Excessive diagnostic logs - moved to DEBUG level
        # Build comprehensive feedback text from trajectories
        if not hasattr(eval_batch, 'trajectories'):
            self.logger.error(f"β eval_batch has no 'trajectories' attribute! Type: {type(eval_batch)}")
            return []
        trajectories = eval_batch.trajectories
        if not trajectories:
            self.logger.warning(f"β οΈ eval_batch.trajectories is empty - no feedback to generate candidates from")
            return []
        self.logger.debug(f"Processing {len(trajectories)} trajectories for feedback generation")
        # Assemble a single plain-text feedback document: current prompt followed
        # by per-sample input/expected/predicted/score/feedback sections.
        feedback_lines = []
        feedback_lines.append(f"Current prompt performance analysis:\n")
        feedback_lines.append(f"Current prompt:\n{current_prompt}\n")
        feedback_lines.append(f"\nEvaluation results:\n")
        # NOTE(review): each trace is assumed to be a dict with keys
        # 'evaluation_results', 'input_data', 'predicted_output' — verify against
        # the producer of eval_batch.trajectories.
        for i, trace in enumerate(trajectories[:8], 1):  # Use up to 8 samples for feedback
            try:
                eval_results = trace.get('evaluation_results', {})
                score = eval_results.get("composite_score", 0.0) if isinstance(eval_results, dict) else 0.0
                input_data = trace.get('input_data', {})
                predicted = trace.get('predicted_output', '')
                expected = input_data.get('output', '') if isinstance(input_data, dict) else ''
                # π₯ FIX: Clean input_data to remove base64 images before logging
                input_data_clean = input_data.copy() if isinstance(input_data, dict) else {}
                if 'image_base64' in input_data_clean:
                    input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]"
                feedback_lines.append(f" Sample {i}:")
                feedback_lines.append(f" Input: {input_data_clean.get('input', '') if isinstance(input_data_clean, dict) else ''}")
                feedback_lines.append(f" Expected: {expected}")
                feedback_lines.append(f" Predicted: {predicted}")
                feedback_lines.append(f" Score: {score:.4f}")
                if isinstance(eval_results, dict):
                    # π₯ FIX: Pass trace and current_prompt to enable LLM-as-Judge!
                    feedback = self._generate_feedback(
                        eval_results,
                        trace=trace,  # Pass the full trace!
                        current_prompt=current_prompt  # Pass current prompt!
                    )
                    feedback_lines.append(f" Feedback: {feedback}")
                else:
                    feedback_lines.append(f" Feedback: Evaluation results not in expected format")
                feedback_lines.append("")
            except Exception as e:
                # A bad trace is logged and skipped; remaining traces still contribute.
                self.logger.error(f"β Error processing trace {i}: {e}")
                import traceback
                self.logger.error(traceback.format_exc())
                continue
        feedback_text = "\n".join(feedback_lines)
        self.logger.info(f"\nπ FULL FEEDBACK TEXT (NO TRUNCATION):")
        self.logger.info(feedback_text)
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        # PART 1: GEPA REFLECTION CANDIDATES
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        # NOTE(review): the log labels below say "PART 2"/"PART 3" while these
        # section comments say PART 1/PART 2 — the numbering looks off by one;
        # confirm which scheme is intended before relying on the logs.
        self.logger.info(f"π PART 2: GEPA REFLECTION - Semantic Understanding")
        num_gepa = self._config.num_gepa_reflection_candidates if hasattr(self._config, 'num_gepa_reflection_candidates') else 3
        self.logger.info(f"\nπ Generating {num_gepa} GEPA Reflection candidates in single optimized call...")
        # Set reflection context
        if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
            self._reflection_lm_client.set_reflection_context(
                current_prompt=current_prompt,
                feedback=eval_batch,
                in_reflection=True
            )
        # π₯ OPTIMIZED: Single call with JSON format for multiple variations
        try:
            # Precision-engineered system prompt requesting JSON format
            optimization_system_prompt = f"""You are an expert prompt engineer specializing in iterative prompt optimization.
Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate {num_gepa} DISTINCT variations of improved prompts that address the identified issues through DIFFERENT improvement strategies.
CRITICAL OUTPUT FORMAT - MUST BE VALID JSON:
{{
"variations": [
{{
"index": 1,
"prompt": "[First improved prompt text - complete and self-contained]"
}},
{{
"index": 2,
"prompt": "[Second improved prompt text - complete and self-contained]"
}},
{{
"index": 3,
"prompt": "[Third improved prompt text - complete and self-contained]"
}}
]
}}
DIVERSITY REQUIREMENTS:
- Variation 1: Focus on clarity, specificity, and explicit instructions
- Variation 2: Focus on edge case handling, robustness, and error prevention
- Variation 3: Focus on structural organization, examples, and step-by-step guidance
- Each variation must be MEANINGFULLY DIFFERENT (not just rewordings)
- Each variation must address ALL feedback issues but through different approaches
QUALITY STANDARDS (apply to all variations):
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure each prompt is self-contained and unambiguous
- Preserve the core task domain and output format requirements
OUTPUT FORMAT:
- Output MUST be valid JSON (can be wrapped in ```json ... ``` markdown code block)
- Generate EXACTLY {num_gepa} variations
- Index must be 1, 2, 3, ... (sequential, starting at 1)
- Each "prompt" field must contain the complete, self-contained prompt text
- NO explanations, NO analysis, NO meta-commentary - just the JSON structure
DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
- Any text outside the JSON structure
Output ONLY the JSON object with the variations."""
            # Construct user prompt with clear structure
            optimization_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}
{feedback_text}
TASK: Generate {num_gepa} DISTINCT variations of improved prompts. Each variation should:
- Address ALL feedback issues identified above
- Use a DIFFERENT improvement strategy (clarity, robustness, structure)
- Be meaningfully different from the others (not just rewordings)
- Be complete and self-contained
Remember: Output ONLY the JSON object with {num_gepa} variations. No explanations."""
            result = self._reflection_lm_client.generate(
                system_prompt=optimization_system_prompt,
                user_prompt=optimization_user_prompt,
                image_base64=""
            )
            # Client may return either a dict with "content" or a raw string.
            if isinstance(result, dict):
                response_text = result.get("content", str(result))
            else:
                response_text = str(result)
            # Parse JSON variations
            gepa_variations = self._parse_json_variations(response_text, num_gepa)
            # Add all variations to candidates
            for idx, variation_prompt in enumerate(gepa_variations, 1):
                # π‘οΈ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite instructions
                gepa_candidate = self._extract_clean_prompt_from_reflection(variation_prompt)
                if gepa_candidate != variation_prompt:
                    self.logger.debug(f" Variation {idx}: Extracted clean prompt (removed {len(variation_prompt) - len(gepa_candidate)} chars)")
                all_candidates.append({
                    'prompt': gepa_candidate,
                    'source': 'gepa_reflection',
                    'index': idx
                })
                # π₯ CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                # Only pushes to the UI store when an 'app' module is already
                # imported and exposes add_candidate_to_store; best-effort only.
                try:
                    import sys
                    if 'app' in sys.modules:
                        app_module = sys.modules['app']
                        if hasattr(app_module, 'add_candidate_to_store'):
                            app_module.add_candidate_to_store({
                                'prompt': gepa_candidate,
                                'source': 'gepa_reflection',
                                'timestamp': f"Candidate #{idx}"
                            })
                except Exception:
                    pass  # Silent fail - UI capture is optional
                self.logger.info(f"\nβ GEPA REFLECTION CANDIDATE #{idx}/{num_gepa} (FULL PROMPT - NO TRUNCATION):")
                self.logger.info(f"{'β'*80}")
                self.logger.info(f"{gepa_candidate}")
                self.logger.info(f"{'β'*80}")
                self.logger.info(f" Length: {len(gepa_candidate)} chars, Words: {len(gepa_candidate.split())}")
            gepa_count = len(all_candidates)
            self.logger.info(f"\nβ GEPA Reflection: {gepa_count} candidates generated in single optimized call")
        except Exception as e:
            # JSON batch generation failed (bad JSON, parse error, client error):
            # fall back to one LLM call per candidate with the plain-text prompt.
            self.logger.error(f"β Error generating GEPA reflection candidates: {e}")
            self.logger.warning(f" Falling back to sequential generation...")
            import traceback
            self.logger.debug(traceback.format_exc())
            # Fallback: Sequential generation (when JSON parsing fails)
            for i in range(num_gepa):
                self.logger.info(f"\nπ Generating GEPA Reflection candidate #{i+1}/{num_gepa} (fallback mode)...")
                try:
                    fallback_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}
{feedback_text}
TASK: Generate an improved version of the CURRENT PROMPT that addresses all issues identified in the evaluation feedback above.
Remember: Output ONLY the improved prompt text. No explanations."""
                    result = self._reflection_lm_client.generate(
                        system_prompt=self._FALLBACK_SYSTEM_PROMPT,
                        user_prompt=fallback_user_prompt,
                        image_base64=""
                    )
                    if isinstance(result, dict):
                        gepa_candidate_raw = result.get("content", str(result))
                    else:
                        gepa_candidate_raw = str(result)
                    gepa_candidate = self._extract_clean_prompt_from_reflection(gepa_candidate_raw)
                    all_candidates.append({
                        'prompt': gepa_candidate,
                        'source': 'gepa_reflection',
                        'index': i + 1
                    })
                    # π₯ CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                    try:
                        import sys
                        if 'app' in sys.modules:
                            app_module = sys.modules['app']
                            if hasattr(app_module, 'add_candidate_to_store'):
                                app_module.add_candidate_to_store({
                                    'prompt': gepa_candidate,
                                    'source': 'gepa_reflection',
                                    'timestamp': f"Fallback #{i+1}"
                                })
                    except Exception:
                        pass  # Silent fail - UI capture is optional
                except Exception as fallback_error:
                    # A failed fallback attempt is logged and skipped; fewer than
                    # num_gepa candidates may result.
                    self.logger.error(f"β Error in fallback generation #{i+1}: {fallback_error}")
            gepa_count = len(all_candidates)
            if gepa_count > 0:
                self.logger.info(f"\nβ GEPA Reflection: {gepa_count} candidates generated")
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        # PART 2: LLEGO GENETIC OPERATORS
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        self.logger.info(f"𧬠PART 3: LLEGO GENETIC OPERATORS - Structural Diversity")
        if self.llego:
            # π₯ FIX 2: Get Pareto front from GEPA (not LLEGO population)
            # This ensures LLEGO operators use true non-dominated solutions
            from ..utils.pareto_logger import get_pareto_logger
            pareto_log = get_pareto_logger()
            gepa_pareto_front = pareto_log.pareto_front
            # Convert GEPA Pareto front to PromptCandidate format
            pareto_candidates = self.llego._convert_gepa_pareto_to_candidates(gepa_pareto_front)
            pareto_front = pareto_candidates
            self.logger.info(f" Using GEPA Pareto front (size: {len(gepa_pareto_front)})")
            self.logger.info(f" Converted to {len(pareto_front)} PromptCandidate objects")
            for idx, p in enumerate(pareto_front, 1):
                cand_type = p.metadata.get('candidate_type', 'unknown') if p.metadata else 'unknown'
                notation = p.metadata.get('notation', 'S') if p.metadata else 'S'
                self.logger.info(f" {notation}: [fitness={p.fitness:.3f}, type={cand_type}, length={len(p.prompt)} chars]")
            # Create LLM callable for LLEGO
            def llm_callable(genetic_prompt: str) -> str:
                # π₯ LLEGO genetic prompt already contains full instructions
                # Use minimal system prompt to avoid instruction conflict
                result = self._reflection_lm_client.generate(
                    system_prompt="You are an expert prompt engineer. Follow the instructions provided in the user message to generate an improved prompt. Output only the prompt text, no explanations.",
                    user_prompt=genetic_prompt,
                    image_base64=""
                )
                if isinstance(result, dict):
                    return result.get('content', str(result))
                return str(result)
            # Generate LLEGO offspring
            try:
                llego_prompts = self.llego.evolve_generation(
                    llm=llm_callable,
                    pareto_front=pareto_front
                )
                # The first n_crossover offspring are labeled crossovers, the
                # rest mutations — positional convention, not metadata-driven.
                n_crossover = self._config.n_crossover if hasattr(self._config, 'n_crossover') else 2
                crossover_count = min(n_crossover, len(llego_prompts))
                for i, prompt in enumerate(llego_prompts):
                    if i < crossover_count:
                        source = 'llego_crossover'
                    else:
                        source = 'llego_mutation'
                    all_candidates.append({
                        'prompt': prompt,
                        'source': source,
                        'index': i + 1
                    })
                    # π₯ CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                    try:
                        import sys
                        if 'app' in sys.modules:
                            app_module = sys.modules['app']
                            if hasattr(app_module, 'add_candidate_to_store'):
                                app_module.add_candidate_to_store({
                                    'prompt': prompt,
                                    'source': source,
                                    'timestamp': f"Candidate #{i+1}"
                                })
                    except Exception:
                        pass  # Silent fail - UI capture is optional
                    border_char = "β" if source == 'llego_crossover' else "β"
                    self.logger.info(f"\n{border_char*80}")
                    self.logger.info(f"{border_char} {'π LLEGO CROSSOVER' if source == 'llego_crossover' else 'π² LLEGO MUTATION'} candidate #{i+1}")
                    self.logger.info(f"{border_char*80}")
                    self.logger.info(f"{prompt}")
                    self.logger.info(f"{border_char*80}")
                    self.logger.info(f" Length: {len(prompt)} chars, Words: {len(prompt.split())}")
                self.logger.info(f"β LLEGO Genetic Operators: {len(llego_prompts)} candidates generated")
            except Exception as e:
                # LLEGO failure is non-fatal: GEPA reflection candidates survive.
                self.logger.error(f"β Error generating LLEGO candidates: {e}")
                import traceback
                self.logger.error(traceback.format_exc())
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        # SUMMARY
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββ
        self.logger.info(f"\n{'='*80}")
        self.logger.info(f"π ADAPTER-LEVEL HYBRID GENERATION SUMMARY")
        self.logger.info(f"{'='*80}")
        self.logger.info(f" π GEPA Reflection: {gepa_count} candidates")
        self.logger.info(f" π LLEGO Crossover: {len([c for c in all_candidates if c['source'] == 'llego_crossover'])} candidates")
        self.logger.info(f" π² LLEGO Mutation: {len([c for c in all_candidates if c['source'] == 'llego_mutation'])} candidates")
        self.logger.info(f" π¦ TOTAL: {len(all_candidates)} diverse candidates")
        self.logger.info(f"{'='*80}\n")
        # Store candidates (GEPA might access them through some mechanism)
        self._generated_candidates = all_candidates
        # Log each candidate with FULL text
        self.logger.info(f"\n{'='*80}")
        self.logger.info(f"π ALL GENERATED CANDIDATES (FULL PROMPTS - NO TRUNCATION)")
        self.logger.info(f"{'='*80}")
        for i, cand in enumerate(all_candidates, 1):
            source_emoji = "π" if cand['source'] == 'gepa_reflection' else "π" if cand['source'] == 'llego_crossover' else "π²"
            self.logger.info(f"\n{source_emoji} CANDIDATE #{i} - {cand['source'].upper().replace('_', ' ')}")
            self.logger.info(f"{cand['prompt']}")
            self.logger.info(f" Length: {len(cand['prompt'])} characters")
            self.logger.info(f" Words: {len(cand['prompt'].split())} words")
            self.logger.info(f"{'='*80}\n")
        # Return candidates as list of dicts with metadata (not just strings)
        # This ensures source information is preserved
        return all_candidates  # Return full dicts with source info
    except Exception as e:
        self.logger.error(f"\n{'β'*80}")
        self.logger.error(f"β CRITICAL ERROR in _generate_hybrid_candidates_adapter_level!")
        self.logger.error(f"β Error: {str(e)}")
        self.logger.error(f"{'β'*80}\n")
        import traceback
        self.logger.error(traceback.format_exc())
        return []
| def propose_new_texts( | |
| self, | |
| candidate: Dict[str, str], | |
| reflective_dataset: Dict[str, List[Dict[str, Any]]], | |
| components_to_update: List[str] | |
| ) -> Dict[str, str]: | |
| """ | |
| π₯ CRITICAL: This method is called by GEPA to propose new component texts. | |
| This is the KEY integration point - GEPA checks if adapter.propose_new_texts exists, | |
| and if it does, uses it instead of the default InstructionProposalSignature. | |
| This method: | |
| 1. Uses reflective_dataset to generate improved prompts | |
| 2. Optionally uses LLEGO for additional diversity | |
| 3. Returns dict mapping component_name -> new component text | |
| Args: | |
| candidate: Current candidate dict (component_name -> component_text) | |
| reflective_dataset: Feedback data per component (from make_reflective_dataset) | |
| components_to_update: List of component names to update | |
| Returns: | |
| Dict mapping component_name -> new component text | |
| """ | |
| self.logger.info(f"\n{'='*80}") | |
| self.logger.info(f"π― PROPOSE_NEW_TEXTS CALLED BY GEPA") | |
| self.logger.info(f"{'='*80}") | |
| self.logger.info(f" Components to update: {components_to_update}") | |
| self.logger.info(f" Reflective dataset keys: {list(reflective_dataset.keys())}") | |
| # π₯ FIX: Check if we already generated candidates in hybrid mode | |
| # If yes, return one of them instead of generating a new one (avoids duplicate work and context overflow) | |
| if hasattr(self, '_generated_candidates') and self._generated_candidates: | |
| self.logger.info(f"\nβ HYBRID MODE: Using pre-generated candidates from make_reflective_dataset") | |
| self.logger.info(f" Available candidates: {len(self._generated_candidates)}") | |
| self.logger.info(f" Returning first candidate (GEPA will evaluate all of them)") | |
| # Return the first candidate (GEPA will get others via queue) | |
| first_candidate = self._generated_candidates[0] | |
| new_texts = {} | |
| for component in components_to_update: | |
| if isinstance(first_candidate, dict) and 'prompt' in first_candidate: | |
| new_texts[component] = first_candidate['prompt'] | |
| source = first_candidate.get('source', 'unknown') | |
| self.logger.info(f" Returning {source} candidate (length: {len(first_candidate['prompt'])} chars)") | |
| else: | |
| new_texts[component] = str(first_candidate) | |
| self.logger.info(f"{'='*80}\n") | |
| return new_texts | |
| new_texts = {} | |
| # Check if we have reflection_lm_client (required for proposal) | |
| if not self._reflection_lm_client: | |
| self.logger.error("β reflection_lm_client not available - cannot generate proposals") | |
| # Fallback: return current candidate (no change) | |
| for component in components_to_update: | |
| new_texts[component] = candidate.get(component, '') | |
| return new_texts | |
| # For each component to update | |
| for component_name in components_to_update: | |
| self.logger.info(f"π Proposing new text for component: {component_name}") | |
| current_text = candidate.get(component_name, '') | |
| dataset = reflective_dataset.get(component_name, []) | |
| if not dataset: | |
| self.logger.warning(f"β οΈ No feedback data for {component_name}, keeping current text") | |
| new_texts[component_name] = current_text | |
| continue | |
| self.logger.info(f" Current text length: {len(current_text)} chars") | |
| self.logger.info(f" Feedback examples: {len(dataset)}") | |
| # Generate improved prompt using reflection LM | |
| try: | |
| # π₯ FIX: Clean dataset to remove base64 images (prevents context overflow) | |
| cleaned_dataset = [] | |
| for item in dataset: | |
| cleaned_item = item.copy() | |
| # Remove or truncate base64 image data | |
| if 'image_base64' in cleaned_item: | |
| img_len = len(cleaned_item['image_base64']) | |
| cleaned_item['image_base64'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]' | |
| if 'image' in cleaned_item and isinstance(cleaned_item['image'], str) and len(cleaned_item['image']) > 1000: | |
| img_len = len(cleaned_item['image']) | |
| cleaned_item['image'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]' | |
| # Also clean any nested detailed_scores | |
| if 'detailed_scores' in cleaned_item and isinstance(cleaned_item['detailed_scores'], dict): | |
| for key in list(cleaned_item['detailed_scores'].keys()): | |
| val = cleaned_item['detailed_scores'][key] | |
| if isinstance(val, str) and len(val) > 5000: | |
| cleaned_item['detailed_scores'][key] = f'[LARGE_DATA_REMOVED_{len(val)}_chars]' | |
| cleaned_dataset.append(cleaned_item) | |
| self.logger.info(f" π Cleaned dataset: removed base64 images to prevent context overflow") | |
| # Use GEPA's default instruction proposal format | |
| from gepa.strategies.instruction_proposal import InstructionProposalSignature | |
| # Build input dict for GEPA's instruction proposal | |
| input_dict = { | |
| "current_instruction_doc": current_text, | |
| "dataset_with_feedback": cleaned_dataset # Use cleaned dataset! | |
| } | |
| # Generate prompt using GEPA's signature | |
| prompt = InstructionProposalSignature.prompt_renderer(input_dict) | |
| # Call reflection LM to generate new instruction | |
| self.logger.info(f" Generating improved prompt via reflection LM...") | |
| result = self._reflection_lm_client.generate( | |
| system_prompt="You are an expert prompt engineer. Follow the instructions in the user message to generate an improved prompt.", | |
| user_prompt=prompt, | |
| image_base64="" | |
| ) | |
| # Extract response | |
| if isinstance(result, dict): | |
| response_text = result.get("content", str(result)) | |
| else: | |
| response_text = str(result) | |
| # Extract instruction using GEPA's extractor | |
| extracted = InstructionProposalSignature.output_extractor(response_text) | |
| new_instruction = extracted.get("new_instruction", response_text.strip()) | |
| # Clean up the instruction (remove markdown, quotes, etc.) | |
| new_instruction = self._clean_extracted_prompt(new_instruction) | |
| self.logger.info(f" β Generated new text (length: {len(new_instruction)} chars)") | |
| self.logger.info(f" Preview: {new_instruction[:150]}...") | |
| new_texts[component_name] = new_instruction | |
| except Exception as e: | |
| self.logger.error(f"β Error generating proposal for {component_name}: {e}") | |
| import traceback | |
| self.logger.error(traceback.format_exc()) | |
| # Fallback: return current text | |
| new_texts[component_name] = current_text | |
| self.logger.info(f"\n{'='*80}") | |
| self.logger.info(f"β PROPOSE_NEW_TEXTS COMPLETE") | |
| self.logger.info(f" Generated {len(new_texts)} new component texts") | |
| self.logger.info(f"{'='*80}\n") | |
| return new_texts | |
| def _clean_extracted_prompt(self, prompt: str) -> str: | |
| """ | |
| Clean extracted prompt by removing markdown, quotes, and extra whitespace. | |
| Args: | |
| prompt: Raw extracted prompt text | |
| Returns: | |
| Cleaned prompt text | |
| """ | |
| if not prompt: | |
| return prompt | |
| # Remove markdown code blocks | |
| prompt = re.sub(r'```[\w]*\n?', '', prompt) | |
| prompt = re.sub(r'```', '', prompt) | |
| # Remove quotes if entire prompt is quoted | |
| prompt = prompt.strip() | |
| if (prompt.startswith('"') and prompt.endswith('"')) or \ | |
| (prompt.startswith("'") and prompt.endswith("'")): | |
| prompt = prompt[1:-1] | |
| # Remove leading/trailing whitespace | |
| prompt = prompt.strip() | |
| return prompt |