"""
Universal GEPA adapter for user-defined metrics and LLM clients.
"""

from .base_adapter import BaseGepaAdapter
from ..data.converters import UniversalConverter
from typing import Any, Dict, List, Optional
import logging
import re
from gepa.core.adapter import EvaluationBatch

logger = logging.getLogger(__name__)

class UniversalGepaAdapter(BaseGepaAdapter):
    """
    Universal GEPA adapter that works with any LLM client and evaluator.

    This adapter uses the existing UniversalConverter for data processing
    and delegates LLM generation and evaluation to user-provided components.

    Features:
    - Optimized multi-variation JSON generation (66% cost reduction)
    - Robust parsing with multiple fallback strategies
    - Automatic fallback to sequential generation if JSON parsing fails
    """

    # Fallback system prompt for sequential generation (when JSON parsing fails)
    _FALLBACK_SYSTEM_PROMPT = """You are an expert prompt engineer specializing in iterative prompt optimization.

Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate an IMPROVED version of the prompt that addresses all identified issues.

Core Requirements:
1. OUTPUT ONLY the improved prompt text (no explanations, no analysis, no meta-commentary)
2. START directly with the prompt (e.g., "You are a mobile GUI agent..." or similar task-appropriate opening)
3. PRESERVE the core task domain and output format requirements
4. INTEGRATE improvements from feedback naturally into the prompt structure
5. MAINTAIN clarity, specificity, and actionability

Quality Standards:
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure the prompt is self-contained and unambiguous

DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)

Output the improved prompt directly and only the prompt."""

    # Notation used in per-sample log lines to mark where a candidate came from.
    # (Hoisted to a class constant: it was duplicated verbatim in both
    # _evaluate_batch_mode and _evaluate_standard_mode.)
    _NOTATION_MAP = {'seed': 'Sโ‚€', 'gepa_reflection': 'Sแตฃ', 'llego_crossover': 'Oโ‚“โ‚’', 'llego_mutation': 'Oโ‚˜แตคโ‚œ'}

    def __init__(self, llm_client, evaluator, data_converter=None, llego_layer=None):
        """
        Initialize universal adapter.

        Args:
            llm_client: User-provided LLM client (must inherit from BaseLLMClient)
            evaluator: User-provided evaluator (must inherit from BaseEvaluator)
            data_converter: Optional custom data converter (uses UniversalConverter by default)
            llego_layer: Optional LLEGO integration layer for genetic operations
        """
        # Store LLEGO layer first
        self.llego = llego_layer

        # If LLEGO is provided, wrap the LLM client.
        # Note: If config is passed separately, it will be handled by optimizer.
        if llego_layer is not None:
            from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient
            # Only wrap if not already wrapped (optimizer may have wrapped it with config)
            if not isinstance(llm_client, LLEGOEnhancedLLMClient):
                # Wrap before calling super().__init__
                # Config will be set later by optimizer if hybrid mode is enabled
                llm_client = LLEGOEnhancedLLMClient(llm_client, llego_layer, config=None, verbose=True)
            else:
                # Already wrapped; config (if still None) will be set by optimizer later.
                if hasattr(llm_client, 'config') and llm_client.config is None:
                    pass

        # Initialize parent (this sets up self.logger)
        super().__init__(llm_client, evaluator)

        # Use existing UniversalConverter for data processing
        self.data_converter = data_converter or UniversalConverter()

        # Optimization state tracking
        self._is_baseline_evaluation = False  # Flag to distinguish baseline vs optimization
        self._last_candidate = None           # Track last candidate to detect changes
        self._gepa_iteration = 0              # Track actual GEPA iteration (not evaluation count)

        # Track candidates for logging
        self._evaluation_count = 0

        # Track current evaluation context
        self._current_evaluation_type = None  # 'seed', 'gepa_reflection', 'llego_crossover', 'llego_mutation'
        self._current_dataset_type = None     # 'dfeedback' or 'dpareto'
        self._baseline_score = None           # Store baseline score for comparison

        # Track candidate sources by prompt text (in case GEPA doesn't pass source field)
        self._candidate_sources = {}  # Maps prompt_text -> source_type

        # Validation-set bookkeeping (set by optimizer) for dataset type detection
        self._valset_size = None
        self._valset = None  # Stores actual valset for Dpareto evaluation

        # Track which candidates have been evaluated on Dpareto to avoid double evaluation.
        # Key: normalized prompt text, Value: (fitness_score, candidate_type, provenance)
        self._dpareto_evaluated_candidates = {}

        # HYBRID MODE: storage for generated candidates
        self._generated_candidates = []            # Store hybrid mode candidates
        self._candidate_generation_active = False  # Track if we're generating candidates
        self._config = None                        # Will be set by optimizer if hybrid mode enabled
        self._reflection_lm_client = None          # Will be set by optimizer

        # FORMAT AWARENESS: store detected output format for better prompts
        self._detected_format = None         # Populated from expected outputs
        self._format_detection_done = False  # Only detect once

        # Log initialization
        model_info = llm_client.get_model_info()
        if llego_layer is not None:
            self.logger.info(f"๐Ÿš€ Initialized Universal adapter with {model_info}")
            self.logger.info(f"๐Ÿงฌ LLEGO integration ENABLED - LLM client is wrapped for genetic operations")
        else:
            self.logger.info(f"๐Ÿš€ Initialized Universal adapter with {model_info}")

    def _agent_debug_log(self, hypothesis_id: str, location: str, message: str, data: Dict[str, Any]) -> None:
        """
        Best-effort structured debug logging (never raises).

        FIX: the original inlined this JSON-line logging six times; several of
        those copies wrote to a hard-coded absolute per-user path with NO
        try/except and NO makedirs, crashing evaluate() on any other machine.
        The path is now overridable via GEPA_AGENT_DEBUG_LOG and every write is
        guarded, but the default path is kept for backward compatibility.
        """
        import json as _json
        import os as _os
        import time as _time
        path = _os.environ.get(
            "GEPA_AGENT_DEBUG_LOG",
            "/Users/suhas/Desktop/Projects/Prompt-Optimizer/.cursor/debug.log",
        )
        try:
            _os.makedirs(_os.path.dirname(path), exist_ok=True)
            with open(path, "a") as f:
                f.write(_json.dumps({
                    "hypothesisId": hypothesis_id,
                    "location": location,
                    "message": message,
                    "data": data,
                    "timestamp": int(_time.time() * 1000),
                    "sessionId": "debug-session",
                }) + "\n")
        except Exception:
            # Debug logging must never interfere with optimization.
            pass

    def _clean_llm_output(self, output: str) -> str:
        """
        Clean LLM output before evaluation.

        LLMs often wrap JSON/structured output in markdown code blocks.
        This causes evaluation to fail because the evaluator sees:
            "```json\\n{\\"key\\": \\"value\\"}\\n```"
        Instead of:
            "{\\"key\\": \\"value\\"}"

        This method extracts the clean content for fair comparison.
        """
        if not output or not isinstance(output, str):
            return output

        cleaned = output.strip()

        # Remove markdown code blocks (```json ... ``` or ``` ... ```)
        code_block_match = re.search(r'```(?:json|JSON)?\s*([\s\S]*?)\s*```', cleaned)
        if code_block_match:
            extracted = code_block_match.group(1).strip()
            # Only use extracted if it looks like valid content
            if extracted and (extracted.startswith('{') or extracted.startswith('[') or len(extracted) > 10):
                self.logger.debug(f"๐Ÿ“ฆ Cleaned markdown code block from LLM output")
                return extracted

        # Remove leading/trailing markdown artifacts.
        # Handle cases like "Here is the JSON:\n```json\n...\n```"
        if '```' in cleaned:
            # Try to extract content between first ``` and last ```
            parts = cleaned.split('```')
            if len(parts) >= 3:
                # Content is in the middle part(s)
                middle_content = parts[1]
                # Remove language tag if present (e.g., "json\n")
                middle_content = re.sub(r'^(?:json|JSON|python|text)\s*\n?', '', middle_content).strip()
                if middle_content:
                    return middle_content

        return cleaned

    def _detect_and_cache_format(self, batch: List[Dict[str, Any]]) -> None:
        """
        Detect output format from expected outputs and cache for future use.

        This enables format-aware prompting and feedback generation.
        Failures are logged and leave self._detected_format as None.
        """
        try:
            from ..utils.format_detection import detect_output_format

            # Extract expected outputs from batch
            expected_outputs = []
            for item in batch:
                # Try to extract output directly, or standardize if needed
                output = None
                if isinstance(item, dict):
                    # Try common output field names first
                    output = item.get('output') or item.get('expected_output') or item.get('result') or item.get('answer')
                    if not output:
                        # Standardize using converter's private method (same as _evaluate_batch_mode)
                        try:
                            standardized = self.data_converter._standardize([item])[0]
                            output = standardized.get('output')
                        except Exception:
                            pass

                if output and isinstance(output, str) and output.strip():
                    expected_outputs.append(output)

            if expected_outputs:
                self._detected_format = detect_output_format(expected_outputs)
                self.logger.info(f"๐Ÿ“ FORMAT DETECTED: {self._detected_format['format_type']}")
                self.logger.info(f"   Spec: {self._detected_format['format_spec'][:100]}...")
                self.logger.info(f"   Avg length: {self._detected_format['avg_length']} chars")
                self._agent_debug_log(
                    "FORMAT_DETECT", "universal_adapter.py:format_detected",
                    "Format detection successful",
                    {"format_type": self._detected_format['format_type'],
                     "num_outputs": len(expected_outputs),
                     "avg_length": self._detected_format['avg_length'],
                     "has_constraint": bool(self._detected_format.get('format_constraint'))},
                )
            else:
                self.logger.warning("โš ๏ธ No expected outputs found for format detection")
                self._detected_format = None
                self._agent_debug_log(
                    "FORMAT_DETECT", "universal_adapter.py:format_detected",
                    "Format detection failed - no outputs",
                    {"batch_size": len(batch)},
                )

        except Exception as e:
            self.logger.warning(f"โš ๏ธ Format detection failed: {e}")
            self._detected_format = None

    def evaluate(self, batch: List[Dict[str, Any]], candidate: Dict[str, str],
                 capture_traces: bool = False) -> EvaluationBatch:
        """
        Evaluate candidates using user-provided LLM client and evaluator.

        This method automatically detects BatchLLMClient and uses batch processing
        for cost savings, or falls back to standard individual processing.

        This method works with any data type supported by UniversalConverter.

        IMPORTANT: We only optimize system_prompt, NOT user_prompt.
        The user_prompt varies per tester and is not part of optimization.

        CACHING: Seed prompt is evaluated ONLY ONCE on Dpareto (validation set).
        Subsequent evaluations return cached result to save API calls and ensure consistency.
        """
        system_prompt = candidate.get('system_prompt', '')

        # FORMAT DETECTION: detect output format from expected outputs (once)
        if not self._format_detection_done and batch:
            self._detect_and_cache_format(batch)
            self._format_detection_done = True

        # Determine dataset type first (needed for cache check)
        batch_size_threshold = self._config.batch_size if hasattr(self, '_config') and self._config else 8

        # If _is_baseline_evaluation is True, we KNOW this is the validation set.
        # This fixes the issue where valset_size might not be set yet when
        # baseline detection happens.
        if hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation:
            dataset_type = 'dpareto'  # Baseline is ALWAYS evaluated on validation set
            self.logger.debug(f"๐ŸŽฏ Forced dataset_type to 'dpareto' (baseline evaluation flag is True)")
        elif hasattr(self, '_valset_size') and self._valset_size is not None and len(batch) >= self._valset_size:
            dataset_type = 'dpareto'  # Full validation set size = Dpareto
        elif len(batch) > batch_size_threshold * 1.5:
            dataset_type = 'dpareto'  # Much larger than batch = likely full valset
        else:
            dataset_type = 'dfeedback'  # Small batch = training minibatch for reflection

        # Check cache to avoid re-evaluating the same prompt on Dpareto.
        # This ensures the seed prompt is evaluated ONLY ONCE.
        if dataset_type == 'dpareto':
            normalized_prompt = system_prompt.strip().strip('"\'')
            if normalized_prompt in self._dpareto_evaluated_candidates:
                existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt]
                self.logger.info(
                    f"โ™ป๏ธ CACHE HIT: Prompt already evaluated on Dpareto "
                    f"(score={existing_score:.4f}, type={existing_type}) - skipping re-evaluation"
                )

                # Return cached result - create EvaluationBatch with cached score
                cached_outputs = [f"[CACHED: {existing_type}]"] * len(batch)
                cached_scores = [existing_score] * len(batch)

                # Still update baseline if this is seed and baseline not set
                from ..utils.pareto_logger import get_pareto_logger
                pareto_log = get_pareto_logger()

                if existing_type == 'seed' and self._baseline_score is None:
                    self._baseline_score = existing_score
                    pareto_log.set_baseline(existing_score)
                    self.logger.info(f"๐Ÿ“Š Baseline score set from cache: {existing_score:.4f}")

                # Log to Pareto logger (for tracking, but no re-evaluation)
                pareto_log.log_candidate_evaluation(
                    prompt=system_prompt,
                    score=existing_score,
                    candidate_type=existing_type,
                    dataset_type='dpareto'
                )

                return EvaluationBatch(
                    outputs=cached_outputs,
                    scores=cached_scores,
                    trajectories=None  # No traces for cached results
                )

        # Determine candidate type.
        # Priority order:
        #   1. Candidate dict 'source' field (from LLM wrapper)
        #   2. _candidate_sources mapping (from previous evaluations)
        #   3. _current_evaluation_type (from log_proposed_candidate)
        #   4. Inference from context (seed, repeat, etc.)
        candidate_type = candidate.get('source')  # First try candidate dict
        if not candidate_type or candidate_type == 'unknown':
            candidate_type = self._candidate_sources.get(system_prompt)  # Check mapping
        if not candidate_type or candidate_type == 'unknown':
            candidate_type = self._current_evaluation_type  # Use stored type
        if not candidate_type or candidate_type == 'unknown':
            # Try to infer from prompt or metadata
            if system_prompt == self._last_candidate:
                candidate_type = 'repeat'  # Same prompt being re-evaluated
            elif self._evaluation_count == 0 or 'seed' in str(candidate.get('source', '')).lower():
                candidate_type = 'seed'  # Explicitly mark as seed
                self.logger.debug("๐ŸŒฑ Detected seed prompt (Sโ‚€)")
            else:
                candidate_type = 'unknown'  # Truly unknown

        self._agent_debug_log(
            "C", "universal_adapter.py:candidate_type_detect", "Candidate type detection",
            {"candidate_type": candidate_type,
             "evaluation_count": self._evaluation_count,
             "from_candidate_dict": candidate.get('source'),
             "from_sources_mapping": self._candidate_sources.get(system_prompt),
             "from_current_type": self._current_evaluation_type},
        )

        # Store source for future lookups (always update if we found a valid type)
        if candidate_type and candidate_type != 'unknown' and system_prompt not in self._candidate_sources:
            self._candidate_sources[system_prompt] = candidate_type
            self.logger.debug(f"  ๐Ÿ“ Stored candidate type: {candidate_type} for prompt (length: {len(system_prompt)})")

        # Dataset type already determined above for cache check - reuse it
        self._agent_debug_log(
            "H", "universal_adapter.py:dataset_type_detect", "Dataset type detection",
            {"batch_size": len(batch),
             "valset_size": getattr(self, '_valset_size', None),
             "batch_size_threshold": batch_size_threshold,
             "detected_type": dataset_type,
             "evaluation_count": self._evaluation_count},
        )

        # Check if this is a new candidate (different from last one)
        if self._last_candidate != system_prompt:
            self._evaluation_count += 1
            # If this is baseline evaluation, force candidate_type to 'seed'
            if self._is_baseline_evaluation:
                candidate_type = 'seed'
                self.logger.debug(f"๐ŸŒฑ Baseline evaluation detected - setting candidate_type to 'seed'")
            self._current_evaluation_type = candidate_type
            self._current_dataset_type = dataset_type
            self._last_candidate = system_prompt

        # Minimal logging - just track what we're evaluating
        if self._is_baseline_evaluation:
            self.logger.debug(f"Evaluating baseline (Sโ‚€) on {dataset_type}")
        else:
            self.logger.debug(f"Evaluating candidate #{self._evaluation_count} ({candidate_type}) on {dataset_type}")

        # Detect and use batch mode if available
        from ..llms.batch_llm import BatchLLMClient
        is_batch_mode = isinstance(self.llm_client, BatchLLMClient)

        if is_batch_mode:
            outputs, scores, trajectories = self._evaluate_batch_mode(
                batch, system_prompt, capture_traces
            )
        else:
            outputs, scores, trajectories = self._evaluate_standard_mode(
                batch, system_prompt, capture_traces
            )

        avg_score = sum(scores) / len(scores) if scores else 0.0

        self._agent_debug_log(
            "B,C", "universal_adapter.py:baseline_check", "Baseline check conditions",
            {"baseline_score_is_none": self._baseline_score is None,
             "current_dataset_type": self._current_dataset_type,
             "current_evaluation_type": self._current_evaluation_type,
             "is_baseline_evaluation": self._is_baseline_evaluation,
             "batch_size": len(batch),
             "avg_score": avg_score},
        )

        # Baseline MUST be set from seed's first Dpareto evaluation ONLY.
        # This ensures FAIR comparison: seed and candidates evaluated on the
        # SAME dataset (Dpareto) with the SAME number of datapoints.
        # We IGNORE the test set for baseline.
        from ..utils.pareto_logger import get_pareto_logger
        pareto_log = get_pareto_logger()

        # Check if this is baseline evaluation AND dpareto - set baseline with priority
        is_baseline_eval = hasattr(self, '_is_baseline_evaluation') and self._is_baseline_evaluation

        if self._baseline_score is None:
            # Set baseline on FIRST Dpareto evaluation, regardless of candidate type.
            # Also set baseline if this is explicitly marked as baseline evaluation.
            if self._current_dataset_type == 'dpareto' or is_baseline_eval:
                self._baseline_score = avg_score
                pareto_log.set_baseline(avg_score)
                self.logger.info(f"๐Ÿ“Š Baseline score (Dpareto, {len(batch)} samples): {avg_score:.4f}")
                self.logger.info(f"   โœ… Baseline set from {'baseline evaluation' if is_baseline_eval else 'first Dpareto'} (type: {self._current_evaluation_type})")
                self._agent_debug_log(
                    "B", "universal_adapter.py:baseline_set", "Baseline score SET",
                    {"baseline_score": avg_score,
                     "candidate_type": self._current_evaluation_type,
                     "dataset_type": self._current_dataset_type,
                     "is_baseline_eval": is_baseline_eval},
                )
            # Note: Test set evaluations are ignored for baseline - baseline comes from Dpareto
        else:
            # SAFETY CHECK: Ensure Pareto logger also has baseline if adapter has it.
            # Handles the case where optimizer set baseline in adapter but the
            # Pareto logger wasn't updated.
            if (self._current_dataset_type == 'dpareto' or is_baseline_eval) and pareto_log.baseline_score is None:
                pareto_log.set_baseline(self._baseline_score)
                self.logger.info(f"โœ… Synchronized baseline in Pareto logger: {self._baseline_score:.4f}")

        # Track Dpareto evaluations for Pareto front
        if self._current_dataset_type == 'dpareto':
            pareto_log.log_candidate_evaluation(
                prompt=system_prompt,
                score=avg_score,
                candidate_type=self._current_evaluation_type or 'unknown',
                dataset_type=self._current_dataset_type
            )

            # Track evaluated candidates
            normalized_prompt = system_prompt.strip().strip('"\'')
            if normalized_prompt not in self._dpareto_evaluated_candidates:
                self._dpareto_evaluated_candidates[normalized_prompt] = (
                    avg_score, self._current_evaluation_type or 'unknown', 'evaluated_by_gepa'
                )

        self.logger.debug(f"Evaluation complete: score={avg_score:.4f}")

        # Update _best_candidate/_best_score with AVERAGE fitness for Dpareto
        # evaluations, so the adapter tracks the best average, not per-sample scores.
        if self._current_dataset_type == 'dpareto':
            if self._best_score is None or avg_score > self._best_score:
                self._best_score = avg_score
                self._best_candidate = {
                    'system_prompt': system_prompt,
                    'fitness': avg_score,
                    'source': self._current_evaluation_type or 'unknown'
                }
                self.logger.info(f"โœ… Updated best candidate from Dpareto evaluation: f={avg_score:.4f} (type: {self._current_evaluation_type})")

        return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajectories)

    def _evaluate_batch_mode(
        self,
        batch: List[Dict],
        system_prompt: str,
        capture_traces: bool
    ) -> tuple:
        """
        Batch mode evaluation - process all samples in one API call.

        Prepares all requests, submits them as a batch job, waits for
        completion, then evaluates all results.
        """
        # Prepare all requests
        requests = []
        standardized_items = []

        for item in batch:
            standardized_item = self.data_converter._standardize([item])[0]
            standardized_items.append(standardized_item)

            request = {
                'system_prompt': system_prompt,
                'user_prompt': standardized_item['input']
            }

            if standardized_item.get('image'):
                request['image_base64'] = standardized_item['image']

            requests.append(request)

        # Submit batch job and get all results at once
        batch_results = self.llm_client.generate_batch(requests)

        # Process results
        outputs = []
        scores = []
        trajectories = [] if capture_traces else None

        for i, (llm_response, standardized_item) in enumerate(zip(batch_results, standardized_items)):
            # Extract content
            raw_output = llm_response.get("content", "")

            # Clean markdown wrappers before evaluation
            predicted_output = self._clean_llm_output(raw_output)
            outputs.append(predicted_output)

            # Evaluate with cleaned output
            evaluation_results = self.evaluator.evaluate(
                predicted_output,
                standardized_item['output']
            )

            composite_score = evaluation_results.get("composite_score", 0.0)
            scores.append(composite_score)

            # Update tracking.
            # FIX: guard against _best_score being None (evaluate() shows None
            # is a legal state); '>' against None raises TypeError.
            if self._best_score is None or composite_score > self._best_score:
                self._best_score = composite_score
                self._best_candidate = {'system_prompt': system_prompt}

            # Capture traces
            if capture_traces:
                trajectories.append({
                    'input_data': standardized_item,
                    'predicted_output': predicted_output,
                    'evaluation_results': evaluation_results
                })

            # Concise logging with element IDs and candidate notation
            predicted_element = evaluation_results.get('predicted_element', '?')
            expected_element = evaluation_results.get('expected_element', '?')
            status = "โœ…" if composite_score == 1.0 else "โŒ"
            notation = self._NOTATION_MAP.get(self._current_evaluation_type, 'S')

            self.logger.info(f"  [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}")

        return outputs, scores, trajectories

    def _evaluate_standard_mode(
        self,
        batch: List[Dict],
        system_prompt: str,
        capture_traces: bool
    ) -> tuple:
        """
        Standard mode evaluation - process samples individually.

        Original implementation, preserved for backward compatibility and for
        use with non-batch LLM clients.
        """
        outputs = []
        scores = []
        trajectories = [] if capture_traces else None

        for i, item in enumerate(batch):
            # Use existing data processing logic
            standardized_item = self.data_converter._standardize([item])[0]

            # Prepare generation parameters
            generation_params = {
                'system_prompt': system_prompt,
                'user_prompt': standardized_item['input']
            }

            # Add image if present
            if standardized_item.get('image'):
                generation_params['image_base64'] = standardized_item['image']

            # Generate response using user's LLM client
            llm_response = self.llm_client.generate(**generation_params)

            # Extract content
            if isinstance(llm_response, dict):
                raw_output = llm_response.get("content", "")
            else:
                raw_output = str(llm_response)

            # Clean markdown wrappers before evaluation
            predicted_output = self._clean_llm_output(raw_output)
            outputs.append(predicted_output)

            # Evaluate using user's evaluator with cleaned output
            evaluation_results = self.evaluator.evaluate(
                predicted_output,
                standardized_item['output']
            )

            composite_score = evaluation_results.get("composite_score", 0.0)
            scores.append(composite_score)

            self._agent_debug_log(
                "G", "universal_adapter.py:evaluation_result", "Individual evaluation result",
                {"sample_idx": i,
                 "composite_score": composite_score,
                 "semantic_sim": evaluation_results.get("semantic_similarity", -1),
                 "structural_sim": evaluation_results.get("structural_similarity", -1),
                 "format_mismatch": evaluation_results.get("analysis", {}).get("format_mismatch", False),
                 "predicted_len": len(predicted_output) if predicted_output else 0,
                 "expected_len": len(standardized_item.get('output', ''))},
            )

            # Update performance tracking.
            # FIX: removed the per-sample `self._evaluation_count += 1` here —
            # evaluate() already increments it once per new candidate, and batch
            # mode never incremented it, so the per-sample increment inflated
            # the "candidate #N" numbering inconsistently.
            # FIX: guard against _best_score being None (TypeError on '>').
            if self._best_score is None or composite_score > self._best_score:
                self._best_score = composite_score
                self._best_candidate = {'system_prompt': system_prompt}

            # Capture traces if requested
            if capture_traces:
                trajectories.append({
                    'input_data': standardized_item,
                    'predicted_output': predicted_output,
                    'evaluation_results': evaluation_results
                })

            # Concise logging with element IDs and candidate notation
            predicted_element = evaluation_results.get('predicted_element', '?')
            expected_element = evaluation_results.get('expected_element', '?')
            status = "โœ…" if composite_score == 1.0 else "โŒ"
            notation = self._NOTATION_MAP.get(self._current_evaluation_type, 'S')

            self.logger.info(f"  [{notation}] Sample {i+1}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status}")

        return outputs, scores, trajectories

    def make_reflective_dataset(self, candidate: Dict[str, str], eval_batch: EvaluationBatch,
                                components_to_update: List[str]) -> Dict[str, List[Dict[str, Any]]]:
        """
        Create reflective dataset using user-provided evaluator.

        Generates feedback based on the evaluation results from the user's
        custom evaluator.

        If hybrid mode is enabled, this method ALSO generates hybrid candidates
        (GEPA Reflection + LLEGO Operators) and stores them for GEPA to use.
        """
        self.logger.debug(f"make_reflective_dataset() called - generating feedback and hybrid candidates")

        reflective_dataset = {}
        system_prompt = candidate.get('system_prompt', '')

        hybrid_mode_enabled = (self._config and
                               hasattr(self._config, 'enable_gepa_reflection_with_llego') and
                               self._config.enable_gepa_reflection_with_llego and
                               self._reflection_lm_client)

        if hybrid_mode_enabled:
            self.logger.debug(f"โœ… Hybrid mode conditions met - will generate hybrid candidates")

        # ====================================================================
        # Update LLEGO population with evaluated candidate.
        # After a candidate is evaluated we add it to the LLEGO population so
        # it can be used for crossover/mutation. Without this, the population
        # only contains the seed, so the Pareto front stays at 1.
        # Called for EVERY candidate GEPA evaluates (seed, reflection,
        # crossover, mutation, ...).
        # ====================================================================
        if self.llego:
            # Calculate average fitness from evaluation scores
            if eval_batch.scores and len(eval_batch.scores) > 0:
                avg_fitness = sum(eval_batch.scores) / len(eval_batch.scores)
            else:
                # Fallback: extract from trajectories if scores not available
                scores = [t.get('evaluation_results', {}).get('composite_score', 0.0)
                          for t in eval_batch.trajectories if 'evaluation_results' in t]
                avg_fitness = sum(scores) / len(scores) if scores else 0.0

            self.logger.debug(f"Updating LLEGO population: fitness={avg_fitness:.4f}")

            # Create PromptCandidate from evaluated prompt
            from ..operators.llego_operators import PromptCandidate

            # Check if this candidate already exists in population (avoid duplicates).
            # Normalize prompts for comparison (strip whitespace, remove quotes).
            normalized_new_prompt = system_prompt.strip().strip('"\'')
            existing_prompts = {p.prompt.strip().strip('"\'') for p in self.llego.population}

            if normalized_new_prompt not in existing_prompts:
                prompt_candidate = PromptCandidate(
                    prompt=system_prompt,  # Keep original prompt (not normalized)
                    fitness=avg_fitness,
                    metadata={
                        'generation': self.llego.current_generation,
                        'operator': 'evaluated',
                        'prompt_length': len(system_prompt),
                        'word_count': len(system_prompt.split()),
                        'evaluation_samples': len(eval_batch.scores) if eval_batch.scores else 0,
                        'candidate_type': self._current_evaluation_type or 'unknown',  # Store type for notation
                        'dataset_evaluated': self._current_dataset_type or 'unknown'
                    }
                )

                # Update population - adds the candidate and keeps top N by fitness
                population_before = len(self.llego.population)
                self.llego.update_population([prompt_candidate])
                population_after = len(self.llego.population)

                self.logger.debug(f"Added to LLEGO population: fitness={avg_fitness:.4f}, size={population_after}")
            else:
                # Update fitness if candidate already exists (seed prompt, etc.)
                updated = False
                for p in self.llego.population:
                    normalized_existing = p.prompt.strip().strip('"\'')
                    if normalized_existing == normalized_new_prompt:
                        old_fitness = p.fitness
                        if avg_fitness > p.fitness:
                            p.fitness = avg_fitness
                            updated = True
                            self.logger.debug(f"Updated fitness: {old_fitness:.4f} โ†’ {avg_fitness:.4f}")
                            # Update candidate type if we have new information
                            if self._current_evaluation_type and p.metadata:
                                old_type = p.metadata.get('candidate_type', 'unknown')
                                if self._current_evaluation_type != old_type:
                                    p.metadata['candidate_type'] = self._current_evaluation_type
                        else:
                            self.logger.debug(f"โ„น๏ธ Candidate already exists with better/equal fitness: {p.fitness:.4f} >= {avg_fitness:.4f}")
                        break

                if not updated:
                    self.logger.debug(f"Candidate already in population with higher fitness")
        else:
            self.logger.debug("LLEGO not initialized - skipping population update")

        # ====================================================================
        # HYBRID MODE: Generate candidates at adapter level
        # ====================================================================
        if (self._config and
                hasattr(self._config, 'enable_gepa_reflection_with_llego') and
                self._config.enable_gepa_reflection_with_llego and
                self._reflection_lm_client):

            self.logger.debug("Generating hybrid candidates")

            # Generate hybrid candidates FIRST
            generated_candidates = self._generate_hybrid_candidates_adapter_level(
                current_prompt=system_prompt,
                eval_batch=eval_batch,
                candidate=candidate
            )

            # Store generated candidates so we can inject them.
            # _generate_hybrid_candidates_adapter_level returns a list of dicts
            # with metadata.
            if generated_candidates:
                candidate_dicts = []
                for cand in generated_candidates:
                    if isinstance(cand, dict) and 'prompt' in cand:
                        # Already a dict with metadata (preferred format)
                        candidate_dicts.append(cand)
                    elif isinstance(cand, str):
                        # Just a string - determine source based on position (fallback).
                        # Shouldn't happen if the generator returns dicts.
                        self.logger.warning(f"โš ๏ธ Received string candidate instead of dict - using fallback logic")
                        if len(candidate_dicts) < self._config.num_gepa_reflection_candidates:
                            source = 'gepa_reflection'
                        elif len(candidate_dicts) < self._config.num_gepa_reflection_candidates + self._config.n_crossover:
                            source = 'llego_crossover'
                        else:
                            source = 'llego_mutation'
                        candidate_dicts.append({
                            'prompt': cand,
                            'source': source,
                            'index': len(candidate_dicts) + 1
                        })
                    else:
                        self.logger.warning(f"โš ๏ธ Unknown candidate format: {type(cand)}")

                self._generated_candidates = candidate_dicts

                # Store candidate sources for tracking
                for cand_dict in candidate_dicts:
                    if 'prompt' in cand_dict and 'source' in cand_dict:
                        self._candidate_sources[cand_dict['prompt']] = cand_dict['source']

                # Inject into LLM client wrapper so it can return them when GEPA
                # calls adapter.llm_client.generate() for proposals.
                if hasattr(self.llm_client, '_adapter_generated_candidates'):
                    self.llm_client._adapter_generated_candidates = candidate_dicts.copy()
                    self.logger.debug(f"Injected {len(candidate_dicts)} candidates")
                else:
                    try:
                        self.llm_client._adapter_generated_candidates = candidate_dicts.copy()
                    except Exception as e:
                        self.logger.error(f"Failed to inject candidates: {e}")

                # Evaluate generated candidates on Dpareto for fair comparison
                if hasattr(self, '_evaluating_generated_candidates'):
                    pass  # Skip to prevent recursion
                elif self._valset and len(self._valset) > 0:
                    self._evaluating_generated_candidates = True
                    self.logger.debug(f"Evaluating {len(candidate_dicts)} candidates on Dpareto ({len(self._valset)} samples)")

                    # ๐Ÿ”ฅ NEW: Collect all
