""" Pareto Front Logger - Tracks candidate comparisons and Pareto front updates """ from typing import Dict, List, Optional from collections import defaultdict import logging logger = logging.getLogger(__name__) class ParetoLogger: """Tracks evaluations and Pareto front updates""" def __init__(self): self.candidates_evaluated = [] # List of (prompt, score, type, dataset) self.pareto_front = [] # Current Pareto front (prompt, score, type) self.baseline_score = None def log_candidate_evaluation(self, prompt: str, score: float, candidate_type: str, dataset_type: str): """Log a candidate evaluation""" self.candidates_evaluated.append({ 'prompt': prompt, 'score': score, 'type': candidate_type, 'dataset': dataset_type }) # If evaluated on Dpareto, check against Pareto front if dataset_type == 'dpareto': self._check_pareto_update(prompt, score, candidate_type) def _check_pareto_update(self, prompt: str, score: float, candidate_type: str): """Check if candidate should be added to Pareto front ๐Ÿ”ฅ CRITICAL RULE: Candidate must be better than baseline (f(Sโ‚€)) to enter Pareto front Exception: Seed prompt (Sโ‚€) itself is always added as baseline """ # Get notation for candidate with better mapping if candidate_type == 'gepa_reflection': cand_notation = 'Sแตฃ' elif candidate_type == 'llego_crossover' or candidate_type == 'llego_crossover1' or candidate_type == 'llego_crossover2': cand_notation = 'Oโ‚“โ‚’' elif candidate_type == 'llego_mutation' or candidate_type == 'llego_mutation1' or candidate_type == 'llego_mutation2': cand_notation = 'Oโ‚˜แตคโ‚œ' elif candidate_type == 'seed': cand_notation = 'Sโ‚€' elif candidate_type == 'unknown' or not candidate_type: cand_notation = 'S' # Default for unknown else: # For any other type, use base notation cand_notation = 'S' logger.info("\n" + "โ•" * 80) logger.info(f"๐Ÿ“Š PARETO FRONT P ANALYSIS - Evaluating {cand_notation}") logger.info("โ•" * 80) logger.info(f"\n ๐Ÿ“Š Evaluating: {cand_notation} with f({cand_notation}) = {score:.4f}") # ๐Ÿ”ฅ CRITICAL BASELINE CHECK: Candidate must be better than baseline (unless it's the seed itself) # Rule: Only candidates with f(candidate) > f(Sโ‚€) can enter Pareto front # Exception: Seed prompt (Sโ‚€) itself is always added as the baseline if candidate_type == 'seed': # ๐Ÿ”ฅ FIX: Check if seed prompt is already in Pareto front to prevent duplicates normalized_prompt = prompt.strip().strip('"\'') for existing_cand in self.pareto_front: existing_prompt = existing_cand.get('prompt', '').strip().strip('"\'') if existing_prompt == normalized_prompt and existing_cand.get('type') == 'seed': logger.info(f"\n โš ๏ธ {cand_notation} is already in Pareto Front P (duplicate detected)") logger.info(f" Skipping duplicate seed prompt addition") front_notations = [c.get('notation', 'S') for c in self.pareto_front] logger.info(f" P = {{{', '.join(front_notations)}}}") return # Skip adding duplicate logger.info(f"\n โœ… {cand_notation} is seed prompt - always added as baseline") # Set baseline if not already set (safety check - adapter should have done this) if self.baseline_score is None: self.baseline_score = score logger.info(f" ๐Ÿ’ก Setting baseline: f(Sโ‚€) = {score:.4f}") # Add seed to Pareto front immediately (no dominance check needed) self.pareto_front.append({ 'prompt': prompt, 'score': score, 'type': candidate_type, 'notation': cand_notation }) self.pareto_front.sort(key=lambda x: x['score'], reverse=True) # Display Pareto front with seed front_notations = [c.get('notation', 'S') for c in self.pareto_front] logger.info(f"\n โœ… ADDED to Pareto Front P (baseline)") logger.info(f" P = {{{', '.join(front_notations)}}}") self._display_pareto_front() return # Seed is always added - skip dominance check else: # For non-seed candidates, must be better than baseline to proceed if self.baseline_score is not None: if score > self.baseline_score: logger.info(f"\n โœ… {cand_notation} meets baseline requirement:") logger.info(f" f(Sโ‚€) = {self.baseline_score:.4f} (baseline)") logger.info(f" f({cand_notation}) = {score:.4f}") logger.info(f" f({cand_notation}) > f(Sโ‚€) โ†’ Can be added to Pareto front") logger.info(f" Improvement over baseline: +{score - self.baseline_score:.4f}") else: logger.info(f"\n โŒ {cand_notation} does NOT meet baseline requirement:") logger.info(f" f(Sโ‚€) = {self.baseline_score:.4f} (baseline)") logger.info(f" f({cand_notation}) = {score:.4f}") logger.info(f" f({cand_notation}) โ‰ค f(Sโ‚€) โ†’ NOT ADDED to Pareto front") logger.info(f" ๐Ÿ’ก Only candidates better than baseline can enter Pareto front") logger.info(f" ๐Ÿ’ก Difference: {score - self.baseline_score:.4f} (needs to be > 0)") return # Skip Pareto front update - candidate is not better than baseline else: # CRITICAL: Baseline must be set before evaluating any non-seed candidates logger.error(f"\n โŒ CRITICAL ERROR: Baseline score not set!") logger.error(f" Cannot evaluate {cand_notation} without baseline f(Sโ‚€)") logger.error(f" ๐Ÿ’ก Seed prompt must be evaluated on Dpareto first") logger.error(f" ๐Ÿ’ก Rejecting candidate to maintain correctness") # Debug logging removed - not needed in production return # Reject candidate - baseline is required # Check if this candidate dominates any in current front dominated = [] for i, front_candidate in enumerate(self.pareto_front): front_score = front_candidate['score'] front_notation = front_candidate.get('notation', 'S') # Simple dominance: higher score dominates if score > front_score: dominated.append(i) logger.info(f"\n โœ… {cand_notation} DOMINATES P{i+1}:") logger.info(f" f(P{i+1}) = {front_score:.4f}") logger.info(f" f({cand_notation}) = {score:.4f}") logger.info(f" f({cand_notation}) > f({front_notation}) โ†’ DOMINANCE") logger.info(f" Improvement: +{score - front_score:.4f}") if dominated: # Remove dominated candidates for i in reversed(dominated): removed = self.pareto_front.pop(i) removed_notation = removed.get('notation', 'S') logger.info(f" โžก๏ธ Removing {removed_notation} from Pareto front P (dominated by {cand_notation})") # Add new candidate self.pareto_front.append({ 'prompt': prompt, 'score': score, 'type': candidate_type, 'notation': cand_notation }) # Sort by score self.pareto_front.sort(key=lambda x: x['score'], reverse=True) # Display Pareto front with candidate notations front_notations = [c.get('notation', 'S') for c in self.pareto_front] logger.info(f"\n โœ… ADDED to Pareto Front P") logger.info(f" P = {{{', '.join(front_notations)}}}") else: # Check if any in front dominates this candidate is_dominated = False for i, front_candidate in enumerate(self.pareto_front): if front_candidate['score'] > score: front_notation = front_candidate.get('notation', 'S') logger.info(f"\n โŒ {cand_notation} is DOMINATED by {front_notation}:") logger.info(f" f({front_notation}) = {front_candidate['score']:.4f}") logger.info(f" f({cand_notation}) = {score:.4f}") logger.info(f" f({front_notation}) > f({cand_notation}) โ†’ DOMINATED") logger.info(f" Difference: {score - front_candidate['score']:.4f}") is_dominated = True break if not is_dominated: # Check for equal scores (for single-objective, we can add if non-dominated) equal_candidates = [c.get('notation', 'S') for c in self.pareto_front if abs(c['score'] - score) < 1e-6] # Non-dominated: add to front self.pareto_front.append({ 'prompt': prompt, 'score': score, 'type': candidate_type, 'notation': cand_notation }) self.pareto_front.sort(key=lambda x: x['score'], reverse=True) # Display Pareto front with candidate notations front_notations = [c.get('notation', 'S') for c in self.pareto_front] if equal_candidates: logger.info(f"\n โœ… ADDED to Pareto Front P (non-dominated)") logger.info(f" f({cand_notation}) = {score:.4f} (same score as {', '.join(equal_candidates)})") logger.info(f" P = {{{', '.join(front_notations)}}}") else: logger.info(f"\n โœ… ADDED to Pareto Front P (non-dominated)") logger.info(f" {cand_notation} is non-dominated โ†’ kept in P") logger.info(f" P = {{{', '.join(front_notations)}}}") else: # Show all dominating candidates with their notations dominating_list = [(c.get('notation', 'S'), c['score']) for c in self.pareto_front if c['score'] > score] if dominating_list: for dom_notation, dom_score in dominating_list: logger.info(f"\n โŒ {cand_notation} is DOMINATED by {dom_notation}:") logger.info(f" f({dom_notation}) = {dom_score:.4f}") logger.info(f" f({cand_notation}) = {score:.4f}") logger.info(f" f({dom_notation}) > f({cand_notation}) โ†’ DOMINATED") logger.info(f"\n โŒ NOT ADDED to Pareto Front P (dominated)") self._display_pareto_front() def _display_pareto_front(self): """Display current Pareto front with candidate notation""" logger.info(f"\n๐Ÿ“‹ CURRENT PARETO FRONT P (Size: |P| = {len(self.pareto_front)}):") logger.info("โ”€" * 80) if not self.pareto_front: logger.info(" P = {} (Empty - no candidates added yet)") logger.info(" ๐Ÿ’ก NOTATION: P = Pareto front (non-dominated solutions)") return # Display Pareto front using candidate notations instead of P1, P2, etc. front_notations = [c.get('notation', 'S') for c in self.pareto_front] logger.info(f" P = {{{', '.join(front_notations)}}}") for candidate in self.pareto_front: notation = candidate.get('notation', 'S') # Enhanced type labels with full notation type_labels = { 'seed': ('๐ŸŒฑ Seed Prompt', 'Sโ‚€'), 'gepa_reflection': ('๐Ÿ“ GEPA Reflection Candidate', 'Sแตฃ'), 'llego_crossover': ('๐Ÿ”€ LLEGO Crossover Offspring', 'Oโ‚“โ‚’'), 'llego_mutation': ('๐ŸŽฒ LLEGO Mutation Offspring', 'Oโ‚˜แตคโ‚œ'), 'unknown': ('๐Ÿ”„ Unknown Candidate', 'S') } cand_type = candidate.get('type', 'unknown') type_label, type_notation = type_labels.get(cand_type, (f'๐Ÿ”„ {cand_type}', notation)) # Use the notation from the candidate if available, otherwise use type notation display_notation = notation if notation != 'S' else type_notation logger.info(f"\n {display_notation}: {type_label}") logger.info(f" f({display_notation}) = {candidate['score']:.4f}") prompt_preview = candidate['prompt'][:150] if len(candidate['prompt']) > 150 else candidate['prompt'] logger.info(f" Prompt ({len(candidate['prompt'])} chars): {prompt_preview}{'...' if len(candidate['prompt']) > 150 else ''}") logger.info(f"\n ๐Ÿ’ก NOTATION EXPLANATION:") logger.info(f" P = Pareto front (set of non-dominated solutions)") logger.info(f" Sโ‚€ = Seed prompt (baseline)") logger.info(f" Sแตฃ = GEPA Reflection candidate") logger.info(f" Oโ‚“โ‚’ = LLEGO Crossover offspring (combines parents)") logger.info(f" Oโ‚˜แตคโ‚œ = LLEGO Mutation offspring (explores variations)") logger.info(f" f({', '.join(front_notations[:3])}) = Fitness scores of candidates in Pareto front") logger.info("โ”€" * 80) def set_baseline(self, score: float): """Set baseline score for comparison""" self.baseline_score = score # Add seed to Pareto front if we have it if self.pareto_front: seed_candidate = self.pareto_front[0] # First is usually seed seed_candidate['baseline_score'] = score def batch_update_pareto_front(self, candidates_with_scores: List[Dict]) -> List[Dict]: """ ๐Ÿ”ฅ BATCH PARETO FRONT UPDATE Efficiently update Pareto front with multiple candidates in one operation. Steps: 1. Filter by baseline (score > baseline_score) 2. Find non-dominated among filtered candidates 3. Compare with current Pareto front 4. Update Pareto front (remove dominated, add non-dominated) Args: candidates_with_scores: List of dicts with keys: - 'prompt': str - 'score': float - 'type': str (candidate_type) - 'notation': str (optional, will be generated if missing) Returns: List of candidates that were added to Pareto front """ if not candidates_with_scores: return [] logger.info("\n" + "โ•" * 80) logger.info(f"๐Ÿ”ฅ BATCH PARETO FRONT UPDATE - Processing {len(candidates_with_scores)} candidates") logger.info("โ•" * 80) # Step 0: Deduplicate input candidates by prompt text seen_prompts = set() deduplicated_candidates = [] for cand in candidates_with_scores: normalized_prompt = cand.get('prompt', '').strip().strip('"\'') if normalized_prompt not in seen_prompts: seen_prompts.add(normalized_prompt) deduplicated_candidates.append(cand) else: logger.info(f" โš ๏ธ Skipping duplicate candidate: {cand.get('notation', 'S')} (prompt already in batch)") if len(deduplicated_candidates) < len(candidates_with_scores): logger.info(f" ๐Ÿ“Š Deduplicated: {len(candidates_with_scores)} โ†’ {len(deduplicated_candidates)} candidates") candidates_with_scores = deduplicated_candidates # Step 1: Filter by baseline (score > baseline_score) if self.baseline_score is None: logger.error("โŒ Baseline score not set - cannot perform batch update") logger.error(" ๐Ÿ’ก Seed prompt must be evaluated on Dpareto first") return [] baseline = self.baseline_score filtered = [] for cand in candidates_with_scores: score = cand.get('score', 0.0) cand_type = cand.get('type', 'unknown') # Seed is always included (it's the baseline) if cand_type == 'seed': # ๐Ÿ”ฅ FIX: Check if seed is already in Pareto front normalized_prompt = cand.get('prompt', '').strip().strip('"\'') already_in_front = False for existing_cand in self.pareto_front: existing_prompt = existing_cand.get('prompt', '').strip().strip('"\'') if existing_prompt == normalized_prompt and existing_cand.get('type') == 'seed': already_in_front = True logger.info(f" โš ๏ธ Seed prompt already in Pareto front - skipping duplicate") break if not already_in_front: filtered.append(cand) continue # Non-seed candidates must be better than baseline if score > baseline: filtered.append(cand) logger.info(f" โœ… {cand.get('notation', 'S')} passes baseline: f={score:.4f} > f(Sโ‚€)={baseline:.4f}") else: notation = cand.get('notation', 'S') logger.info(f" โŒ {notation} fails baseline: f={score:.4f} โ‰ค f(Sโ‚€)={baseline:.4f}") if not filtered: logger.info(f"\n โŒ No candidates pass baseline filter (baseline: {baseline:.4f})") logger.info(" ๐Ÿ’ก All candidates are worse than or equal to seed prompt") return [] logger.info(f"\n ๐Ÿ“Š After baseline filter: {len(filtered)}/{len(candidates_with_scores)} candidates remain") # Step 2: Find non-dominated among filtered candidates # Sort by score (descending) for easier dominance checking filtered_sorted = sorted(filtered, key=lambda x: x.get('score', 0.0), reverse=True) non_dominated_batch = [] for i, cand in enumerate(filtered_sorted): cand_score = cand.get('score', 0.0) cand_notation = cand.get('notation', 'S') is_dominated = False # Check if dominated by any other candidate in batch for other in filtered_sorted[:i]: # Only check candidates with higher scores other_score = other.get('score', 0.0) if other_score > cand_score: other_notation = other.get('notation', 'S') logger.info(f" โŒ {cand_notation} dominated by {other_notation} in batch: f({other_notation})={other_score:.4f} > f({cand_notation})={cand_score:.4f}") is_dominated = True break if not is_dominated: non_dominated_batch.append(cand) logger.info(f" โœ… {cand_notation} is non-dominated in batch: f={cand_score:.4f}") logger.info(f"\n ๐Ÿ“Š After batch dominance check: {len(non_dominated_batch)}/{len(filtered)} non-dominated candidates") if not non_dominated_batch: logger.info(" โŒ No non-dominated candidates in batch") return [] # Step 3: Compare with current Pareto front and update added_to_front = [] candidates_to_remove = [] # First, check which current front candidates are dominated by new batch for front_cand in self.pareto_front: front_score = front_cand.get('score', 0.0) front_notation = front_cand.get('notation', 'S') # Check if any new candidate dominates this front candidate for new_cand in non_dominated_batch: new_score = new_cand.get('score', 0.0) new_notation = new_cand.get('notation', 'S') if new_score > front_score: candidates_to_remove.append(front_cand) logger.info(f" โžก๏ธ {front_notation} will be removed (dominated by {new_notation}): f({front_notation})={front_score:.4f} < f({new_notation})={new_score:.4f}") break # Remove dominated candidates from front for cand_to_remove in candidates_to_remove: if cand_to_remove in self.pareto_front: self.pareto_front.remove(cand_to_remove) # Now add non-dominated new candidates (check they're not dominated by remaining front) for new_cand in non_dominated_batch: new_score = new_cand.get('score', 0.0) new_notation = new_cand.get('notation', 'S') new_type = new_cand.get('type', 'unknown') new_prompt = new_cand.get('prompt', '') # Check if dominated by any remaining front candidate is_dominated_by_front = False for front_cand in self.pareto_front: front_score = front_cand.get('score', 0.0) if front_score > new_score: front_notation = front_cand.get('notation', 'S') logger.info(f" โŒ {new_notation} dominated by existing {front_notation}: f({front_notation})={front_score:.4f} > f({new_notation})={new_score:.4f}") is_dominated_by_front = True break if not is_dominated_by_front: # Generate notation if missing if 'notation' not in new_cand: if new_type == 'gepa_reflection': new_notation = 'Sแตฃ' elif new_type.startswith('llego_crossover'): new_notation = 'Oโ‚“โ‚’' elif new_type.startswith('llego_mutation'): new_notation = 'Oโ‚˜แตคโ‚œ' elif new_type == 'seed': new_notation = 'Sโ‚€' else: new_notation = 'S' # Add to Pareto front front_entry = { 'prompt': new_prompt, 'score': new_score, 'type': new_type, 'notation': new_notation } self.pareto_front.append(front_entry) added_to_front.append(new_cand) # Also log to candidates_evaluated for tracking self.candidates_evaluated.append({ 'prompt': new_prompt, 'score': new_score, 'type': new_type, 'dataset': 'dpareto' }) logger.info(f" โœ… {new_notation} ADDED to Pareto front: f={new_score:.4f}") # Sort Pareto front by score self.pareto_front.sort(key=lambda x: x.get('score', 0.0), reverse=True) # Display updated Pareto front logger.info(f"\n{'โ•'*80}") logger.info(f"โœ… BATCH UPDATE COMPLETE") logger.info(f" Added: {len(added_to_front)} candidates") logger.info(f" Removed: {len(candidates_to_remove)} dominated candidates") logger.info(f" Pareto front size: |P| = {len(self.pareto_front)}") front_notations = [c.get('notation', 'S') for c in self.pareto_front] logger.info(f" P = {{{', '.join(front_notations)}}}") self._display_pareto_front() logger.info("โ•" * 80 + "\n") return added_to_front # Global instance _pareto_logger = ParetoLogger() def get_pareto_logger() -> ParetoLogger: """Get global Pareto logger instance""" return _pareto_logger def reset_pareto_logger() -> ParetoLogger: """Reset global Pareto logger instance (for new runs)""" global _pareto_logger _pareto_logger = ParetoLogger() # Debug logging removed - not needed in production return _pareto_logger