# Author: Suhasdev
# Commit 74d4bea: Fix duplicate seed prompts in Pareto front - add deduplication checks
"""
Pareto Front Logger - Tracks candidate comparisons and Pareto front updates
"""
from typing import Dict, List, Optional
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)


class ParetoLogger:
    """Track candidate evaluations and maintain a single-objective Pareto front.

    Every evaluation is recorded in ``candidates_evaluated``.  Evaluations made
    on the ``'dpareto'`` dataset are additionally compared against
    ``pareto_front``, a list of dicts with the keys ``'prompt'``, ``'score'``,
    ``'type'`` and ``'notation'`` kept sorted by score (highest first).

    Rules enforced here:

    * The seed prompt is always admitted (once) and defines the baseline score
      if none has been set yet.
    * Any other candidate must score strictly above the baseline, must not
      already be present in the front, and must not be beaten by a front
      member.  Front members it beats are removed.
    * "Dominance" is a plain comparison of the scalar score.
    """

    # Two scores closer than this are reported as tied in the log output.
    _SCORE_TOLERANCE = 1e-6

    # Horizontal rules used to frame log sections.
    _HEAVY_RULE = "═" * 80
    _LIGHT_RULE = "─" * 80

    # candidate type -> (human readable label, notation) used by the display.
    _TYPE_LABELS = {
        'seed': ('🌱 Seed Prompt', 'S₀'),
        'gepa_reflection': ('📝 GEPA Reflection Candidate', 'Sᵣ'),
        'llego_crossover': ('🔀 LLEGO Crossover Offspring', 'Oₓₒ'),
        'llego_mutation': ('🎲 LLEGO Mutation Offspring', 'Oₘᵤₜ'),
        'unknown': ('🔄 Unknown Candidate', 'S'),
    }

    def __init__(self):
        # One dict per evaluation: prompt, score, type, dataset.
        self.candidates_evaluated: List[Dict] = []
        # Current front: prompt, score, type, notation (sorted by score desc).
        self.pareto_front: List[Dict] = []
        # Score of the seed prompt; None until a seed/baseline is registered.
        self.baseline_score: Optional[float] = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_prompt(prompt) -> str:
        """Return *prompt* without surrounding whitespace and quote characters."""
        return (prompt or '').strip().strip('"\'')

    @staticmethod
    def _notation_for(candidate_type) -> str:
        """Map a candidate type to its short notation (``'S'`` when unknown)."""
        kind = candidate_type or ''
        if kind == 'seed':
            return 'S₀'
        if kind == 'gepa_reflection':
            return 'Sᵣ'
        # Prefix match so numbered variants (llego_crossover1, ...) are covered.
        if kind.startswith('llego_crossover'):
            return 'Oₓₒ'
        if kind.startswith('llego_mutation'):
            return 'Oₘᵤₜ'
        return 'S'

    def _front_notations(self) -> List[str]:
        """Return the notation of every front member, in front order."""
        return [entry.get('notation', 'S') for entry in self.pareto_front]

    def _log_front_set(self):
        """Log the front as a set expression, e.g. ``P = {Sᵣ, Oₓₒ}``."""
        logger.info(f"   P = {{{', '.join(self._front_notations())}}}")

    def _find_in_front(self, prompt, candidate_type=None) -> Optional[Dict]:
        """Return the front entry whose normalized prompt equals *prompt*.

        When *candidate_type* is given, only entries of that type match.
        Returns ``None`` if there is no such entry.
        """
        wanted = self._normalize_prompt(prompt)
        for entry in self.pareto_front:
            if self._normalize_prompt(entry.get('prompt', '')) != wanted:
                continue
            if candidate_type is None or entry.get('type') == candidate_type:
                return entry
        return None

    def _add_to_front(self, prompt, score, candidate_type, notation):
        """Append an entry to the front and restore the score-descending order."""
        self.pareto_front.append({
            'prompt': prompt,
            'score': score,
            'type': candidate_type,
            'notation': notation,
        })
        self.pareto_front.sort(key=lambda entry: entry.get('score', 0.0), reverse=True)

    # ------------------------------------------------------------------
    # Single-candidate path
    # ------------------------------------------------------------------
    def log_candidate_evaluation(self, prompt: str, score: float, candidate_type: str, dataset_type: str):
        """Record one evaluation and, for ``'dpareto'`` runs, update the front.

        Args:
            prompt: Prompt text that was evaluated.
            score: Fitness score obtained by the prompt.
            candidate_type: Origin of the candidate (``'seed'``,
                ``'gepa_reflection'``, ``'llego_crossover*'``, ...).
            dataset_type: Dataset the score was measured on; only the value
                ``'dpareto'`` triggers a Pareto front check.
        """
        self.candidates_evaluated.append({
            'prompt': prompt,
            'score': score,
            'type': candidate_type,
            'dataset': dataset_type
        })
        if dataset_type == 'dpareto':
            self._check_pareto_update(prompt, score, candidate_type)

    def _check_pareto_update(self, prompt: str, score: float, candidate_type: str):
        """Decide whether a candidate enters the Pareto front.

        The seed is admitted unconditionally (once).  Every other candidate
        must beat the baseline, must not already be in the front with an equal
        or better score, and must not be beaten by a remaining front member.
        """
        cand_notation = self._notation_for(candidate_type)

        logger.info("\n" + self._HEAVY_RULE)
        logger.info(f"📊 PARETO FRONT P ANALYSIS - Evaluating {cand_notation}")
        logger.info(self._HEAVY_RULE)
        logger.info(f"\n   📊 Evaluating: {cand_notation} with f({cand_notation}) = {score:.4f}")

        if candidate_type == 'seed':
            self._admit_seed(prompt, score, cand_notation)
            return

        if not self._passes_baseline(score, cand_notation):
            return

        # Re-evaluating a prompt that already sits in the front with an equal
        # or better score must not create a second entry for it.
        existing = self._find_in_front(prompt)
        if existing is not None and existing['score'] >= score:
            logger.info(f"\n   ⚠️ {cand_notation} is already in Pareto Front P (duplicate detected)")
            logger.info("   Skipping duplicate candidate addition")
            self._log_front_set()
            return

        # Front members the candidate beats are evicted first ...
        beaten = [i for i, entry in enumerate(self.pareto_front) if score > entry['score']]
        for i in beaten:
            entry = self.pareto_front[i]
            front_score = entry['score']
            front_notation = entry.get('notation', 'S')
            logger.info(f"\n   ✅ {cand_notation} DOMINATES P{i+1}:")
            logger.info(f"      f(P{i+1}) = {front_score:.4f}")
            logger.info(f"      f({cand_notation}) = {score:.4f}")
            logger.info(f"      f({cand_notation}) > f({front_notation}) → DOMINANCE")
            logger.info(f"      Improvement: +{score - front_score:.4f}")
        for i in reversed(beaten):  # pop from the back so indices stay valid
            removed = self.pareto_front.pop(i)
            removed_notation = removed.get('notation', 'S')
            logger.info(f"   ➡️ Removing {removed_notation} from Pareto front P (dominated by {cand_notation})")

        # ... then the candidate is admitted only if nothing left beats it.
        dominators = [entry for entry in self.pareto_front if entry['score'] > score]
        if dominators:
            for entry in dominators:
                dom_notation = entry.get('notation', 'S')
                logger.info(f"\n   ❌ {cand_notation} is DOMINATED by {dom_notation}:")
                logger.info(f"      f({dom_notation}) = {entry['score']:.4f}")
                logger.info(f"      f({cand_notation}) = {score:.4f}")
                logger.info(f"      f({dom_notation}) > f({cand_notation}) → DOMINATED")
                logger.info(f"      Difference: {score - entry['score']:.4f}")
            logger.info("\n   ❌ NOT ADDED to Pareto Front P (dominated)")
        else:
            tied = [
                entry.get('notation', 'S')
                for entry in self.pareto_front
                if abs(entry['score'] - score) < self._SCORE_TOLERANCE
            ]
            self._add_to_front(prompt, score, candidate_type, cand_notation)
            if beaten:
                logger.info("\n   ✅ ADDED to Pareto Front P")
            else:
                logger.info("\n   ✅ ADDED to Pareto Front P (non-dominated)")
                if tied:
                    logger.info(f"      f({cand_notation}) = {score:.4f} (same score as {', '.join(tied)})")
                else:
                    logger.info(f"      {cand_notation} is non-dominated → kept in P")
            self._log_front_set()

        self._display_pareto_front()

    def _admit_seed(self, prompt, score, notation):
        """Add the seed prompt to the front unless that same seed is already there."""
        if self._find_in_front(prompt, candidate_type='seed') is not None:
            logger.info(f"\n   ⚠️ {notation} is already in Pareto Front P (duplicate detected)")
            logger.info("   Skipping duplicate seed prompt addition")
            self._log_front_set()
            return

        logger.info(f"\n   ✅ {notation} is seed prompt - always added as baseline")
        # Only fills a missing baseline; an explicitly set one is kept.
        if self.baseline_score is None:
            self.baseline_score = score
            logger.info(f"   💡 Setting baseline: f(S₀) = {score:.4f}")

        self._add_to_front(prompt, score, 'seed', notation)
        logger.info("\n   ✅ ADDED to Pareto Front P (baseline)")
        self._log_front_set()
        self._display_pareto_front()

    def _passes_baseline(self, score, notation) -> bool:
        """Return True when *score* is strictly above the baseline; log the verdict."""
        baseline = self.baseline_score
        if baseline is None:
            logger.error("\n   ❌ CRITICAL ERROR: Baseline score not set!")
            logger.error(f"   Cannot evaluate {notation} without baseline f(S₀)")
            logger.error("   💡 Seed prompt must be evaluated on Dpareto first")
            logger.error("   💡 Rejecting candidate to maintain correctness")
            return False

        if score > baseline:
            logger.info(f"\n   ✅ {notation} meets baseline requirement:")
            logger.info(f"      f(S₀) = {baseline:.4f} (baseline)")
            logger.info(f"      f({notation}) = {score:.4f}")
            logger.info(f"      f({notation}) > f(S₀) → Can be added to Pareto front")
            logger.info(f"      Improvement over baseline: +{score - baseline:.4f}")
            return True

        logger.info(f"\n   ❌ {notation} does NOT meet baseline requirement:")
        logger.info(f"      f(S₀) = {baseline:.4f} (baseline)")
        logger.info(f"      f({notation}) = {score:.4f}")
        logger.info(f"      f({notation}) ≤ f(S₀) → NOT ADDED to Pareto front")
        logger.info("   💡 Only candidates better than baseline can enter Pareto front")
        logger.info(f"   💡 Difference: {score - baseline:.4f} (needs to be > 0)")
        return False

    # ------------------------------------------------------------------
    # Display
    # ------------------------------------------------------------------
    def _display_pareto_front(self):
        """Log the current front: one entry per member plus a notation legend."""
        logger.info(f"\n📋 CURRENT PARETO FRONT P (Size: |P| = {len(self.pareto_front)}):")
        logger.info(self._LIGHT_RULE)

        if not self.pareto_front:
            logger.info("   P = {} (Empty - no candidates added yet)")
            logger.info("   💡 NOTATION: P = Pareto front (non-dominated solutions)")
            return

        front_notations = self._front_notations()
        logger.info(f"   P = {{{', '.join(front_notations)}}}")

        for candidate in self.pareto_front:
            notation = candidate.get('notation', 'S')
            cand_type = candidate.get('type', 'unknown')
            type_label, type_notation = self._TYPE_LABELS.get(
                cand_type, (f'🔄 {cand_type}', notation)
            )
            # Prefer the stored notation; fall back to the per-type one when
            # the entry only carries the generic 'S'.
            display_notation = notation if notation != 'S' else type_notation
            prompt_text = candidate.get('prompt') or ''
            suffix = '...' if len(prompt_text) > 150 else ''

            logger.info(f"\n   {display_notation}: {type_label}")
            logger.info(f"      f({display_notation}) = {candidate['score']:.4f}")
            logger.info(f"      Prompt ({len(prompt_text)} chars): {prompt_text[:150]}{suffix}")

        logger.info("\n   💡 NOTATION EXPLANATION:")
        logger.info("      P = Pareto front (set of non-dominated solutions)")
        logger.info("      S₀ = Seed prompt (baseline)")
        logger.info("      Sᵣ = GEPA Reflection candidate")
        logger.info("      Oₓₒ = LLEGO Crossover offspring (combines parents)")
        logger.info("      Oₘᵤₜ = LLEGO Mutation offspring (explores variations)")
        logger.info(f"      f({', '.join(front_notations[:3])}) = Fitness scores of candidates in Pareto front")
        logger.info(self._LIGHT_RULE)

    # ------------------------------------------------------------------
    # Baseline
    # ------------------------------------------------------------------
    def set_baseline(self, score: float):
        """Set the baseline score and record it on the seed entry, if present.

        The front is sorted by score, so the seed is located by its ``'type'``
        rather than by position.
        """
        self.baseline_score = score
        for entry in self.pareto_front:
            if entry.get('type') == 'seed':
                entry['baseline_score'] = score

    # ------------------------------------------------------------------
    # Batch path
    # ------------------------------------------------------------------
    def batch_update_pareto_front(self, candidates_with_scores: List[Dict]) -> List[Dict]:
        """Update the Pareto front with several candidates in one operation.

        Steps:
            0. Drop candidates whose prompt repeats an earlier one in the batch.
            1. Keep candidates scoring strictly above the baseline (a seed is
               kept unless that seed is already in the front).
            2. Keep only the best-scoring survivors (the rest are dominated
               within the batch).
            3. Evict front members beaten by the batch, then add every
               survivor that is neither beaten by nor already in the front.

        Args:
            candidates_with_scores: List of dicts with keys ``'prompt'`` (str),
                ``'score'`` (float), ``'type'`` (str) and optionally
                ``'notation'`` (str, derived from the type when missing).

        Returns:
            The input dicts that were added to the front.  Empty when the
            input is empty, no baseline is set, or nothing qualifies.
        """
        if not candidates_with_scores:
            return []

        logger.info("\n" + self._HEAVY_RULE)
        logger.info(f"🔥 BATCH PARETO FRONT UPDATE - Processing {len(candidates_with_scores)} candidates")
        logger.info(self._HEAVY_RULE)

        candidates = self._dedupe_batch(candidates_with_scores)

        if self.baseline_score is None:
            logger.error("❌ Baseline score not set - cannot perform batch update")
            logger.error("   💡 Seed prompt must be evaluated on Dpareto first")
            return []
        baseline = self.baseline_score

        filtered = self._filter_by_baseline(candidates, baseline)
        if not filtered:
            logger.info(f"\n   ❌ No candidates pass baseline filter (baseline: {baseline:.4f})")
            logger.info("   💡 All candidates are worse than or equal to seed prompt")
            return []
        logger.info(f"\n   📊 After baseline filter: {len(filtered)}/{len(candidates)} candidates remain")

        # Step 2: with a scalar score only the top scorers are non-dominated.
        ranked = sorted(filtered, key=lambda c: c.get('score', 0.0), reverse=True)
        leader = ranked[0]
        best_score = leader.get('score', 0.0)
        leader_notation = leader.get('notation', 'S')
        non_dominated = []
        for cand in ranked:
            cand_score = cand.get('score', 0.0)
            cand_notation = cand.get('notation', 'S')
            if best_score > cand_score:
                logger.info(
                    f"   ❌ {cand_notation} dominated by {leader_notation} in batch: "
                    f"f({leader_notation})={best_score:.4f} > f({cand_notation})={cand_score:.4f}"
                )
            else:
                non_dominated.append(cand)
                logger.info(f"   ✅ {cand_notation} is non-dominated in batch: f={cand_score:.4f}")
        logger.info(
            f"\n   📊 After batch dominance check: {len(non_dominated)}/{len(filtered)} non-dominated candidates"
        )

        # Step 3a: evict front members beaten by the batch.
        evicted = [
            entry for entry in self.pareto_front
            if best_score > entry.get('score', 0.0)
        ]
        for entry in evicted:
            front_notation = entry.get('notation', 'S')
            front_score = entry.get('score', 0.0)
            logger.info(
                f"   ➡️ {front_notation} will be removed (dominated by {leader_notation}): "
                f"f({front_notation})={front_score:.4f} < f({leader_notation})={best_score:.4f}"
            )
        # Slice assignment keeps the same list object for external holders.
        self.pareto_front[:] = [
            entry for entry in self.pareto_front
            if not best_score > entry.get('score', 0.0)
        ]

        # Step 3b: admit survivors that the remaining front does not beat.
        added_to_front = []
        for new_cand in non_dominated:
            new_score = new_cand.get('score', 0.0)
            new_type = new_cand.get('type', 'unknown')
            new_prompt = new_cand.get('prompt', '')
            if 'notation' in new_cand:
                new_notation = new_cand['notation']
            else:
                new_notation = self._notation_for(new_type)

            dominator = next(
                (entry for entry in self.pareto_front if entry.get('score', 0.0) > new_score),
                None,
            )
            if dominator is not None:
                front_notation = dominator.get('notation', 'S')
                front_score = dominator.get('score', 0.0)
                logger.info(
                    f"   ❌ {new_notation} dominated by existing {front_notation}: "
                    f"f({front_notation})={front_score:.4f} > f({new_notation})={new_score:.4f}"
                )
                continue

            # Anything still in the front scores at least as well as the
            # batch, so a matching prompt would be a pure duplicate.
            if self._find_in_front(new_prompt) is not None:
                logger.info(f"   ⚠️ {new_notation} already in Pareto front - skipping duplicate")
                continue

            self.pareto_front.append({
                'prompt': new_prompt,
                'score': new_score,
                'type': new_type,
                'notation': new_notation
            })
            added_to_front.append(new_cand)
            # Mirror the single-candidate path so the evaluation is tracked.
            self.candidates_evaluated.append({
                'prompt': new_prompt,
                'score': new_score,
                'type': new_type,
                'dataset': 'dpareto'
            })
            logger.info(f"   ✅ {new_notation} ADDED to Pareto front: f={new_score:.4f}")

        self.pareto_front.sort(key=lambda entry: entry.get('score', 0.0), reverse=True)

        logger.info(f"\n{self._HEAVY_RULE}")
        logger.info("✅ BATCH UPDATE COMPLETE")
        logger.info(f"   Added: {len(added_to_front)} candidates")
        logger.info(f"   Removed: {len(evicted)} dominated candidates")
        logger.info(f"   Pareto front size: |P| = {len(self.pareto_front)}")
        self._log_front_set()
        self._display_pareto_front()
        logger.info(self._HEAVY_RULE + "\n")

        return added_to_front

    def _dedupe_batch(self, candidates: List[Dict]) -> List[Dict]:
        """Return *candidates* keeping only the first occurrence of each prompt."""
        seen_prompts = set()
        unique = []
        for cand in candidates:
            key = self._normalize_prompt(cand.get('prompt', ''))
            if key in seen_prompts:
                logger.info(
                    f"   ⚠️ Skipping duplicate candidate: {cand.get('notation', 'S')} (prompt already in batch)"
                )
                continue
            seen_prompts.add(key)
            unique.append(cand)
        if len(unique) < len(candidates):
            logger.info(f"   📊 Deduplicated: {len(candidates)} → {len(unique)} candidates")
        return unique

    def _filter_by_baseline(self, candidates: List[Dict], baseline: float) -> List[Dict]:
        """Return candidates scoring above *baseline*, plus a not-yet-seen seed."""
        kept = []
        for cand in candidates:
            score = cand.get('score', 0.0)
            notation = cand.get('notation', 'S')

            if cand.get('type', 'unknown') == 'seed':
                # The seed is the baseline itself, so it skips the comparison.
                if self._find_in_front(cand.get('prompt', ''), candidate_type='seed') is not None:
                    logger.info("   ⚠️ Seed prompt already in Pareto front - skipping duplicate")
                else:
                    kept.append(cand)
                continue

            if score > baseline:
                kept.append(cand)
                logger.info(f"   ✅ {notation} passes baseline: f={score:.4f} > f(S₀)={baseline:.4f}")
            else:
                logger.info(f"   ❌ {notation} fails baseline: f={score:.4f} ≤ f(S₀)={baseline:.4f}")
        return kept
# Module-wide ParetoLogger instance. get_pareto_logger() returns it and
# reset_pareto_logger() rebinds this name to a fresh ParetoLogger.
_pareto_logger = ParetoLogger()
def get_pareto_logger() -> ParetoLogger:
    """Return the ParetoLogger currently bound to the module-level global.

    The object returned is whatever ``_pareto_logger`` refers to at call
    time, so a reference obtained before ``reset_pareto_logger()`` keeps
    pointing at the previous instance.
    """
    return _pareto_logger
def reset_pareto_logger() -> ParetoLogger:
    """Replace the module-level ParetoLogger with a new, empty one.

    Intended for the start of a new run: the fresh instance has no recorded
    evaluations, an empty Pareto front and no baseline score.

    Returns:
        The newly created instance, which is also what subsequent
        ``get_pareto_logger()`` calls return.
    """
    global _pareto_logger
    fresh_logger = ParetoLogger()
    _pareto_logger = fresh_logger
    return fresh_logger