candidates with scores for batch update + candidates_with_scores = [] + + for i, cand_dict in enumerate(candidate_dicts, 1): + cand_prompt = cand_dict.get('prompt', '') + cand_source = cand_dict.get('source', 'unknown') + + if not cand_prompt: + continue + + # Normalize prompt for duplicate detection + normalized_prompt = cand_prompt.strip().strip('"\'') + + # Check if already evaluated on Dpareto (avoid double evaluation) + if normalized_prompt in self._dpareto_evaluated_candidates: + existing_score, existing_type, _ = self._dpareto_evaluated_candidates[normalized_prompt] + + # Still add to batch for Pareto update (with existing score) + notation_map = { + 'seed': 'Sโ‚€', + 'gepa_reflection': 'Sแตฃ', + 'llego_crossover': 'Oโ‚“โ‚’', + 'llego_mutation': 'Oโ‚˜แตคโ‚œ' + } + cand_notation = notation_map.get(cand_source, 'S') + candidates_with_scores.append({ + 'prompt': cand_prompt, + 'score': existing_score, + 'type': cand_source, + 'notation': cand_notation + }) + continue + + # Evaluate this candidate on valset (Dpareto) + try: + # Set candidate type for proper logging + self._current_evaluation_type = cand_source + + # ๐Ÿ”ฅ CRITICAL: Temporarily disable individual Pareto updates + # We'll do batch update after all evaluations + from ..utils.pareto_logger import get_pareto_logger + pareto_log = get_pareto_logger() + original_log_method = pareto_log.log_candidate_evaluation + + # Temporarily replace to prevent individual updates + def noop_log(*args, **kwargs): + pass # Skip individual logging - we'll batch update later + + pareto_log.log_candidate_evaluation = noop_log + + # Evaluate on valset - THIS IS THE FAIR EVALUATION ON SAME DATASET + valset_eval = self.evaluate( + batch=self._valset, # Same valset as seed! 
+ candidate={'system_prompt': cand_prompt, 'source': cand_source}, + capture_traces=True + ) + + # Restore original method + pareto_log.log_candidate_evaluation = original_log_method + + avg_score = sum(valset_eval.scores) / len(valset_eval.scores) if valset_eval.scores else 0.0 + + # Store evaluation result to avoid double evaluation + self._dpareto_evaluated_candidates[normalized_prompt] = ( + avg_score, + cand_source, + 'evaluated_in_make_reflective_dataset' + ) + + self.logger.debug(f"Candidate {i} evaluated: score={avg_score:.4f}") + + # Generate notation + notation_map = { + 'seed': 'Sโ‚€', + 'gepa_reflection': 'Sแตฃ', + 'llego_crossover': 'Oโ‚“โ‚’', + 'llego_mutation': 'Oโ‚˜แตคโ‚œ' + } + cand_notation = notation_map.get(cand_source, 'S') + + # Add to batch for Pareto update + candidates_with_scores.append({ + 'prompt': cand_prompt, + 'score': avg_score, + 'type': cand_source, + 'notation': cand_notation + }) + + # ๐Ÿ”ฅ CRITICAL: Explicitly add this candidate to LLEGO population with Dpareto fitness + if self.llego: + from ..operators.llego_operators import PromptCandidate + + # Check if already in population + existing_in_pop = False + for p in self.llego.population: + if p.prompt.strip().strip('"\'') == normalized_prompt: + # Update fitness if this Dpareto score is better + if avg_score > p.fitness: + old_fitness = p.fitness + p.fitness = avg_score + if p.metadata: + p.metadata['candidate_type'] = cand_source + p.metadata['dataset_evaluated'] = 'dpareto' + self.logger.debug(f"Updated LLEGO fitness: {old_fitness:.4f} โ†’ {avg_score:.4f}") + existing_in_pop = True + break + + if not existing_in_pop: + # Add new candidate to population + prompt_candidate = PromptCandidate( + prompt=cand_prompt, + fitness=avg_score, + metadata={ + 'generation': self.llego.current_generation, + 'operator': 'evaluated_on_dpareto', + 'prompt_length': len(cand_prompt), + 'word_count': len(cand_prompt.split()), + 'evaluation_samples': len(valset_eval.scores) if valset_eval.scores 
else 0, + 'candidate_type': cand_source, + 'dataset_evaluated': 'dpareto' + } + ) + self.llego.update_population([prompt_candidate]) + + except Exception as e: + self.logger.error(f" โŒ Error evaluating candidate #{i} on Dpareto: {e}") + import traceback + self.logger.error(traceback.format_exc()) + + # Batch Pareto front update + if candidates_with_scores: + + from ..utils.pareto_logger import get_pareto_logger + pareto_log = get_pareto_logger() + added_candidates = pareto_log.batch_update_pareto_front(candidates_with_scores) + + # ๐Ÿ”ฅ CRITICAL: Update queue with scores for best-candidate selection + # Create a mapping of prompt -> score for quick lookup + prompt_to_score = {c['prompt'].strip().strip('"\''): c['score'] for c in candidates_with_scores} + + # Update candidates in queue with their scores + if hasattr(self.llm_client, '_adapter_generated_candidates'): + updated_queue = [] + for cand in self.llm_client._adapter_generated_candidates: + if isinstance(cand, dict): + cand_prompt = cand.get('prompt', '') + normalized = cand_prompt.strip().strip('"\'') + if normalized in prompt_to_score: + # Update with score + cand['score'] = prompt_to_score[normalized] + updated_queue.append(cand) + else: + updated_queue.append(cand) + else: + updated_queue.append(cand) + + self.llm_client._adapter_generated_candidates = updated_queue + + self.logger.debug(f"Pareto update: {len(added_candidates)} added, front size={len(pareto_log.pareto_front)}") + + # Clear flag after evaluation complete + self._evaluating_generated_candidates = False + elif not hasattr(self, '_evaluating_generated_candidates'): + self.logger.error("Valset not available - cannot evaluate generated candidates") + + # Signal LLEGO-enhanced client for reflection mode + if self.llego and hasattr(self.llm_client, 'set_reflection_context'): + self.llm_client.set_reflection_context( + current_prompt=system_prompt, + feedback=eval_batch, + in_reflection=True + ) + + # ๐Ÿ”ฅ CRITICAL: Also set reflection context 
on reflection_lm_client if it exists + # This ensures hybrid mode candidate generation is triggered when GEPA calls reflection_lm_callable + if hasattr(self, 'reflection_lm_client') and self.reflection_lm_client: + if hasattr(self.reflection_lm_client, 'set_reflection_context'): + self.logger.info("๐Ÿ”ฅ CRITICAL: Setting reflection context on reflection_lm_client for hybrid mode") + self.reflection_lm_client.set_reflection_context( + current_prompt=system_prompt, + feedback=eval_batch, + in_reflection=True # This enables hybrid candidate generation! + ) + + self._log_reflection_dataset_creation(candidate, eval_batch, components_to_update) + + # Inject generated candidates into reflective dataset + suggested_prompts = [] + if hasattr(self, '_generated_candidates') and self._generated_candidates: + suggested_prompts = [c['prompt'] for c in self._generated_candidates if isinstance(c, dict) and 'prompt' in c] + self.logger.debug(f"Injecting {len(suggested_prompts)} suggested prompts") + + for component in components_to_update: + reflective_dataset[component] = [] + for trace in eval_batch.trajectories: + # Generate feedback based on evaluation results + # ๐Ÿ†• Phase 2: Pass trace and current_prompt for LLM-as-Judge + feedback = self._generate_feedback( + trace['evaluation_results'], + trace=trace, + current_prompt=system_prompt + ) + + # Base reflection data + # ๐Ÿ”ฅ FIX: Strip image_base64 from input_data to prevent massive base64 strings in logs + input_data_clean = trace['input_data'].copy() if isinstance(trace['input_data'], dict) else {} + if 'image_base64' in input_data_clean: + input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]" + + # ๐Ÿ”ฅ FIX: Clean detailed_scores to remove any base64 references or large data + detailed_scores_clean = {} + if isinstance(trace['evaluation_results'], dict): + for key, value in trace['evaluation_results'].items(): + # Skip any values that look like base64 (very long strings) + if 
isinstance(value, str) and len(value) > 1000: + detailed_scores_clean[key] = f"[DATA_{len(value)}_chars]" + else: + detailed_scores_clean[key] = value + else: + detailed_scores_clean = trace['evaluation_results'] + + reflection_entry = { + "current_prompt": system_prompt, + "input_data": input_data_clean, # Use cleaned version without full base64 + "predicted_output": trace['predicted_output'], + "score": trace['evaluation_results'].get("composite_score", 0.0), + "feedback": feedback, + "detailed_scores": detailed_scores_clean # Cleaned scores without large data + } + + # ๐Ÿ”ฅ CRITICAL: Only optimize system_prompt, NOT user_prompt + # The user_prompt contains the task description (command) and should NOT be modified + if component == 'system_prompt' and suggested_prompts: + # Add suggested improved prompts to the reflection entry + # GEPA might use these if the structure supports it + reflection_entry["suggested_improved_prompts"] = suggested_prompts + reflection_entry["num_suggestions"] = len(suggested_prompts) + # Also add the best suggested prompt as a direct suggestion + if suggested_prompts: + reflection_entry["suggested_prompt"] = suggested_prompts[0] # First candidate as primary suggestion + reflection_entry["optimize_component"] = "system_prompt_only" # Mark that we only optimize system_prompt + elif component != 'system_prompt': + # For non-system_prompt components (like user_prompt), do NOT add suggestions + # We only want to optimize system_prompt + reflection_entry["optimize_component"] = "skip" # Mark to skip optimization + self.logger.info(f"โš ๏ธ Skipping optimization for component '{component}' - only optimizing system_prompt") + + reflective_dataset[component].append(reflection_entry) + + total_samples = sum(len(data) for data in reflective_dataset.values()) + avg_score = sum(trace['score'] for data in reflective_dataset.values() for trace in data) / total_samples if total_samples > 0 else 0.0 + self.logger.info(f"๐Ÿ“ Reflection dataset created - 
{total_samples} samples, avg score: {avg_score:.4f}") + + return reflective_dataset + + def _generate_feedback( + self, + evaluation_results: Dict[str, Any], + trace: Optional[Dict[str, Any]] = None, + current_prompt: Optional[str] = None + ) -> str: + """ + Generate feedback using hybrid approach: + - LLM-as-Judge for low/medium scores (detailed, actionable) + - Simple feedback for high scores (efficient) + + Args: + evaluation_results: Evaluation scores and extracted data + trace: Full trace with input_data, predicted_output, etc. (optional) + current_prompt: The current system prompt being optimized (optional) + + Returns: + Feedback string focused on prompt improvement + """ + composite_score = evaluation_results.get("composite_score", 0.0) + + # Check if LLM-as-Judge is enabled + use_llm_judge = getattr(self._config, 'use_llm_as_judge', True) + threshold = getattr(self._config, 'llm_as_judge_threshold', 0.8) + + # ๐Ÿ”ฅ FIX: Check both attribute names (inconsistency in codebase) + reflection_lm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None) + + # Debug logging - use INFO so we can see what's happening + self.logger.info(f"๐Ÿ” Feedback generation: score={composite_score:.4f}, use_llm_judge={use_llm_judge}, threshold={threshold}, has_trace={trace is not None}, has_reflection_lm={reflection_lm is not None}") + if trace: + input_data = trace.get('input_data', {}) + predicted = trace.get('predicted_output', '')[:100] if trace.get('predicted_output') else 'N/A' + expected = input_data.get('output', '')[:100] if input_data.get('output') else 'N/A' + self.logger.info(f" Predicted preview: {predicted}...") + self.logger.info(f" Expected preview: {expected}...") + + # Use LLM-as-Judge for scores needing improvement + if use_llm_judge and composite_score < threshold and trace: + if not reflection_lm: + self.logger.warning("โš ๏ธ LLM-as-Judge requested but reflection_lm_client not available - using simple feedback") + 
self.logger.warning(f" Checked: _reflection_lm_client={getattr(self, '_reflection_lm_client', None) is not None}, reflection_lm_client={getattr(self, 'reflection_lm_client', None) is not None}") + else: + try: + self.logger.info(f"๐Ÿค– Calling LLM-as-Judge for detailed feedback (score: {composite_score:.4f} < threshold: {threshold})") + feedback = self._llm_as_judge_feedback( + evaluation_results, + trace, + current_prompt + ) + self.logger.info(f"โœ… LLM-as-Judge returned feedback (length: {len(feedback)} chars)") + return feedback + except Exception as e: + self.logger.error(f"โŒ LLM-as-Judge failed: {e}, falling back to simple feedback") + import traceback + self.logger.error(traceback.format_exc()) + # Fall through to simple feedback + + # Simple actionable feedback (for high scores or as fallback) + if composite_score >= threshold: + self.logger.debug(f"โœ… Score {composite_score:.4f} >= threshold {threshold} - using simple feedback") + elif not trace: + self.logger.debug(f"โš ๏ธ No trace provided - using simple feedback") + elif not use_llm_judge: + self.logger.debug(f"โš ๏ธ LLM-as-Judge disabled - using simple feedback") + + feedback = self._simple_actionable_feedback( + evaluation_results, + trace, + current_prompt + ) + + # ๐Ÿ”ฅ ADD FORMAT FEEDBACK: Append format-specific feedback if available + if self._detected_format and trace: + from ..utils.format_detection import generate_format_feedback + input_data = trace.get('input_data', {}) + format_feedback = generate_format_feedback( + predicted_output=trace.get('predicted_output', ''), + expected_output=input_data.get('output', ''), + format_info=self._detected_format + ) + if format_feedback: + feedback += format_feedback + + return feedback + + def _llm_as_judge_feedback( + self, + evaluation_results: Dict[str, Any], + trace: Dict[str, Any], + current_prompt: Optional[str] = None + ) -> str: + """ + Generate detailed, actionable feedback using LLM-as-Judge. 
+ + ๐Ÿ”ฅ UNIVERSAL VERSION: Works for ANY task type (text, JSON, structured outputs). + No UI-specific assumptions. Pure semantic and structural comparison. + + Args: + evaluation_results: Evaluation scores and extracted data + trace: Full trace with input_data, predicted_output, etc. + current_prompt: The current system prompt being optimized + + Returns: + Detailed feedback string focused on prompt improvement + """ + # Import universal judge prompt builder + from ..utils.universal_judge_prompt import ( + build_universal_judge_prompt, + get_universal_judge_system_prompt, + format_universal_judge_feedback, + build_empty_output_feedback + ) + + # Extract data from trace + input_data = trace.get('input_data', {}) + predicted_output = trace.get('predicted_output', '') or '' + expected_output = input_data.get('output', '') or '' + task_input = input_data.get('input', '') or '' + + # Get image if available (for multi-modal tasks) + image_base64 = input_data.get('image', '') or input_data.get('image_base64', '') + + # Log what we're working with + self.logger.info(f"๐Ÿ” LLM-as-Judge input check:") + self.logger.info(f" predicted_output length: {len(predicted_output)} chars") + self.logger.info(f" expected_output length: {len(expected_output)} chars") + self.logger.info(f" image available: {bool(image_base64)} (length: {len(image_base64) if image_base64 else 0} chars)") + self.logger.info(f" predicted_output preview: {predicted_output[:200] if predicted_output else '[EMPTY]'}...") + self.logger.info(f" expected_output preview: {expected_output[:200] if expected_output else '[EMPTY]'}...") + + # Handle empty predicted output specially + if not predicted_output or not predicted_output.strip(): + self.logger.warning(f"โš ๏ธ Predicted output is empty - generating specialized feedback") + return build_empty_output_feedback(task_input, expected_output, current_prompt) + + if not image_base64: + self.logger.debug(f"โ„น๏ธ No image provided - text-only analysis") + + # Get the 
LLM for judging + judge_llm = getattr(self, '_reflection_lm_client', None) or getattr(self, 'reflection_lm_client', None) + + if not judge_llm: + self.logger.error("โŒ CRITICAL: No reflection_lm_client available for LLM-as-Judge!") + raise ValueError("reflection_lm_client not available") + + # Build the universal judge prompt + judge_prompt = build_universal_judge_prompt( + task_input=task_input, + predicted_output=predicted_output, + expected_output=expected_output, + current_prompt=current_prompt, + evaluation_results=evaluation_results, + image_base64=image_base64 + ) + + # Get the universal system prompt + system_prompt = get_universal_judge_system_prompt(has_image=bool(image_base64)) + + # Call LLM-as-Judge + try: + self.logger.info(f"๐Ÿค– Calling Universal LLM-as-Judge for semantic analysis") + result = judge_llm.generate( + system_prompt=system_prompt, + user_prompt=judge_prompt, + image_base64=image_base64 if image_base64 else "" + ) + + if isinstance(result, dict): + judge_output = result.get('content', '') + else: + judge_output = str(result) + + # Format the feedback using the universal formatter + score = evaluation_results.get('composite_score', 0.0) + feedback = format_universal_judge_feedback( + judge_output=judge_output, + task_input=task_input, + predicted_output=predicted_output, + expected_output=expected_output, + score=score + ) + + # ๐Ÿ”ฅ ADD FORMAT FEEDBACK: Append format-specific feedback + if self._detected_format: + from ..utils.format_detection import generate_format_feedback + format_feedback = generate_format_feedback( + predicted_output=predicted_output, + expected_output=expected_output, + format_info=self._detected_format + ) + if format_feedback: + feedback += format_feedback + + # Also add format constraint for next iteration + feedback += f"\n\n{self._detected_format['format_constraint']}" + + self.logger.info(f"โœ… Universal LLM-as-Judge generated feedback") + return feedback + + except Exception as e: + 
self.logger.error(f"LLM-as-Judge failed: {e}") + import traceback + self.logger.error(traceback.format_exc()) + # Fallback to simple feedback + return self._simple_actionable_feedback(evaluation_results, trace, current_prompt) + + def _extract_reasoning_from_expected(self, expected_output: str) -> str: + """Extract reasoning section from expected output.""" + if not expected_output: + return "" + + # Look for "Reason:" or "Reasoning:" section + reason_patterns = [ + r'Reason[:\s]+(.*?)(?:\n\n|\Z)', + r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)', + ] + + for pattern in reason_patterns: + match = re.search(pattern, expected_output, re.IGNORECASE | re.DOTALL) + if match: + return match.group(1).strip()[:500] # Truncate to 500 chars + + return "" + + def _extract_reasoning_from_predicted(self, predicted_output: str) -> str: + """Extract reasoning from predicted output if available.""" + # Similar to _extract_reasoning_from_expected + # Or return first 200 chars if no clear reasoning section + if not predicted_output: + return "" + + # Look for reasoning patterns + reason_patterns = [ + r'Reason[:\s]+(.*?)(?:\n\n|\Z)', + r'Reasoning[:\s]+(.*?)(?:\n\n|\Z)', + ] + + for pattern in reason_patterns: + match = re.search(pattern, predicted_output, re.IGNORECASE | re.DOTALL) + if match: + return match.group(1).strip()[:500] + + # If no reasoning found, return first 200 chars + if len(predicted_output) > 200: + return predicted_output[:200] + "..." + return predicted_output + + def _simple_actionable_feedback( + self, + evaluation_results: Dict[str, Any], + trace: Dict[str, Any] = None, + current_prompt: Optional[str] = None + ) -> str: + """ + Simple feedback without LLM-as-Judge. + + ๐Ÿ”ฅ UNIVERSAL VERSION: Works for any task type. 
+ """ + composite_score = evaluation_results.get("composite_score", 0.0) + semantic_sim = evaluation_results.get("semantic_similarity", 0.0) + structural_sim = evaluation_results.get("structural_similarity", 0.0) + + feedback_parts = [] + + # Extract task context if available + if trace: + input_data = trace.get('input_data', {}) + predicted = trace.get('predicted_output', '') + expected = input_data.get('output', '') + + # Check for empty output + if not predicted or not predicted.strip(): + feedback_parts.append( + "โŒ CRITICAL: No output generated. " + "Add explicit output instructions to the prompt." + ) + # Check for format mismatch + elif structural_sim < 0.5: + feedback_parts.append( + f"โš ๏ธ Format mismatch (structural similarity: {structural_sim:.0%}). " + "Add output format instructions (e.g., 'Return as JSON with fields: ...')." + ) + # Check for semantic mismatch + elif semantic_sim < 0.5: + feedback_parts.append( + f"โš ๏ธ Semantic mismatch (similarity: {semantic_sim:.0%}). " + "The output meaning differs from expected. Add clearer task instructions." + ) + + # Score-based feedback + if composite_score >= 0.9: + feedback_parts.append("โœ… Excellent match - prompt is working well.") + elif composite_score >= 0.8: + feedback_parts.append("โœ… Good match - minor refinements possible.") + elif composite_score >= 0.6: + feedback_parts.append( + f"โš ๏ธ Partial match (score: {composite_score:.0%}). " + "Consider adding examples or more specific field names to the prompt." + ) + elif composite_score >= 0.3: + feedback_parts.append( + f"โš ๏ธ Low match (score: {composite_score:.0%}). " + "The prompt needs clearer instructions about expected output format and content." + ) + else: + feedback_parts.append( + f"โŒ Poor match (score: {composite_score:.0%}). " + "Major revision required - add explicit output format, field names, and examples." 
+ ) + + return "\n".join(feedback_parts) if feedback_parts else f"Score: {composite_score:.0%}" + + def get_best_candidate(self) -> Optional[Dict[str, str]]: + """ + Get the best candidate from GEPA Pareto front. + + GEPA Pareto front is the single source of truth because: + - All candidates (GEPA reflection, LLEGO crossover, LLEGO mutation) are evaluated on Dpareto + - All non-dominated candidates are added to GEPA Pareto front + - Therefore, the best candidate MUST be in GEPA Pareto front + + Returns: + Best candidate dictionary from GEPA Pareto front, or None if empty + """ + # PRIMARY: Get best candidate from GEPA Pareto front (single source of truth) + from ..utils.pareto_logger import get_pareto_logger + pareto_log = get_pareto_logger() + + if pareto_log.pareto_front: + try: + # Get best candidate from GEPA Pareto front (highest score = best) + gepa_best = max(pareto_log.pareto_front, key=lambda x: x['score']) + gepa_fitness = gepa_best['score'] + gepa_prompt = gepa_best['prompt'] + gepa_type = gepa_best.get('type', 'unknown') + gepa_notation = gepa_best.get('notation', 'S') + + self.logger.info(f"โœ… Best candidate from GEPA Pareto front: {gepa_notation} with f({gepa_notation})={gepa_fitness:.4f}") + self.logger.info(f" Type: {gepa_type}, Prompt length: {len(gepa_prompt)} chars") + self.logger.info(f" ๐Ÿ’ก GEPA Pareto front is single source of truth (all candidates evaluated on Dpareto)") + + return { + 'system_prompt': gepa_prompt, + 'fitness': gepa_fitness, + 'source': 'gepa_pareto_front', + 'candidate_type': gepa_type, + 'notation': gepa_notation + } + except Exception as e: + self.logger.error(f"โŒ Failed to get best from GEPA Pareto front: {e}") + import traceback + self.logger.error(traceback.format_exc()) + + # EDGE CASE: Pareto front empty (shouldn't happen, but handle gracefully) + self.logger.warning("โš ๏ธ GEPA Pareto front is empty - no best candidate available") + self.logger.warning(" This should not happen if all candidates are evaluated on 
Dpareto") + return None + + def get_best_score(self) -> float: + """Get the best score from GEPA Pareto front (single source of truth).""" + from ..utils.pareto_logger import get_pareto_logger + pareto_log = get_pareto_logger() + + if pareto_log.pareto_front: + try: + gepa_best_fitness = max(p['score'] for p in pareto_log.pareto_front) + return gepa_best_fitness + except Exception as e: + self.logger.warning(f"โš ๏ธ Failed to get best fitness from GEPA Pareto front: {e}") + + # Edge case: Pareto front empty - fallback to adapter's score + return self._best_score + + def log_proposed_candidate(self, candidate: Dict[str, str], iteration: int = 0): + """ + Pretty print the new proposed candidate prompt. + + Args: + candidate: The new candidate prompt from GEPA + iteration: Current optimization iteration + """ + system_prompt = candidate.get('system_prompt', '') + candidate_source = candidate.get('source', 'unknown') + + # Store source in adapter state so evaluate() can access it + self._current_evaluation_type = candidate_source + + # Also store in mapping by prompt text for lookup + if candidate_source != 'unknown' and system_prompt: + self._candidate_sources[system_prompt] = candidate_source + + # Use clean logger for simpler output + from ..utils.clean_logger import get_clean_logger + clean_log = get_clean_logger() + + # Update iteration if needed + if iteration > clean_log.current_iteration: + clean_log.log_iteration_start(iteration, seed_prompt=None) + + # Don't log here - let evaluate() handle it with full context + + def _log_reflection_dataset_creation(self, candidate: Dict[str, str], eval_batch: EvaluationBatch, + components_to_update: List[str]): + """ + Pretty print the reflection dataset creation process. 
+ + Args: + candidate: Current candidate being evaluated + eval_batch: Evaluation results + components_to_update: Components being updated + """ + system_prompt = candidate.get('system_prompt', '') + + self.logger.info(f"๐Ÿ” DEBUG: Inside _log_reflection_dataset_creation") + self.logger.info(f"๐Ÿ” DEBUG: system_prompt length: {len(system_prompt)}") + self.logger.info(f"๐Ÿ” DEBUG: eval_batch.scores: {eval_batch.scores}") + self.logger.info(f"๐Ÿ” DEBUG: eval_batch.trajectories: {len(eval_batch.trajectories) if eval_batch.trajectories else 0}") + + # Determine candidate notation + notation_map = {'seed': 'Sโ‚€', 'gepa_reflection': 'Sแตฃ', 'llego_crossover': 'Oโ‚“โ‚’', 'llego_mutation': 'Oโ‚˜แตคโ‚œ'} + notation = notation_map.get(self._current_evaluation_type, 'S') + cand_num = self._evaluation_count if hasattr(self, '_evaluation_count') else '?' + cand_label = f"{notation}{cand_num}" + + # Use logger for the main output too + self.logger.info("\n" + "="*80) + self.logger.info("๐Ÿ” REFLECTION DATASET CREATION") + self.logger.info("="*80) + + self.logger.info(f"\n๐Ÿ“‹ CURRENT PROMPT BEING ANALYZED: {cand_label}") + self.logger.info(f" Candidate Type: {self._current_evaluation_type or 'unknown'}") + self.logger.info("-" * 40) + self.logger.info(f'"{system_prompt}"') + self.logger.info("-" * 40) + + self.logger.info(f"\n๐Ÿ“Š EVALUATION SUMMARY:") + self.logger.info("-" * 40) + if eval_batch.scores: + avg_score = sum(eval_batch.scores) / len(eval_batch.scores) + min_score = min(eval_batch.scores) + max_score = max(eval_batch.scores) + self.logger.info(f" โ€ข Average Score: {avg_score:.4f}") + self.logger.info(f" โ€ข Min Score: {min_score:.4f}") + self.logger.info(f" โ€ข Max Score: {max_score:.4f}") + self.logger.info(f" โ€ข Total Samples: {len(eval_batch.scores)}") + + self.logger.info(f"\n๐ŸŽฏ COMPONENTS TO UPDATE:") + self.logger.info("-" * 40) + for i, component in enumerate(components_to_update, 1): + self.logger.info(f" {i}. 
{component}") + + if eval_batch.trajectories: + self.logger.info(f"\n๐Ÿ” DETAILED ANALYSIS (FULL FEEDBACK - NO TRUNCATION):") + self.logger.info("-" * 80) + for i, trace in enumerate(eval_batch.trajectories[:5], 1): # Show first 5 samples with FULL details + evaluation_results = trace['evaluation_results'] + composite_score = evaluation_results.get("composite_score", 0.0) + + # Extract element IDs for concise logging + predicted_element = evaluation_results.get('predicted_element', 'Unknown') + expected_element = evaluation_results.get('expected_element', 'Unknown') + + # Concise, direct logging with candidate notation + status_icon = "โœ…" if composite_score == 1.0 else "โŒ" + + # Add notation for candidate type + notation_map = {'seed': 'Sโ‚€', 'gepa_reflection': 'Sแตฃ', 'llego_crossover': 'Oโ‚“โ‚’', 'llego_mutation': 'Oโ‚˜แตคโ‚œ'} + notation = notation_map.get(self._current_evaluation_type, 'S') + + self.logger.info(f" [{notation}] Sample {i}: Predicted={predicted_element}, Expected={expected_element}, Score={composite_score:.2f} {status_icon}") + + # ๐Ÿ”ฅ FIX: Pass trace and current_prompt to enable LLM-as-Judge! + feedback = self._generate_feedback( + evaluation_results, + trace=trace, # Pass the full trace! + current_prompt=system_prompt # Pass current prompt being analyzed! + ) + self.logger.info(f" ๐Ÿ’ฌ FEEDBACK (FULL):") + self.logger.info(f" \"{feedback}\"") + + if len(eval_batch.trajectories) > 5: + self.logger.info(f"\n ... and {len(eval_batch.trajectories) - 5} more samples (all logged similarly)") + + self.logger.info("="*80) + + def _extract_clean_prompt_from_reflection(self, reflection_output: str) -> str: + """ + ๐Ÿ›ก๏ธ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite system prompt instructions. + + NOTE: The system prompt now explicitly instructs the LLM to output ONLY the prompt text. + However, this extraction logic serves as a safety net in case the LLM still adds: + "Based on the performance analysis... 
+ ### Recommendations... + ### Revised Prompt Example: + [THE ACTUAL PROMPT HERE] + ### Conclusion..." + + This is now a defensive measure, not the primary mechanism. + + Args: + reflection_output: Full reflection output (should be clean prompt, but may contain analysis) + + Returns: + str: Clean, extracted prompt (or original if extraction fails or not needed) + """ + if not reflection_output or not isinstance(reflection_output, str): + return reflection_output + + # Pattern 1: Look for "Revised Prompt Example:" or "### Revised Prompt Example:" + patterns = [ + r'(?:###\s*)?Revised\s+Prompt\s+(?:Example|:)?\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', + r'(?:###\s*)?Revised\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', + r'(?:###\s*)?Optimized\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', + r'(?:###\s*)?New\s+Prompt\s*:\s*\n(.*?)(?:\n###|\n##|\n---|\Z)', + r'(?:Here\s+is|Here\'s)\s+a?\s*refined?\s+(?:version\s+of\s+)?(?:the\s+)?prompt\s*[:\n](.*?)(?:\n###|\n##|\n---|\Z)', + ] + + for pattern in patterns: + match = re.search(pattern, reflection_output, re.IGNORECASE | re.DOTALL) + if match: + extracted = match.group(1).strip() + # Clean up common artifacts + extracted = re.sub(r'^```(?:plaintext|markdown|text)?\s*\n', '', extracted, flags=re.MULTILINE) + extracted = re.sub(r'\n```\s*$', '', extracted, flags=re.MULTILINE) + extracted = extracted.strip() + + if len(extracted) > 50: # Reasonable minimum length for a prompt + self.logger.debug(f"โœ… Extracted clean prompt using pattern: {pattern[:50]}...") + self.logger.debug(f" Original length: {len(reflection_output)} chars") + self.logger.debug(f" Extracted length: {len(extracted)} chars") + return extracted + + # Pattern 2: If output starts with a quote or prompt-like structure + # Look for text that starts with "You are..." 
and is substantial + if 'You are' in reflection_output: + # Find the longest continuous block that starts with "You are" + prompt_match = re.search(r'(You are[^#]*?)(?:\n###|\n##|###|##|Conclusion|\Z)', + reflection_output, re.IGNORECASE | re.DOTALL) + if prompt_match: + extracted = prompt_match.group(1).strip() + if len(extracted) > 50: + self.logger.debug(f"โœ… Extracted prompt starting with 'You are...'") + return extracted + + # Pattern 3: If the reflection output is actually just a clean prompt (no analysis) + # Check if it's relatively short and doesn't contain analysis keywords + analysis_keywords = ['recommendation', 'suggestion', 'improvement', 'conclusion', + 'optimization', 'analysis', 'feedback'] + if (len(reflection_output) < 2000 and + not any(keyword in reflection_output.lower() for keyword in analysis_keywords)): + # Likely a clean prompt, return as-is + self.logger.debug(f"โœ… Reflection output appears to be a clean prompt (no analysis detected)") + return reflection_output.strip() + + # Fallback: Return original (with warning) + self.logger.warning(f"โš ๏ธ Could not extract clean prompt from reflection output") + self.logger.warning(f" Output length: {len(reflection_output)} chars") + self.logger.warning(f" Output preview: {reflection_output[:200]}...") + self.logger.warning(f" Returning original output (may contain analysis text)") + return reflection_output.strip() + + def _parse_json_variations(self, response_text: str, num_expected: int) -> List[str]: + """ + ๐Ÿ”ฅ OPTIMIZED: Parse N prompt variations from JSON format response. + + Uses robust JSON parsing with multiple fallback strategies: + 1. Extract JSON from markdown code blocks (```json ... ```) + 2. Find JSON object directly in text + 3. Attempt JSON repair for common issues + 4. 
Fallback to numbered section parsing if JSON fails + + Args: + response_text: LLM response containing JSON with variations + num_expected: Expected number of variations + + Returns: + List[str]: List of prompt variations (in order by index) + + Raises: + ValueError: If parsing fails and no valid variations found + """ + import json + import re + + if not response_text or not isinstance(response_text, str): + raise ValueError("Empty or invalid response text") + + # Strategy 1: Extract JSON from markdown code block + json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL) + if json_match: + json_str = json_match.group(1) + try: + data = json.loads(json_str) + return self._extract_variations_from_json(data, num_expected) + except json.JSONDecodeError as e: + self.logger.debug(f"JSON in code block invalid: {e}, trying repair...") + + # Strategy 2: Find JSON object directly in text + json_match = re.search(r'\{[^{}]*"variations"[^{}]*\[.*?\]\s*[^{}]*\}', response_text, re.DOTALL) + if json_match: + json_str = json_match.group(0) + try: + data = json.loads(json_str) + return self._extract_variations_from_json(data, num_expected) + except json.JSONDecodeError: + # Try to find largest JSON object + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if json_match: + try: + data = json.loads(json_match.group(0)) + return self._extract_variations_from_json(data, num_expected) + except json.JSONDecodeError: + pass + + # Strategy 3: Attempt JSON repair (common issues: trailing commas, unescaped quotes) + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if json_match: + json_str = json_match.group(0) + # Try common repairs + repaired = re.sub(r',\s*}', '}', json_str) # Remove trailing commas before } + repaired = re.sub(r',\s*]', ']', repaired) # Remove trailing commas before ] + try: + data = json.loads(repaired) + return self._extract_variations_from_json(data, num_expected) + except json.JSONDecodeError: + pass + + # 
Strategy 4: Fallback to numbered section parsing + self.logger.warning(f"JSON parsing failed, trying numbered section fallback...") + try: + return self._parse_numbered_section_variations(response_text, num_expected) + except ValueError: + pass + + # All strategies failed + raise ValueError(f"Could not parse {num_expected} variations from response. Response preview: {response_text[:300]}...") + + def _extract_variations_from_json(self, data: Dict[str, Any], num_expected: int) -> List[str]: + """Extract and validate variations from parsed JSON data.""" + if not isinstance(data, dict): + raise ValueError("JSON data is not a dictionary") + + variations_list = data.get('variations', []) + if not isinstance(variations_list, list): + raise ValueError("'variations' field is not a list") + + if len(variations_list) < num_expected: + self.logger.warning(f"Expected {num_expected} variations, found {len(variations_list)} in JSON") + + # Extract and sort by index + variations_with_index = [] + for var in variations_list: + if not isinstance(var, dict): + continue + index = var.get('index', 0) + prompt = var.get('prompt', '') + if prompt and isinstance(prompt, str): + variations_with_index.append((index, prompt.strip())) + + # Sort by index + variations_with_index.sort(key=lambda x: x[0]) + + # Extract just the prompts + variations = [v[1] for v in variations_with_index] + + # Validate count + if len(variations) < num_expected: + self.logger.warning(f"Only {len(variations)} valid variations found, expected {num_expected}") + # Pad with duplicates if needed (not ideal but better than failing) + while len(variations) < num_expected: + variations.append(variations[-1] if variations else "") + + # Take first N if we got more + variations = variations[:num_expected] + + # Validate all variations are non-empty + if not all(v for v in variations): + raise ValueError(f"Some variations are empty after parsing") + + return variations + + def _parse_numbered_section_variations(self, 
response_text: str, num_expected: int) -> List[str]: + """ + Fallback parser: Extract variations from numbered sections. + + Format: --- VARIATION N --- or Variation N: or similar + """ + variations = [] + + # Pattern 1: --- VARIATION N --- + pattern1 = r'---\s*VARIATION\s+(\d+)\s*---\s*\n(.*?)(?=\n---\s*VARIATION|\Z)' + matches1 = re.findall(pattern1, response_text, re.DOTALL | re.IGNORECASE) + + # Pattern 2: Variation N: + pattern2 = r'Variation\s+(\d+)\s*:?\s*\n(.*?)(?=\nVariation\s+\d+|$)' + matches2 = re.findall(pattern2, response_text, re.DOTALL | re.IGNORECASE) + + # Pattern 3: Numbered list (1. 2. 3.) + pattern3 = r'(\d+)\.\s*\n(.*?)(?=\n\d+\.|$)' + matches3 = re.findall(pattern3, response_text, re.DOTALL) + + # Use the pattern with most matches + matches = matches1 if len(matches1) >= num_expected else (matches2 if len(matches2) >= num_expected else matches3) + + if len(matches) >= num_expected: + # Sort by index + matches.sort(key=lambda x: int(x[0])) + # Extract prompts + variations = [match[1].strip() for match in matches[:num_expected]] + + if len(variations) != num_expected: + raise ValueError(f"Numbered section parsing found {len(variations)} variations, expected {num_expected}") + + return variations + + def _generate_hybrid_candidates_adapter_level( + self, + current_prompt: str, + eval_batch: EvaluationBatch, + candidate: Dict[str, str] + ) -> List[str]: + """ + ๐Ÿ”ฅ ADAPTER-LEVEL HYBRID CANDIDATE GENERATION + + Generate candidates from BOTH GEPA reflection AND LLEGO operators + when GEPA's adapter mode ignores the reflection_lm parameter. + + This method: + 1. Builds comprehensive feedback from evaluation results + 2. Generates GEPA reflection candidates + 3. Generates LLEGO crossover/mutation candidates + 4. Logs ALL candidates with FULL prompts (no truncation) + 5. 
    def _generate_hybrid_candidates_adapter_level(
        self,
        current_prompt: str,
        eval_batch: EvaluationBatch,
        candidate: Dict[str, str]
    ) -> List[Dict[str, Any]]:
        """
        🔥 ADAPTER-LEVEL HYBRID CANDIDATE GENERATION

        Generate candidates from BOTH GEPA reflection AND LLEGO operators
        when GEPA's adapter mode ignores the reflection_lm parameter.

        This method:
        1. Builds comprehensive feedback text from evaluation trajectories
        2. Generates GEPA reflection candidates (single optimized JSON call,
           with a per-candidate sequential fallback when JSON parsing fails)
        3. Generates LLEGO crossover/mutation candidates (only when a LLEGO
           layer is configured)
        4. Logs ALL candidates with FULL prompts (no truncation)
        5. Stores candidates on ``self._generated_candidates`` so that
           ``propose_new_texts`` can reuse them

        Args:
            current_prompt: The current prompt being optimized
            eval_batch: Evaluation results with trajectories
            candidate: Current candidate dict
                NOTE(review): this argument is never referenced in the body —
                confirm whether callers still need to pass it.

        Returns:
            List of candidate dicts with 'prompt', 'source' and 'index' keys;
            an empty list when trajectories are missing or on any
            unrecoverable error.
        """
        try:
            from ..llms.llego_enhanced_llm import LLEGOEnhancedLLMClient

            all_candidates = []
            gepa_count = 0

            # 🔥 CRITICAL: Pass format info to LLM client before generating candidates
            if self._detected_format and self._reflection_lm_client:
                if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
                    self._reflection_lm_client._detected_format = self._detected_format
                    self.logger.info(f"📝 Passed format info to reflection LLM: {self._detected_format['format_type']}")

            self.logger.info(f"🔥 STEP 1: Building comprehensive feedback from evaluation")

            # 🔥 REMOVED: Excessive diagnostic logs - moved to DEBUG level
            # Build comprehensive feedback text from trajectories
            if not hasattr(eval_batch, 'trajectories'):
                self.logger.error(f"❌ eval_batch has no 'trajectories' attribute! Type: {type(eval_batch)}")
                return []

            trajectories = eval_batch.trajectories
            if not trajectories:
                self.logger.warning(f"⚠️ eval_batch.trajectories is empty - no feedback to generate candidates from")
                return []

            self.logger.debug(f"Processing {len(trajectories)} trajectories for feedback generation")

            feedback_lines = []
            feedback_lines.append(f"Current prompt performance analysis:\n")
            feedback_lines.append(f"Current prompt:\n{current_prompt}\n")
            feedback_lines.append(f"\nEvaluation results:\n")

            for i, trace in enumerate(trajectories[:8], 1):  # Use up to 8 samples for feedback
                try:
                    eval_results = trace.get('evaluation_results', {})
                    score = eval_results.get("composite_score", 0.0) if isinstance(eval_results, dict) else 0.0
                    input_data = trace.get('input_data', {})
                    predicted = trace.get('predicted_output', '')
                    expected = input_data.get('output', '') if isinstance(input_data, dict) else ''

                    # 🔥 FIX: Clean input_data to remove base64 images before logging
                    input_data_clean = input_data.copy() if isinstance(input_data, dict) else {}
                    if 'image_base64' in input_data_clean:
                        input_data_clean['image_base64'] = f"[IMAGE_DATA_{len(input_data_clean['image_base64'])}_chars]"

                    feedback_lines.append(f"  Sample {i}:")
                    feedback_lines.append(f"    Input: {input_data_clean.get('input', '') if isinstance(input_data_clean, dict) else ''}")
                    feedback_lines.append(f"    Expected: {expected}")
                    feedback_lines.append(f"    Predicted: {predicted}")
                    feedback_lines.append(f"    Score: {score:.4f}")

                    if isinstance(eval_results, dict):
                        # 🔥 FIX: Pass trace and current_prompt to enable LLM-as-Judge!
                        feedback = self._generate_feedback(
                            eval_results,
                            trace=trace,  # Pass the full trace!
                            current_prompt=current_prompt  # Pass current prompt!
                        )
                        feedback_lines.append(f"    Feedback: {feedback}")
                    else:
                        feedback_lines.append(f"    Feedback: Evaluation results not in expected format")
                    feedback_lines.append("")
                except Exception as e:
                    # Per-trace failures are logged and skipped so one bad trace
                    # cannot abort the whole feedback build.
                    self.logger.error(f"❌ Error processing trace {i}: {e}")
                    import traceback
                    self.logger.error(traceback.format_exc())
                    continue

            feedback_text = "\n".join(feedback_lines)

            self.logger.info(f"\n📋 FULL FEEDBACK TEXT (NO TRUNCATION):")
            self.logger.info(feedback_text)

            # ─────────────────────────────────────────────────────────────
            # PART 1: GEPA REFLECTION CANDIDATES
            # ─────────────────────────────────────────────────────────────
            # NOTE(review): the log label below says "PART 2" (and the LLEGO
            # section logs "PART 3") while the section banners say PART 1/2 —
            # the numbering looks off by one; confirm the intended labels.
            self.logger.info(f"📝 PART 2: GEPA REFLECTION - Semantic Understanding")

            num_gepa = self._config.num_gepa_reflection_candidates if hasattr(self._config, 'num_gepa_reflection_candidates') else 3

            self.logger.info(f"\n📝 Generating {num_gepa} GEPA Reflection candidates in single optimized call...")

            # Set reflection context
            if isinstance(self._reflection_lm_client, LLEGOEnhancedLLMClient):
                self._reflection_lm_client.set_reflection_context(
                    current_prompt=current_prompt,
                    feedback=eval_batch,
                    in_reflection=True
                )

            # 🔥 OPTIMIZED: Single call with JSON format for multiple variations
            try:
                # Precision-engineered system prompt requesting JSON format
                optimization_system_prompt = f"""You are an expert prompt engineer specializing in iterative prompt optimization.

Your task: Given the CURRENT PROMPT and its EVALUATION FEEDBACK, generate {num_gepa} DISTINCT variations of improved prompts that address the identified issues through DIFFERENT improvement strategies.

CRITICAL OUTPUT FORMAT - MUST BE VALID JSON:
{{
  "variations": [
    {{
      "index": 1,
      "prompt": "[First improved prompt text - complete and self-contained]"
    }},
    {{
      "index": 2,
      "prompt": "[Second improved prompt text - complete and self-contained]"
    }},
    {{
      "index": 3,
      "prompt": "[Third improved prompt text - complete and self-contained]"
    }}
  ]
}}

DIVERSITY REQUIREMENTS:
- Variation 1: Focus on clarity, specificity, and explicit instructions
- Variation 2: Focus on edge case handling, robustness, and error prevention
- Variation 3: Focus on structural organization, examples, and step-by-step guidance
- Each variation must be MEANINGFULLY DIFFERENT (not just rewordings)
- Each variation must address ALL feedback issues but through different approaches

QUALITY STANDARDS (apply to all variations):
- Be specific and concrete (avoid vague instructions)
- Use clear, imperative language for task instructions
- Include edge case handling if feedback identifies confusion
- Ensure each prompt is self-contained and unambiguous
- Preserve the core task domain and output format requirements

OUTPUT FORMAT:
- Output MUST be valid JSON (can be wrapped in ```json ... ``` markdown code block)
- Generate EXACTLY {num_gepa} variations
- Index must be 1, 2, 3, ... (sequential, starting at 1)
- Each "prompt" field must contain the complete, self-contained prompt text
- NO explanations, NO analysis, NO meta-commentary - just the JSON structure

DO NOT include:
- Analysis of what went wrong
- Explanations of your changes
- Meta-text like "Here's an improved version..." or "Based on feedback..."
- Recommendations or suggestions (those are already in the feedback)
- Any text outside the JSON structure

Output ONLY the JSON object with the variations."""

                # Construct user prompt with clear structure
                optimization_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}

