Spaces:
Sleeping
Sleeping
| """ | |
| Pareto Front Logger - Tracks candidate comparisons and Pareto front updates | |
| """ | |
| from typing import Dict, List, Optional | |
| from collections import defaultdict | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class ParetoLogger:
    """Track candidate evaluations and maintain a single-objective Pareto front.

    Every evaluation is recorded in ``candidates_evaluated``. Candidates that
    were scored on the ``'dpareto'`` dataset are additionally compared against
    the seed baseline and the current front. Each candidate has exactly one
    score, so "A dominates B" means ``score(A) > score(B)``; candidates with
    equal scores coexist on the front.

    Attributes:
        candidates_evaluated: One dict per logged evaluation with the keys
            ``prompt``, ``score``, ``type`` and ``dataset``.
        pareto_front: Front entries (dicts with ``prompt``, ``score``,
            ``type`` and ``notation``), kept sorted by score, best first.
        baseline_score: Score of the seed prompt, or ``None`` until it is set.
    """

    # Short symbols used in log output and stored under the 'notation' key.
    # NOTE(review): these glyphs (and the emoji / rule characters in the log
    # messages below) were reconstructed from mis-decoded text; the subscript
    # notations are unambiguous, a few decorative emoji are best guesses.
    SEED_NOTATION = 'S₀'
    REFLECTION_NOTATION = 'Sᵣ'
    CROSSOVER_NOTATION = 'Oₓₒ'
    MUTATION_NOTATION = 'Oₘᵤₜ'
    DEFAULT_NOTATION = 'S'

    # Human-readable label per notation; built once instead of per log line.
    _TYPE_LABELS = {
        SEED_NOTATION: '🌱 Seed Prompt',
        REFLECTION_NOTATION: '📝 GEPA Reflection Candidate',
        CROSSOVER_NOTATION: '🔀 LLEGO Crossover Offspring',
        MUTATION_NOTATION: '🎲 LLEGO Mutation Offspring',
    }

    _BANNER = '═' * 80
    _RULE = '─' * 80
    _PREVIEW_CHARS = 150      # prompt characters shown when displaying the front
    _TIE_TOLERANCE = 1e-6     # scores closer than this are reported as ties

    def __init__(self):
        self.candidates_evaluated: List[Dict] = []
        self.pareto_front: List[Dict] = []
        self.baseline_score: Optional[float] = None

    # ------------------------------------------------------------------ #
    # Small shared helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _normalize_prompt(prompt) -> str:
        """Return *prompt* without surrounding whitespace and quote characters."""
        return (prompt or '').strip().strip('"\'')

    @classmethod
    def _notation_for_type(cls, candidate_type) -> str:
        """Map a candidate type string to its notation symbol.

        Numbered variants such as ``'llego_crossover2'`` share the symbol of
        their base type; missing or unrecognised types map to the default.
        """
        if not isinstance(candidate_type, str) or not candidate_type:
            return cls.DEFAULT_NOTATION
        if candidate_type == 'seed':
            return cls.SEED_NOTATION
        if candidate_type == 'gepa_reflection':
            return cls.REFLECTION_NOTATION
        if candidate_type.startswith('llego_crossover'):
            return cls.CROSSOVER_NOTATION
        if candidate_type.startswith('llego_mutation'):
            return cls.MUTATION_NOTATION
        return cls.DEFAULT_NOTATION

    @classmethod
    def _describe_type(cls, candidate_type):
        """Return ``(label, notation)`` describing *candidate_type* for display."""
        notation = cls._notation_for_type(candidate_type)
        if notation in cls._TYPE_LABELS:
            return cls._TYPE_LABELS[notation], notation
        if not candidate_type or candidate_type == 'unknown':
            return '🔄 Unknown Candidate', notation
        return f'🔄 {candidate_type}', notation

    def _candidate_notation(self, cand: Dict) -> str:
        """Notation supplied with a batch candidate, or derived from its type."""
        return cand.get('notation') or self._notation_for_type(cand.get('type'))

    def _front_notations(self) -> List[str]:
        """Notations of the current front entries, in front order."""
        return [entry.get('notation', self.DEFAULT_NOTATION) for entry in self.pareto_front]

    def _front_set(self) -> str:
        """Render the front as a set literal such as ``{Sᵣ, S₀}``."""
        return '{' + ', '.join(self._front_notations()) + '}'

    def _seed_in_front(self, prompt) -> bool:
        """True when a seed entry with the same normalised prompt is on the front."""
        wanted = self._normalize_prompt(prompt)
        return any(
            entry.get('type') == 'seed'
            and self._normalize_prompt(entry.get('prompt', '')) == wanted
            for entry in self.pareto_front
        )

    def _add_to_front(self, prompt, score, candidate_type, notation) -> None:
        """Append an entry to the front and restore best-first ordering."""
        self.pareto_front.append({
            'prompt': prompt,
            'score': score,
            'type': candidate_type,
            'notation': notation,
        })
        self.pareto_front.sort(key=lambda entry: entry['score'], reverse=True)

    # ------------------------------------------------------------------ #
    # Single-candidate path
    # ------------------------------------------------------------------ #
    def log_candidate_evaluation(self, prompt: str, score: float, candidate_type: str, dataset_type: str):
        """Record one evaluation; update the front if it was run on ``'dpareto'``.

        Args:
            prompt: Prompt text of the evaluated candidate.
            score: Fitness score obtained by the candidate.
            candidate_type: Origin of the candidate (e.g. ``'seed'``,
                ``'gepa_reflection'``, ``'llego_crossover'``, ``'llego_mutation'``).
            dataset_type: Dataset label; only the exact value ``'dpareto'``
                triggers a Pareto front check.
        """
        self.candidates_evaluated.append({
            'prompt': prompt,
            'score': score,
            'type': candidate_type,
            'dataset': dataset_type,
        })
        if dataset_type == 'dpareto':
            self._check_pareto_update(prompt, score, candidate_type)

    def _check_pareto_update(self, prompt: str, score: float, candidate_type: str):
        """Decide whether a candidate enters the Pareto front.

        Rules:
          * The seed prompt is always added as the baseline (once per prompt)
            and sets ``baseline_score`` if it is still unset.
          * Any other candidate is rejected unless a baseline exists and the
            candidate scores strictly above it.
          * Front entries scoring strictly below the candidate are removed.
          * The candidate is added only if no remaining front entry scores
            strictly above it (ties are kept).
        """
        notation = self._notation_for_type(candidate_type)

        logger.info("\n" + self._BANNER)
        logger.info(f"📊 PARETO FRONT P ANALYSIS - Evaluating {notation}")
        logger.info(self._BANNER)
        logger.info(f"\n   🔍 Evaluating: {notation} with f({notation}) = {score:.4f}")

        if candidate_type == 'seed':
            self._admit_seed(prompt, score, notation)
            return
        if not self._passes_baseline(score, notation):
            return

        # Drop every front entry this candidate beats. Slice assignment keeps
        # the list object stable for anyone holding a reference to it.
        survivors = []
        for position, entry in enumerate(self.pareto_front, start=1):
            entry_score = entry['score']
            entry_notation = entry.get('notation', self.DEFAULT_NOTATION)
            if score > entry_score:
                logger.info(f"\n   ✅ {notation} DOMINATES P{position}:")
                logger.info(f"      f(P{position}) = {entry_score:.4f}")
                logger.info(f"      f({notation}) = {score:.4f}")
                logger.info(f"      f({notation}) > f({entry_notation}) → DOMINANCE")
                logger.info(f"      Improvement: +{score - entry_score:.4f}")
                logger.info(f"      ➡️ Removing {entry_notation} from Pareto front P (dominated by {notation})")
            else:
                survivors.append(entry)
        self.pareto_front[:] = survivors

        # Beating some entries is not enough: a stronger entry may remain.
        dominators = [entry for entry in self.pareto_front if entry['score'] > score]
        if dominators:
            for entry in dominators:
                dom_notation = entry.get('notation', self.DEFAULT_NOTATION)
                logger.info(f"\n   ❌ {notation} is DOMINATED by {dom_notation}:")
                logger.info(f"      f({dom_notation}) = {entry['score']:.4f}")
                logger.info(f"      f({notation}) = {score:.4f}")
                logger.info(f"      f({dom_notation}) > f({notation}) → DOMINATED")
                logger.info(f"      Difference: {score - entry['score']:.4f}")
            logger.info("\n   ❌ NOT ADDED to Pareto Front P (dominated)")
        else:
            tied_with = [
                entry.get('notation', self.DEFAULT_NOTATION)
                for entry in self.pareto_front
                if abs(entry['score'] - score) < self._TIE_TOLERANCE
            ]
            self._add_to_front(prompt, score, candidate_type, notation)
            logger.info("\n   ✅ ADDED to Pareto Front P (non-dominated)")
            if tied_with:
                logger.info(f"      f({notation}) = {score:.4f} (same score as {', '.join(tied_with)})")
            else:
                logger.info(f"      {notation} is non-dominated → kept in P")
            logger.info(f"      P = {self._front_set()}")

        self._display_pareto_front()

    def _admit_seed(self, prompt, score, notation) -> None:
        """Add the seed prompt to the front as the baseline, skipping duplicates."""
        if self._seed_in_front(prompt):
            logger.info(f"\n   ⚠️ {notation} is already in Pareto Front P (duplicate detected)")
            logger.info("      Skipping duplicate seed prompt addition")
            logger.info(f"      P = {self._front_set()}")
            return

        logger.info(f"\n   ✅ {notation} is seed prompt - always added as baseline")
        # Safety net: normally the baseline was already provided via set_baseline().
        if self.baseline_score is None:
            self.baseline_score = score
            logger.info(f"      💡 Setting baseline: f({self.SEED_NOTATION}) = {score:.4f}")

        self._add_to_front(prompt, score, 'seed', notation)
        logger.info("\n   ✅ ADDED to Pareto Front P (baseline)")
        logger.info(f"      P = {self._front_set()}")
        self._display_pareto_front()

    def _passes_baseline(self, score, notation) -> bool:
        """Log and return whether a non-seed candidate strictly beats the baseline."""
        seed = self.SEED_NOTATION
        if self.baseline_score is None:
            logger.error("\n   ❌ CRITICAL ERROR: Baseline score not set!")
            logger.error(f"      Cannot evaluate {notation} without baseline f({seed})")
            logger.error("      💡 Seed prompt must be evaluated on Dpareto first")
            logger.error("      💡 Rejecting candidate to maintain correctness")
            return False

        baseline = self.baseline_score
        if score > baseline:
            logger.info(f"\n   ✅ {notation} meets baseline requirement:")
            logger.info(f"      f({seed}) = {baseline:.4f} (baseline)")
            logger.info(f"      f({notation}) = {score:.4f}")
            logger.info(f"      f({notation}) > f({seed}) → Can be added to Pareto front")
            logger.info(f"      Improvement over baseline: +{score - baseline:.4f}")
            return True

        logger.info(f"\n   ❌ {notation} does NOT meet baseline requirement:")
        logger.info(f"      f({seed}) = {baseline:.4f} (baseline)")
        logger.info(f"      f({notation}) = {score:.4f}")
        logger.info(f"      f({notation}) ≤ f({seed}) → NOT ADDED to Pareto front")
        logger.info("      💡 Only candidates better than baseline can enter Pareto front")
        logger.info(f"      💡 Difference: {score - baseline:.4f} (needs to be > 0)")
        return False

    # ------------------------------------------------------------------ #
    # Display
    # ------------------------------------------------------------------ #
    def _display_pareto_front(self):
        """Log the current front: its set notation, then one section per entry."""
        logger.info(f"\n📋 CURRENT PARETO FRONT P (Size: |P| = {len(self.pareto_front)}):")
        logger.info(self._RULE)

        if not self.pareto_front:
            logger.info("   P = {} (Empty - no candidates added yet)")
            logger.info("   💡 NOTATION: P = Pareto front (non-dominated solutions)")
            return

        notations = self._front_notations()
        logger.info(f"   P = {self._front_set()}")

        for entry in self.pareto_front:
            label, type_notation = self._describe_type(entry.get('type', 'unknown'))
            stored = entry.get('notation', self.DEFAULT_NOTATION)
            # Prefer the stored notation; fall back to the type-derived one
            # only when the entry carries the generic default.
            shown = stored if stored != self.DEFAULT_NOTATION else type_notation
            text = entry.get('prompt') or ''
            ellipsis = '...' if len(text) > self._PREVIEW_CHARS else ''

            logger.info(f"\n   {shown}: {label}")
            logger.info(f"      f({shown}) = {entry['score']:.4f}")
            logger.info(f"      Prompt ({len(text)} chars): {text[:self._PREVIEW_CHARS]}{ellipsis}")

        logger.info("\n   💡 NOTATION EXPLANATION:")
        logger.info("      P = Pareto front (set of non-dominated solutions)")
        logger.info(f"      {self.SEED_NOTATION} = Seed prompt (baseline)")
        logger.info(f"      {self.REFLECTION_NOTATION} = GEPA Reflection candidate")
        logger.info(f"      {self.CROSSOVER_NOTATION} = LLEGO Crossover offspring (combines parents)")
        logger.info(f"      {self.MUTATION_NOTATION} = LLEGO Mutation offspring (explores variations)")
        logger.info(f"      f({', '.join(notations[:3])}) = Fitness scores of candidates in Pareto front")
        logger.info(self._RULE)

    # ------------------------------------------------------------------ #
    # Baseline
    # ------------------------------------------------------------------ #
    def set_baseline(self, score: float):
        """Set the baseline score and annotate seed entries already on the front.

        The front is sorted best-first, so the seed is located by its type
        rather than by position.
        """
        self.baseline_score = score
        for entry in self.pareto_front:
            if entry.get('type') == 'seed':
                entry['baseline_score'] = score

    # ------------------------------------------------------------------ #
    # Batch path
    # ------------------------------------------------------------------ #
    def batch_update_pareto_front(self, candidates_with_scores: List[Dict]) -> List[Dict]:
        """Update the Pareto front with several candidates in one pass.

        Steps:
          0. Drop candidates whose normalised prompt repeats within the batch.
          1. Keep candidates scoring strictly above the baseline (a seed
             candidate is kept unless it is already on the front).
          2. Keep only the candidates not out-scored by another in the batch.
          3. Remove front entries out-scored by the batch, then add each batch
             survivor that no remaining front entry out-scores.

        Args:
            candidates_with_scores: Dicts with keys ``prompt`` (str),
                ``score`` (float), ``type`` (str) and optionally ``notation``
                (derived from ``type`` when missing). Input dicts are not
                modified.

        Returns:
            The input dicts that were added to the front (empty when the
            input is empty, the baseline is unset, or nothing qualifies).
        """
        if not candidates_with_scores:
            return []

        seed = self.SEED_NOTATION
        logger.info("\n" + self._BANNER)
        logger.info(f"🔥 BATCH PARETO FRONT UPDATE - Processing {len(candidates_with_scores)} candidates")
        logger.info(self._BANNER)

        # Step 0: de-duplicate by prompt text within the batch.
        seen_prompts = set()
        unique = []
        for cand in candidates_with_scores:
            key = self._normalize_prompt(cand.get('prompt', ''))
            if key in seen_prompts:
                logger.info(f"   ⚠️ Skipping duplicate candidate: {self._candidate_notation(cand)} (prompt already in batch)")
                continue
            seen_prompts.add(key)
            unique.append(cand)
        if len(unique) < len(candidates_with_scores):
            logger.info(f"   📊 Deduplicated: {len(candidates_with_scores)} → {len(unique)} candidates")

        # Step 1: baseline filter.
        if self.baseline_score is None:
            logger.error("❌ Baseline score not set - cannot perform batch update")
            logger.error("   💡 Seed prompt must be evaluated on Dpareto first")
            return []
        baseline = self.baseline_score

        filtered = []
        for cand in unique:
            cand_score = cand.get('score', 0.0)
            cand_notation = self._candidate_notation(cand)
            if cand.get('type') == 'seed':
                if self._seed_in_front(cand.get('prompt', '')):
                    logger.info("   ⚠️ Seed prompt already in Pareto front - skipping duplicate")
                else:
                    filtered.append(cand)
            elif cand_score > baseline:
                filtered.append(cand)
                logger.info(f"   ✅ {cand_notation} passes baseline: f={cand_score:.4f} > f({seed})={baseline:.4f}")
            else:
                logger.info(f"   ❌ {cand_notation} fails baseline: f={cand_score:.4f} ≤ f({seed})={baseline:.4f}")

        if not filtered:
            logger.info(f"\n   ❌ No candidates pass baseline filter (baseline: {baseline:.4f})")
            logger.info("   💡 All candidates are worse than or equal to seed prompt")
            return []
        logger.info(f"\n   📊 After baseline filter: {len(filtered)}/{len(unique)} candidates remain")

        # Step 2: within the batch only the top-scoring candidates survive.
        ranked = sorted(filtered, key=lambda c: c.get('score', 0.0), reverse=True)
        leader = ranked[0]
        leader_score = leader.get('score', 0.0)
        leader_notation = self._candidate_notation(leader)
        non_dominated = []
        for cand in ranked:
            cand_score = cand.get('score', 0.0)
            cand_notation = self._candidate_notation(cand)
            if leader_score > cand_score:
                logger.info(
                    f"   ❌ {cand_notation} dominated by {leader_notation} in batch: "
                    f"f({leader_notation})={leader_score:.4f} > f({cand_notation})={cand_score:.4f}"
                )
            else:
                non_dominated.append(cand)
                logger.info(f"   ✅ {cand_notation} is non-dominated in batch: f={cand_score:.4f}")
        logger.info(f"\n   📊 After batch dominance check: {len(non_dominated)}/{len(filtered)} non-dominated candidates")

        # Step 3a: evict front entries that a batch survivor out-scores.
        removed = []
        survivors = []
        for entry in self.pareto_front:
            entry_score = entry.get('score', 0.0)
            beaten_by = next(
                (c for c in non_dominated if c.get('score', 0.0) > entry_score), None
            )
            if beaten_by is None:
                survivors.append(entry)
                continue
            removed.append(entry)
            entry_notation = entry.get('notation', self.DEFAULT_NOTATION)
            new_notation = self._candidate_notation(beaten_by)
            logger.info(
                f"   ➡️ {entry_notation} will be removed (dominated by {new_notation}): "
                f"f({entry_notation})={entry_score:.4f} < f({new_notation})={beaten_by.get('score', 0.0):.4f}"
            )
        self.pareto_front[:] = survivors

        # Step 3b: add batch survivors that the remaining front does not beat.
        added_to_front = []
        for cand in non_dominated:
            new_score = cand.get('score', 0.0)
            new_notation = self._candidate_notation(cand)
            new_type = cand.get('type') or 'unknown'
            new_prompt = cand.get('prompt', '')

            blocker = next(
                (e for e in self.pareto_front if e.get('score', 0.0) > new_score), None
            )
            if blocker is not None:
                blocker_notation = blocker.get('notation', self.DEFAULT_NOTATION)
                logger.info(
                    f"   ❌ {new_notation} dominated by existing {blocker_notation}: "
                    f"f({blocker_notation})={blocker.get('score', 0.0):.4f} > f({new_notation})={new_score:.4f}"
                )
                continue

            self.pareto_front.append({
                'prompt': new_prompt,
                'score': new_score,
                'type': new_type,
                'notation': new_notation,
            })
            added_to_front.append(cand)
            # Mirror the single-candidate path so the evaluation history stays complete.
            self.candidates_evaluated.append({
                'prompt': new_prompt,
                'score': new_score,
                'type': new_type,
                'dataset': 'dpareto',
            })
            logger.info(f"   ✅ {new_notation} ADDED to Pareto front: f={new_score:.4f}")

        self.pareto_front.sort(key=lambda entry: entry.get('score', 0.0), reverse=True)

        logger.info(f"\n{self._BANNER}")
        logger.info("✅ BATCH UPDATE COMPLETE")
        logger.info(f"   Added: {len(added_to_front)} candidates")
        logger.info(f"   Removed: {len(removed)} dominated candidates")
        logger.info(f"   Pareto front size: |P| = {len(self.pareto_front)}")
        logger.info(f"   P = {self._front_set()}")
        self._display_pareto_front()
        logger.info(self._BANNER + "\n")

        return added_to_front
# Module-level shared ParetoLogger instance: returned by get_pareto_logger()
# and replaced with a fresh object by reset_pareto_logger().
_pareto_logger = ParetoLogger()
def get_pareto_logger() -> ParetoLogger:
    """Return the ParetoLogger currently stored at module level."""
    shared_instance = _pareto_logger
    return shared_instance
def reset_pareto_logger() -> ParetoLogger:
    """Replace the module-level ParetoLogger with a new, empty one and return it.

    Meant to be called at the start of a new run so that no evaluations,
    front entries or baseline from a previous run carry over.
    """
    global _pareto_logger
    replacement = ParetoLogger()
    _pareto_logger = replacement
    return replacement