# Deployed to HF Spaces — "Deploy Universal Prompt Optimizer to HF Spaces (clean)", commit cacd4d0
"""
LLEGO Integration Layer for GEPA.
This module provides the integration layer that wraps LLEGO genetic operators
for use with the GEPA optimization framework.
Based on: Decision Tree Induction Through LLMs via Semantically-Aware Evolution (ICLR 2025)
GitHub: https://github.com/nicolashuynh/LLEGO
"""
from typing import List, Callable, Dict, Any, Optional, Literal
import numpy as np
import logging
# Import from modular files (SOLID: Single Responsibility)
from .models import PromptCandidate, PromptMetadata
from .crossover import FitnessGuidedCrossover
from .mutation import DiversityGuidedMutation
# Module-level logger, per the standard `logging.getLogger(__name__)` convention.
logger = logging.getLogger(__name__)
class LLEGOIntegrationLayer:
    """
    Integration layer that wraps LLEGO operators for GEPA.

    This class manages the genetic algorithm workflow:
    - Population initialization
    - Parent selection (fitness-based)
    - Crossover and mutation operations
    - Population management

    Design Principles:
    - Composition over inheritance (uses crossover_op, mutation_op)
    - Single Responsibility: Only manages GA workflow
    - Open/Closed: New operators can be added without modifying this class
    """

    def __init__(
        self,
        alpha: float = 0.05,
        tau: float = 10.0,
        nu: int = 4,
        population_size: int = 10,
        n_crossover: int = 2,
        n_mutation: int = 3
    ):
        """
        Initialize LLEGO integration layer.

        Args:
            alpha: Fitness extrapolation for crossover (default 0.05)
            tau: Diversity temperature for mutation
            nu: Parent arity for diversity sampling
            population_size: Maximum population size
            n_crossover: Number of crossover offspring per generation
            n_mutation: Number of mutation offspring per generation
        """
        self.crossover_op = FitnessGuidedCrossover(alpha=alpha)
        self.mutation_op = DiversityGuidedMutation(tau=tau, nu=nu)
        self.population_size = population_size
        self.n_crossover = n_crossover
        self.n_mutation = n_mutation
        self.population: List[PromptCandidate] = []
        self.current_generation = 0
        # Metadata for prompts generated in the current generation, keyed by prompt text.
        self._generation_metadata: Dict[str, PromptMetadata] = {}
        # FIX: initialize crossover-status attributes here so that callers
        # inspecting them before the first evolve_generation() call do not
        # raise AttributeError (previously they were only created inside
        # evolve_generation).
        self._crossover_skipped = False
        self._crossover_deficit = 0
        self._actual_crossover_count = 0
        logger.debug(f"LLEGO initialized: pop_size={population_size}, crossover={n_crossover}, mutation={n_mutation}")

    def initialize_population(self, seed_prompt: str, initial_fitness: float = 0.5):
        """Initialize population with seed prompt.

        Args:
            seed_prompt: The starting prompt text (generation-0 seed).
            initial_fitness: Fitness assigned to the seed before evaluation.
        """
        seed_candidate = PromptCandidate(
            prompt=seed_prompt,
            fitness=initial_fitness,
            metadata={
                'generation': 0,
                'operator': 'seed',
                'parent_indices': None,
                'parent_prompts': None,
                'target_fitness': None,
                'diversity_score': None,
                'sample_scores': None,
                'num_diverse_parents': None
            }
        )
        self.population = [seed_candidate]
        logger.debug(f"Population initialized with seed prompt ({len(seed_prompt)} chars)")

    def create_candidate_with_metadata(
        self,
        prompt: str,
        fitness: float,
        generation: int,
        operator: Literal['crossover', 'mutation'],
        parent_indices: Optional[List[int]] = None,
        parent_prompts: Optional[List[str]] = None,
        target_fitness: Optional[float] = None,
        diversity_score: Optional[float] = None,
        sample_scores: Optional[List[float]] = None,
        num_diverse_parents: Optional[int] = None
    ) -> PromptCandidate:
        """Create a PromptCandidate with properly populated metadata.

        Convenience factory for callers that evaluate offspring externally
        and need a fully-annotated candidate object.
        """
        return PromptCandidate(
            prompt=prompt,
            fitness=fitness,
            metadata={
                'generation': generation,
                'operator': operator,
                'parent_indices': parent_indices,
                'parent_prompts': parent_prompts,
                'target_fitness': target_fitness,
                'diversity_score': diversity_score,
                'sample_scores': sample_scores,
                'num_diverse_parents': num_diverse_parents
            }
        )

    def evolve_generation(
        self,
        llm: Callable[[str], str],
        pareto_front: List[PromptCandidate]
    ) -> List[str]:
        """
        Evolve one generation using LLEGO operators.

        When crossover cannot run (< 2 parents with scores), it is skipped.
        The caller should compensate by generating extra GEPA reflection candidates.

        Args:
            llm: Language model callable
            pareto_front: Current Pareto front (non-dominated prompts with scores)

        Returns:
            List of new prompt candidates to evaluate
        """
        new_prompts = []
        self.current_generation += 1
        self._generation_metadata = {}
        # Track crossover status for caller to handle compensation
        self._crossover_skipped = False
        self._crossover_deficit = 0
        self._actual_crossover_count = 0
        logger.info(f"🧬 LLEGO Generation {self.current_generation}: pareto_front={len(pareto_front)}, population={len(self.population)}")

        # Crossover: Combine BEST parents (requires >= 2 parents WITH SCORES)
        if len(pareto_front) >= 2:
            # FIX: reuse the selection helper (previously this duplicated the
            # top-k sort inline) and hoist the loop-invariant parents/target
            # out of the offspring loop.
            parents = self._select_parents_for_crossover(pareto_front, k=2)
            target_fitness = self._calculate_target_fitness(parents)
            for i in range(self.n_crossover):
                offspring = self.crossover_op(parents, target_fitness, llm)
                new_prompts.append(offspring)
                self._actual_crossover_count += 1
                # Store metadata with parent fitness info
                self._generation_metadata[offspring] = {
                    'generation': self.current_generation,
                    'operator': 'crossover',
                    'parent_indices': [self.population.index(p) for p in parents if p in self.population],
                    'parent_prompts': [p.prompt for p in parents],
                    'parent_fitnesses': [p.fitness for p in parents],
                    'target_fitness': target_fitness,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': len(parents)
                }
                logger.info(f"  Oₓₒ{i+1}: Crossed top parents (f={parents[0].fitness:.3f} × f={parents[1].fitness:.3f}) → target f*={target_fitness:.3f}")
        else:
            # Signal that crossover was skipped - caller should compensate with GEPA
            self._crossover_skipped = True
            self._crossover_deficit = self.n_crossover
            logger.info(f"⚠️ Crossover SKIPPED: need 2+ scored parents, have {len(pareto_front)}")
            logger.info(f"   → Caller should compensate with {self._crossover_deficit} extra GEPA reflection candidates")

        # Mutation: Explore diverse variations (requires >= 1 parent)
        # Use pareto_front if available, otherwise fall back to population
        mutation_source = pareto_front if pareto_front else self.population
        if len(mutation_source) >= 1:
            for i in range(self.n_mutation):
                parent = self._select_parent_for_mutation(mutation_source)
                offspring = self.mutation_op(parent, self.population, llm)
                new_prompts.append(offspring)
                parent_idx = self.population.index(parent) if parent in self.population else -1
                self._generation_metadata[offspring] = {
                    'generation': self.current_generation,
                    'operator': 'mutation',
                    'parent_indices': [parent_idx] if parent_idx >= 0 else None,
                    'parent_prompts': [parent.prompt],
                    'parent_fitness': parent.fitness,
                    'target_fitness': None,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': min(self.mutation_op.nu, len(self.population))
                }

        crossover_count = len([p for p in new_prompts if self._generation_metadata.get(p, {}).get('operator') == 'crossover'])
        mutation_count = len([p for p in new_prompts if self._generation_metadata.get(p, {}).get('operator') == 'mutation'])
        logger.info(f"🧬 LLEGO Generated {len(new_prompts)} candidates: {crossover_count} crossover, {mutation_count} mutation")
        return new_prompts

    def get_prompt_metadata(self, prompt: str) -> Optional[PromptMetadata]:
        """Retrieve metadata for a prompt generated in the current generation."""
        return self._generation_metadata.get(prompt)

    def _convert_gepa_pareto_to_candidates(
        self,
        gepa_pareto_front: List[Dict[str, Any]]
    ) -> List[PromptCandidate]:
        """
        Convert GEPA Pareto front entries to PromptCandidate format.

        Entries that are malformed (not a dict, missing/empty prompt, or a
        non-numeric score) are skipped rather than raising.

        Args:
            gepa_pareto_front: List of dicts with 'prompt', 'score', 'type', 'notation'

        Returns:
            List of PromptCandidate objects
        """
        if not gepa_pareto_front:
            return []
        # De-duplicate Pareto front (first occurrence of each prompt wins)
        seen_prompts = set()
        deduplicated_front = []
        for entry in gepa_pareto_front:
            if isinstance(entry, dict) and 'prompt' in entry:
                prompt_text = entry['prompt']
                if prompt_text not in seen_prompts:
                    seen_prompts.add(prompt_text)
                    deduplicated_front.append(entry)
        candidates = []
        for idx, entry in enumerate(deduplicated_front):
            try:
                if not isinstance(entry, dict):
                    continue
                prompt = entry.get('prompt')
                if not prompt or not isinstance(prompt, str):
                    continue
                score = entry.get('score')
                if score is None:
                    continue
                try:
                    fitness = float(score)
                except (ValueError, TypeError):
                    continue
                candidate_type = entry.get('type', 'unknown')
                notation = entry.get('notation', 'S')
                metadata: PromptMetadata = {
                    'generation': self.current_generation,
                    'operator': 'gepa_pareto_front',
                    'parent_indices': None,
                    'parent_prompts': None,
                    'target_fitness': None,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': None,
                    'candidate_type': candidate_type,
                    'notation': notation,
                    'prompt_length': len(prompt),
                    'word_count': len(prompt.split()),
                    'from_gepa_pareto': True
                }
                candidate = PromptCandidate(
                    prompt=prompt,
                    fitness=fitness,
                    metadata=metadata
                )
                candidates.append(candidate)
            except Exception as e:
                logger.error(f"Error converting Pareto entry #{idx+1}: {e}")
                continue
        return candidates

    def update_population(self, new_candidates: List[PromptCandidate]):
        """Update population with new evaluated candidates.

        De-duplicates by normalized prompt text (stripped whitespace/quotes),
        then keeps the top `population_size` candidates by fitness.
        """
        self.population.extend(new_candidates)
        # Remove duplicates (first occurrence wins; normalization strips
        # surrounding whitespace and quote characters)
        seen_prompts = set()
        unique_population = []
        for p in self.population:
            normalized = p.prompt.strip().strip('"\'')
            if normalized not in seen_prompts:
                seen_prompts.add(normalized)
                unique_population.append(p)
        self.population = unique_population
        # Keep top population_size by fitness
        self.population.sort(key=lambda p: p.fitness, reverse=True)
        self.population = self.population[:self.population_size]
        if self.population:
            logger.debug(f"Population updated: {len(self.population)} candidates, best={self.population[0].fitness:.3f}")

    def _select_parents_for_crossover(self, pareto_front: List[PromptCandidate], k: int = 2) -> List[PromptCandidate]:
        """Select top-k parents for crossover (highest fitness first)."""
        sorted_front = sorted(pareto_front, key=lambda p: p.fitness, reverse=True)
        return sorted_front[:k]

    def _select_parent_for_mutation(self, pareto_front: List[PromptCandidate]) -> PromptCandidate:
        """Select a parent for mutation (fitness-proportionate roulette)."""
        if len(pareto_front) == 1:
            return pareto_front[0]
        fitnesses = np.array([p.fitness for p in pareto_front])
        # Clamp to a small positive floor so zero/negative fitness still gets
        # a non-zero selection probability (and the sum is never zero).
        fitnesses = np.maximum(fitnesses, 0.01)
        probs = fitnesses / fitnesses.sum()
        # FIX: cast the NumPy integer to a plain int index.
        idx = int(np.random.choice(len(pareto_front), p=probs))
        return pareto_front[idx]

    def _calculate_target_fitness(self, parents: List[PromptCandidate]) -> float:
        """Calculate target fitness for crossover using LLEGO formula: f* = f_max + α(f_max - f_min)

        The result is capped at 1.0 since fitness is a normalized score.
        """
        fitnesses = [p.fitness for p in parents]
        f_max = max(fitnesses)
        f_min = min(fitnesses)
        target_fitness = f_max + self.crossover_op.alpha * (f_max - f_min)
        return min(target_fitness, 1.0)

    def get_best_candidate(self) -> Optional[PromptCandidate]:
        """Get current best prompt, or None if the population is empty."""
        if not self.population:
            return None
        return max(self.population, key=lambda p: p.fitness)

    def get_stats(self) -> Dict[str, Any]:
        """Get population statistics.

        Returns plain Python floats (FIX: previously `avg_fitness` and
        `fitness_std` were np.float64, which is not JSON-serializable).
        """
        if not self.population:
            return {"population_size": 0, "best_fitness": 0.0, "avg_fitness": 0.0}
        fitnesses = [p.fitness for p in self.population]
        return {
            "population_size": len(self.population),
            "best_fitness": max(fitnesses),
            "avg_fitness": float(np.mean(fitnesses)),
            "min_fitness": min(fitnesses),
            "fitness_std": float(np.std(fitnesses))
        }