{feedback_text}

TASK: Generate {num_gepa} DISTINCT variations of improved prompts. Each variation should:
- Address ALL feedback issues identified above
- Use a DIFFERENT improvement strategy (clarity, robustness, structure)
- Be meaningfully different from the others (not just rewordings)
- Be complete and self-contained

Remember: Output ONLY the JSON object with {num_gepa} variations. No explanations."""

                result = self._reflection_lm_client.generate(
                    system_prompt=optimization_system_prompt,
                    user_prompt=optimization_user_prompt,
                    image_base64=""
                )

                # Normalize the client response: dict clients return {"content": ...}.
                if isinstance(result, dict):
                    response_text = result.get("content", str(result))
                else:
                    response_text = str(result)

                # Parse JSON variations
                gepa_variations = self._parse_json_variations(response_text, num_gepa)

                # Add all variations to candidates
                for idx, variation_prompt in enumerate(gepa_variations, 1):
                    # 🛡️ DEFENSIVE FALLBACK: Extract clean prompt if LLM adds analysis despite instructions
                    gepa_candidate = self._extract_clean_prompt_from_reflection(variation_prompt)

                    if gepa_candidate != variation_prompt:
                        self.logger.debug(f"  Variation {idx}: Extracted clean prompt (removed {len(variation_prompt) - len(gepa_candidate)} chars)")

                    all_candidates.append({
                        'prompt': gepa_candidate,
                        'source': 'gepa_reflection',
                        'index': idx
                    })

                    # 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                    # Only attempted when an 'app' module is already loaded.
                    try:
                        import sys
                        if 'app' in sys.modules:
                            app_module = sys.modules['app']
                            if hasattr(app_module, 'add_candidate_to_store'):
                                app_module.add_candidate_to_store({
                                    'prompt': gepa_candidate,
                                    'source': 'gepa_reflection',
                                    'timestamp': f"Candidate #{idx}"
                                })
                    except Exception:
                        pass  # Silent fail - UI capture is optional

                    self.logger.info(f"\n✅ GEPA REFLECTION CANDIDATE #{idx}/{num_gepa} (FULL PROMPT - NO TRUNCATION):")
                    self.logger.info(f"{'▓'*80}")
                    self.logger.info(f"{gepa_candidate}")
                    self.logger.info(f"{'▓'*80}")
                    self.logger.info(f"  Length: {len(gepa_candidate)} chars, Words: {len(gepa_candidate.split())}")

                gepa_count = len(all_candidates)
                self.logger.info(f"\n✅ GEPA Reflection: {gepa_count} candidates generated in single optimized call")

            except Exception as e:
                self.logger.error(f"❌ Error generating GEPA reflection candidates: {e}")
                self.logger.warning(f"  Falling back to sequential generation...")
                import traceback
                self.logger.debug(traceback.format_exc())

                # Fallback: Sequential generation (when JSON parsing fails)
                for i in range(num_gepa):
                    self.logger.info(f"\n📝 Generating GEPA Reflection candidate #{i+1}/{num_gepa} (fallback mode)...")
                    try:
                        fallback_user_prompt = f"""CURRENT PROMPT (to be improved):
{current_prompt}

{feedback_text}

TASK: Generate an improved version of the CURRENT PROMPT that addresses all issues identified in the evaluation feedback above.

Remember: Output ONLY the improved prompt text. No explanations."""

                        result = self._reflection_lm_client.generate(
                            system_prompt=self._FALLBACK_SYSTEM_PROMPT,
                            user_prompt=fallback_user_prompt,
                            image_base64=""
                        )

                        if isinstance(result, dict):
                            gepa_candidate_raw = result.get("content", str(result))
                        else:
                            gepa_candidate_raw = str(result)

                        gepa_candidate = self._extract_clean_prompt_from_reflection(gepa_candidate_raw)

                        all_candidates.append({
                            'prompt': gepa_candidate,
                            'source': 'gepa_reflection',
                            'index': i + 1
                        })

                        # 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                        try:
                            import sys
                            if 'app' in sys.modules:
                                app_module = sys.modules['app']
                                if hasattr(app_module, 'add_candidate_to_store'):
                                    app_module.add_candidate_to_store({
                                        'prompt': gepa_candidate,
                                        'source': 'gepa_reflection',
                                        'timestamp': f"Fallback #{i+1}"
                                    })
                        except Exception:
                            pass  # Silent fail - UI capture is optional
                    except Exception as fallback_error:
                        self.logger.error(f"❌ Error in fallback generation #{i+1}: {fallback_error}")

                gepa_count = len(all_candidates)
                if gepa_count > 0:
                    self.logger.info(f"\n✅ GEPA Reflection: {gepa_count} candidates generated")

            # ─────────────────────────────────────────────────────────────
            # PART 2: LLEGO GENETIC OPERATORS
            # ─────────────────────────────────────────────────────────────
            self.logger.info(f"🧬 PART 3: LLEGO GENETIC OPERATORS - Structural Diversity")

            if self.llego:
                # 🔥 FIX 2: Get Pareto front from GEPA (not LLEGO population)
                # This ensures LLEGO operators use true non-dominated solutions
                from ..utils.pareto_logger import get_pareto_logger
                pareto_log = get_pareto_logger()
                gepa_pareto_front = pareto_log.pareto_front

                # Convert GEPA Pareto front to PromptCandidate format
                pareto_candidates = self.llego._convert_gepa_pareto_to_candidates(gepa_pareto_front)
                pareto_front = pareto_candidates

                self.logger.info(f"  Using GEPA Pareto front (size: {len(gepa_pareto_front)})")
                self.logger.info(f"  Converted to {len(pareto_front)} PromptCandidate objects")
                for idx, p in enumerate(pareto_front, 1):
                    cand_type = p.metadata.get('candidate_type', 'unknown') if p.metadata else 'unknown'
                    notation = p.metadata.get('notation', 'S') if p.metadata else 'S'
                    self.logger.info(f"    {notation}: [fitness={p.fitness:.3f}, type={cand_type}, length={len(p.prompt)} chars]")

                # Create LLM callable for LLEGO
                def llm_callable(genetic_prompt: str) -> str:
                    # 🔥 LLEGO genetic prompt already contains full instructions
                    # Use minimal system prompt to avoid instruction conflict
                    result = self._reflection_lm_client.generate(
                        system_prompt="You are an expert prompt engineer. Follow the instructions provided in the user message to generate an improved prompt. Output only the prompt text, no explanations.",
                        user_prompt=genetic_prompt,
                        image_base64=""
                    )
                    if isinstance(result, dict):
                        return result.get('content', str(result))
                    return str(result)

                # Generate LLEGO offspring
                try:
                    llego_prompts = self.llego.evolve_generation(
                        llm=llm_callable,
                        pareto_front=pareto_front
                    )

                    # The first n_crossover offspring are crossover products; the
                    # remainder are mutations.
                    n_crossover = self._config.n_crossover if hasattr(self._config, 'n_crossover') else 2
                    crossover_count = min(n_crossover, len(llego_prompts))

                    for i, prompt in enumerate(llego_prompts):
                        if i < crossover_count:
                            source = 'llego_crossover'
                        else:
                            source = 'llego_mutation'

                        all_candidates.append({
                            'prompt': prompt,
                            'source': source,
                            'index': i + 1
                        })

                        # 🔥 CAPTURE CANDIDATE FOR LIVE UI DISPLAY
                        try:
                            import sys
                            if 'app' in sys.modules:
                                app_module = sys.modules['app']
                                if hasattr(app_module, 'add_candidate_to_store'):
                                    app_module.add_candidate_to_store({
                                        'prompt': prompt,
                                        'source': source,
                                        'timestamp': f"Candidate #{i+1}"
                                    })
                        except Exception:
                            pass  # Silent fail - UI capture is optional

                        border_char = "▓" if source == 'llego_crossover' else "▒"
                        self.logger.info(f"\n{border_char*80}")
                        self.logger.info(f"{border_char} {'🔀 LLEGO CROSSOVER' if source == 'llego_crossover' else '🎲 LLEGO MUTATION'} candidate #{i+1}")
                        self.logger.info(f"{border_char*80}")
                        self.logger.info(f"{prompt}")
                        self.logger.info(f"{border_char*80}")
                        self.logger.info(f"  Length: {len(prompt)} chars, Words: {len(prompt.split())}")

                    self.logger.info(f"✅ LLEGO Genetic Operators: {len(llego_prompts)} candidates generated")

                except Exception as e:
                    self.logger.error(f"❌ Error generating LLEGO candidates: {e}")
                    import traceback
                    self.logger.error(traceback.format_exc())

            # ─────────────────────────────────────────────────────────────
            # SUMMARY
            # ─────────────────────────────────────────────────────────────
            self.logger.info(f"\n{'='*80}")
            self.logger.info(f"📊 ADAPTER-LEVEL HYBRID GENERATION SUMMARY")
            self.logger.info(f"{'='*80}")
            self.logger.info(f"  📝 GEPA Reflection: {gepa_count} candidates")
            self.logger.info(f"  🔀 LLEGO Crossover: {len([c for c in all_candidates if c['source'] == 'llego_crossover'])} candidates")
            self.logger.info(f"  🎲 LLEGO Mutation: {len([c for c in all_candidates if c['source'] == 'llego_mutation'])} candidates")
            self.logger.info(f"  📦 TOTAL: {len(all_candidates)} diverse candidates")
            self.logger.info(f"{'='*80}\n")

            # Store candidates (GEPA might access them through some mechanism)
            self._generated_candidates = all_candidates

            # Log each candidate with FULL text
            self.logger.info(f"\n{'='*80}")
            self.logger.info(f"📋 ALL GENERATED CANDIDATES (FULL PROMPTS - NO TRUNCATION)")
            self.logger.info(f"{'='*80}")
            for i, cand in enumerate(all_candidates, 1):
                source_emoji = "📝" if cand['source'] == 'gepa_reflection' else "🔀" if cand['source'] == 'llego_crossover' else "🎲"
                self.logger.info(f"\n{source_emoji} CANDIDATE #{i} - {cand['source'].upper().replace('_', ' ')}")
                self.logger.info(f"{cand['prompt']}")
                self.logger.info(f"  Length: {len(cand['prompt'])} characters")
                self.logger.info(f"  Words: {len(cand['prompt'].split())} words")
                self.logger.info(f"{'='*80}\n")

            # Return candidates as list of dicts with metadata (not just strings)
            # This ensures source information is preserved
            return all_candidates  # Return full dicts with source info

        except Exception as e:
            self.logger.error(f"\n{'❌'*80}")
            self.logger.error(f"❌ CRITICAL ERROR in _generate_hybrid_candidates_adapter_level!")
            self.logger.error(f"❌ Error: {str(e)}")
            self.logger.error(f"{'❌'*80}\n")
            import traceback
            self.logger.error(traceback.format_exc())
            return []
1): + source_emoji = "๐Ÿ“" if cand['source'] == 'gepa_reflection' else "๐Ÿ”€" if cand['source'] == 'llego_crossover' else "๐ŸŽฒ" + self.logger.info(f"\n{source_emoji} CANDIDATE #{i} - {cand['source'].upper().replace('_', ' ')}") + self.logger.info(f"{cand['prompt']}") + self.logger.info(f" Length: {len(cand['prompt'])} characters") + self.logger.info(f" Words: {len(cand['prompt'].split())} words") + self.logger.info(f"{'='*80}\n") + + # Return candidates as list of dicts with metadata (not just strings) + # This ensures source information is preserved + return all_candidates # Return full dicts with source info + + except Exception as e: + self.logger.error(f"\n{'โŒ'*80}") + self.logger.error(f"โŒ CRITICAL ERROR in _generate_hybrid_candidates_adapter_level!") + self.logger.error(f"โŒ Error: {str(e)}") + self.logger.error(f"{'โŒ'*80}\n") + import traceback + self.logger.error(traceback.format_exc()) + return [] + + def propose_new_texts( + self, + candidate: Dict[str, str], + reflective_dataset: Dict[str, List[Dict[str, Any]]], + components_to_update: List[str] + ) -> Dict[str, str]: + """ + ๐Ÿ”ฅ CRITICAL: This method is called by GEPA to propose new component texts. + + This is the KEY integration point - GEPA checks if adapter.propose_new_texts exists, + and if it does, uses it instead of the default InstructionProposalSignature. + + This method: + 1. Uses reflective_dataset to generate improved prompts + 2. Optionally uses LLEGO for additional diversity + 3. 
Returns dict mapping component_name -> new component text + + Args: + candidate: Current candidate dict (component_name -> component_text) + reflective_dataset: Feedback data per component (from make_reflective_dataset) + components_to_update: List of component names to update + + Returns: + Dict mapping component_name -> new component text + """ + self.logger.info(f"\n{'='*80}") + self.logger.info(f"๐ŸŽฏ PROPOSE_NEW_TEXTS CALLED BY GEPA") + self.logger.info(f"{'='*80}") + self.logger.info(f" Components to update: {components_to_update}") + self.logger.info(f" Reflective dataset keys: {list(reflective_dataset.keys())}") + + # ๐Ÿ”ฅ FIX: Check if we already generated candidates in hybrid mode + # If yes, return one of them instead of generating a new one (avoids duplicate work and context overflow) + if hasattr(self, '_generated_candidates') and self._generated_candidates: + self.logger.info(f"\nโœ… HYBRID MODE: Using pre-generated candidates from make_reflective_dataset") + self.logger.info(f" Available candidates: {len(self._generated_candidates)}") + self.logger.info(f" Returning first candidate (GEPA will evaluate all of them)") + + # Return the first candidate (GEPA will get others via queue) + first_candidate = self._generated_candidates[0] + new_texts = {} + for component in components_to_update: + if isinstance(first_candidate, dict) and 'prompt' in first_candidate: + new_texts[component] = first_candidate['prompt'] + source = first_candidate.get('source', 'unknown') + self.logger.info(f" Returning {source} candidate (length: {len(first_candidate['prompt'])} chars)") + else: + new_texts[component] = str(first_candidate) + + self.logger.info(f"{'='*80}\n") + return new_texts + + new_texts = {} + + # Check if we have reflection_lm_client (required for proposal) + if not self._reflection_lm_client: + self.logger.error("โŒ reflection_lm_client not available - cannot generate proposals") + # Fallback: return current candidate (no change) + for component in 
components_to_update: + new_texts[component] = candidate.get(component, '') + return new_texts + + # For each component to update + for component_name in components_to_update: + self.logger.info(f"๐Ÿ“ Proposing new text for component: {component_name}") + + current_text = candidate.get(component_name, '') + dataset = reflective_dataset.get(component_name, []) + + if not dataset: + self.logger.warning(f"โš ๏ธ No feedback data for {component_name}, keeping current text") + new_texts[component_name] = current_text + continue + + self.logger.info(f" Current text length: {len(current_text)} chars") + self.logger.info(f" Feedback examples: {len(dataset)}") + + # Generate improved prompt using reflection LM + try: + # ๐Ÿ”ฅ FIX: Clean dataset to remove base64 images (prevents context overflow) + cleaned_dataset = [] + for item in dataset: + cleaned_item = item.copy() + # Remove or truncate base64 image data + if 'image_base64' in cleaned_item: + img_len = len(cleaned_item['image_base64']) + cleaned_item['image_base64'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]' + if 'image' in cleaned_item and isinstance(cleaned_item['image'], str) and len(cleaned_item['image']) > 1000: + img_len = len(cleaned_item['image']) + cleaned_item['image'] = f'[IMAGE_DATA_REMOVED_{img_len}_chars]' + # Also clean any nested detailed_scores + if 'detailed_scores' in cleaned_item and isinstance(cleaned_item['detailed_scores'], dict): + for key in list(cleaned_item['detailed_scores'].keys()): + val = cleaned_item['detailed_scores'][key] + if isinstance(val, str) and len(val) > 5000: + cleaned_item['detailed_scores'][key] = f'[LARGE_DATA_REMOVED_{len(val)}_chars]' + cleaned_dataset.append(cleaned_item) + + self.logger.info(f" ๐Ÿ“‹ Cleaned dataset: removed base64 images to prevent context overflow") + + # Use GEPA's default instruction proposal format + from gepa.strategies.instruction_proposal import InstructionProposalSignature + + # Build input dict for GEPA's instruction proposal + input_dict = { + 
"current_instruction_doc": current_text, + "dataset_with_feedback": cleaned_dataset # Use cleaned dataset! + } + + # Generate prompt using GEPA's signature + prompt = InstructionProposalSignature.prompt_renderer(input_dict) + + # Call reflection LM to generate new instruction + self.logger.info(f" Generating improved prompt via reflection LM...") + + result = self._reflection_lm_client.generate( + system_prompt="You are an expert prompt engineer. Follow the instructions in the user message to generate an improved prompt.", + user_prompt=prompt, + image_base64="" + ) + + # Extract response + if isinstance(result, dict): + response_text = result.get("content", str(result)) + else: + response_text = str(result) + + # Extract instruction using GEPA's extractor + extracted = InstructionProposalSignature.output_extractor(response_text) + new_instruction = extracted.get("new_instruction", response_text.strip()) + + # Clean up the instruction (remove markdown, quotes, etc.) + new_instruction = self._clean_extracted_prompt(new_instruction) + + self.logger.info(f" โœ… Generated new text (length: {len(new_instruction)} chars)") + self.logger.info(f" Preview: {new_instruction[:150]}...") + + new_texts[component_name] = new_instruction + + except Exception as e: + self.logger.error(f"โŒ Error generating proposal for {component_name}: {e}") + import traceback + self.logger.error(traceback.format_exc()) + # Fallback: return current text + new_texts[component_name] = current_text + + self.logger.info(f"\n{'='*80}") + self.logger.info(f"โœ… PROPOSE_NEW_TEXTS COMPLETE") + self.logger.info(f" Generated {len(new_texts)} new component texts") + self.logger.info(f"{'='*80}\n") + + return new_texts + + def _clean_extracted_prompt(self, prompt: str) -> str: + """ + Clean extracted prompt by removing markdown, quotes, and extra whitespace. 
+ + Args: + prompt: Raw extracted prompt text + + Returns: + Cleaned prompt text + """ + if not prompt: + return prompt + + # Remove markdown code blocks + prompt = re.sub(r'```[\w]*\n?', '', prompt) + prompt = re.sub(r'```', '', prompt) + + # Remove quotes if entire prompt is quoted + prompt = prompt.strip() + if (prompt.startswith('"') and prompt.endswith('"')) or \ + (prompt.startswith("'") and prompt.endswith("'")): + prompt = prompt[1:-1] + + # Remove leading/trailing whitespace + prompt = prompt.strip() + + return prompt \ No newline at end of file