Spaces:
Sleeping
Sleeping
| """ | |
| LLEGO Integration Layer for GEPA. | |
| This module provides the integration layer that wraps LLEGO genetic operators | |
| for use with the GEPA optimization framework. | |
| Based on: Decision Tree Induction Through LLMs via Semantically-Aware Evolution (ICLR 2025) | |
| GitHub: https://github.com/nicolashuynh/LLEGO | |
| """ | |
| from typing import List, Callable, Dict, Any, Optional, Literal | |
| import numpy as np | |
| import logging | |
| # Import from modular files (SOLID: Single Responsibility) | |
| from .models import PromptCandidate, PromptMetadata | |
| from .crossover import FitnessGuidedCrossover | |
| from .mutation import DiversityGuidedMutation | |
| logger = logging.getLogger(__name__) | |
class LLEGOIntegrationLayer:
    """
    Integration layer that wraps LLEGO operators for GEPA.

    This class manages the genetic algorithm workflow:
    - Population initialization
    - Parent selection (fitness-based)
    - Crossover and mutation operations
    - Population management

    Design Principles:
    - Composition over inheritance (uses crossover_op, mutation_op)
    - Single Responsibility: Only manages GA workflow
    - Open/Closed: New operators can be added without modifying this class
    """

    def __init__(
        self,
        alpha: float = 0.05,
        tau: float = 10.0,
        nu: int = 4,
        population_size: int = 10,
        n_crossover: int = 2,
        n_mutation: int = 3
    ):
        """
        Initialize LLEGO integration layer.

        Args:
            alpha: Fitness extrapolation for crossover (default 0.05)
            tau: Diversity temperature for mutation
            nu: Parent arity for diversity sampling
            population_size: Maximum population size
            n_crossover: Number of crossover offspring per generation
            n_mutation: Number of mutation offspring per generation
        """
        self.crossover_op = FitnessGuidedCrossover(alpha=alpha)
        self.mutation_op = DiversityGuidedMutation(tau=tau, nu=nu)
        self.population_size = population_size
        self.n_crossover = n_crossover
        self.n_mutation = n_mutation
        self.population: List[PromptCandidate] = []
        self.current_generation = 0
        # Track metadata for prompts generated in current generation
        self._generation_metadata: Dict[str, PromptMetadata] = {}
        # Crossover-status attributes are read by the caller after
        # evolve_generation() to decide whether to compensate with extra GEPA
        # candidates. Initialize them here so reading them BEFORE the first
        # generation does not raise AttributeError (previously they were only
        # created inside evolve_generation).
        self._crossover_skipped = False
        self._crossover_deficit = 0
        self._actual_crossover_count = 0
        logger.debug(f"LLEGO initialized: pop_size={population_size}, crossover={n_crossover}, mutation={n_mutation}")

    def initialize_population(self, seed_prompt: str, initial_fitness: float = 0.5):
        """Initialize (reset) the population with a single seed prompt.

        Args:
            seed_prompt: Text of the initial prompt.
            initial_fitness: Fitness assigned to the seed before any evaluation.
        """
        seed_candidate = PromptCandidate(
            prompt=seed_prompt,
            fitness=initial_fitness,
            metadata={
                'generation': 0,
                'operator': 'seed',
                'parent_indices': None,
                'parent_prompts': None,
                'target_fitness': None,
                'diversity_score': None,
                'sample_scores': None,
                'num_diverse_parents': None
            }
        )
        # Any previous population is discarded.
        self.population = [seed_candidate]
        logger.debug(f"Population initialized with seed prompt ({len(seed_prompt)} chars)")

    def create_candidate_with_metadata(
        self,
        prompt: str,
        fitness: float,
        generation: int,
        operator: Literal['crossover', 'mutation'],
        parent_indices: Optional[List[int]] = None,
        parent_prompts: Optional[List[str]] = None,
        target_fitness: Optional[float] = None,
        diversity_score: Optional[float] = None,
        sample_scores: Optional[List[float]] = None,
        num_diverse_parents: Optional[int] = None
    ) -> PromptCandidate:
        """Create a PromptCandidate with properly populated metadata.

        All optional fields default to None so callers only fill in what the
        originating operator actually knows.
        """
        return PromptCandidate(
            prompt=prompt,
            fitness=fitness,
            metadata={
                'generation': generation,
                'operator': operator,
                'parent_indices': parent_indices,
                'parent_prompts': parent_prompts,
                'target_fitness': target_fitness,
                'diversity_score': diversity_score,
                'sample_scores': sample_scores,
                'num_diverse_parents': num_diverse_parents
            }
        )

    def evolve_generation(
        self,
        llm: Callable[[str], str],
        pareto_front: List[PromptCandidate]
    ) -> List[str]:
        """
        Evolve one generation using LLEGO operators.

        When crossover cannot run (< 2 parents with scores), it is skipped.
        The caller should compensate by generating extra GEPA reflection candidates.

        Args:
            llm: Language model callable
            pareto_front: Current Pareto front (non-dominated prompts with scores)

        Returns:
            List of new prompt candidates to evaluate
        """
        new_prompts = []
        self.current_generation += 1
        self._generation_metadata = {}
        # Track crossover status for caller to handle compensation
        self._crossover_skipped = False
        self._crossover_deficit = 0
        self._actual_crossover_count = 0
        logger.info(f"🧬 LLEGO Generation {self.current_generation}: pareto_front={len(pareto_front)}, population={len(self.population)}")

        # Crossover: Combine BEST parents (requires >= 2 parents WITH SCORES)
        if len(pareto_front) >= 2:
            # Sort by fitness - always use TOP scored parents for crossover
            sorted_front = sorted(pareto_front, key=lambda p: p.fitness, reverse=True)
            for i in range(self.n_crossover):
                # Always use top 2 highest-scored parents
                parents = sorted_front[:2]
                target_fitness = self._calculate_target_fitness(parents)
                offspring = self.crossover_op(parents, target_fitness, llm)
                new_prompts.append(offspring)
                self._actual_crossover_count += 1
                # Store metadata with parent fitness info.
                # NOTE: metadata is keyed by prompt text, so if the LLM returns
                # an identical offspring twice the later entry overwrites the
                # earlier one.
                self._generation_metadata[offspring] = {
                    'generation': self.current_generation,
                    'operator': 'crossover',
                    'parent_indices': [self.population.index(p) for p in parents if p in self.population],
                    'parent_prompts': [p.prompt for p in parents],
                    'parent_fitnesses': [p.fitness for p in parents],
                    'target_fitness': target_fitness,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': len(parents)
                }
                logger.info(f" Oₓₒ{i+1}: Crossed top parents (f={parents[0].fitness:.3f} × f={parents[1].fitness:.3f}) → target f*={target_fitness:.3f}")
        else:
            # Signal that crossover was skipped - caller should compensate with GEPA
            self._crossover_skipped = True
            self._crossover_deficit = self.n_crossover
            logger.info(f"⚠️ Crossover SKIPPED: need 2+ scored parents, have {len(pareto_front)}")
            logger.info(f" → Caller should compensate with {self._crossover_deficit} extra GEPA reflection candidates")

        # Mutation: Explore diverse variations (requires >= 1 parent)
        # Use pareto_front if available, otherwise fall back to population
        mutation_source = pareto_front if pareto_front else self.population
        if len(mutation_source) >= 1:
            for i in range(self.n_mutation):
                parent = self._select_parent_for_mutation(mutation_source)
                offspring = self.mutation_op(parent, self.population, llm)
                new_prompts.append(offspring)
                # -1 marks a parent that is not (yet) in the managed population.
                parent_idx = self.population.index(parent) if parent in self.population else -1
                self._generation_metadata[offspring] = {
                    'generation': self.current_generation,
                    'operator': 'mutation',
                    'parent_indices': [parent_idx] if parent_idx >= 0 else None,
                    'parent_prompts': [parent.prompt],
                    'parent_fitness': parent.fitness,
                    'target_fitness': None,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': min(self.mutation_op.nu, len(self.population))
                }

        # Count offspring directly instead of re-deriving counts from
        # _generation_metadata: the dict is keyed by prompt text, so duplicate
        # offspring strings would otherwise be undercounted in the log line.
        crossover_count = self._actual_crossover_count
        mutation_count = len(new_prompts) - crossover_count
        logger.info(f"🧬 LLEGO Generated {len(new_prompts)} candidates: {crossover_count} crossover, {mutation_count} mutation")
        return new_prompts

    def get_prompt_metadata(self, prompt: str) -> Optional[PromptMetadata]:
        """Retrieve metadata for a prompt generated in the current generation."""
        return self._generation_metadata.get(prompt)

    def _convert_gepa_pareto_to_candidates(
        self,
        gepa_pareto_front: List[Dict[str, Any]]
    ) -> List[PromptCandidate]:
        """
        Convert GEPA Pareto front entries to PromptCandidate format.

        Entries that are not dicts, lack a usable 'prompt' string, or have a
        non-numeric 'score' are skipped rather than aborting the conversion.

        Args:
            gepa_pareto_front: List of dicts with 'prompt', 'score', 'type', 'notation'

        Returns:
            List of PromptCandidate objects
        """
        if not gepa_pareto_front:
            return []
        # De-duplicate Pareto front (first occurrence of each prompt wins)
        seen_prompts = set()
        deduplicated_front = []
        for entry in gepa_pareto_front:
            if isinstance(entry, dict) and 'prompt' in entry:
                prompt_text = entry['prompt']
                if prompt_text not in seen_prompts:
                    seen_prompts.add(prompt_text)
                    deduplicated_front.append(entry)
        candidates = []
        for idx, entry in enumerate(deduplicated_front):
            try:
                if not isinstance(entry, dict):
                    continue
                prompt = entry.get('prompt')
                if not prompt or not isinstance(prompt, str):
                    continue
                score = entry.get('score')
                if score is None:
                    continue
                try:
                    fitness = float(score)
                except (ValueError, TypeError):
                    continue
                candidate_type = entry.get('type', 'unknown')
                notation = entry.get('notation', 'S')
                metadata: PromptMetadata = {
                    'generation': self.current_generation,
                    'operator': 'gepa_pareto_front',
                    'parent_indices': None,
                    'parent_prompts': None,
                    'target_fitness': None,
                    'diversity_score': None,
                    'sample_scores': None,
                    'num_diverse_parents': None,
                    'candidate_type': candidate_type,
                    'notation': notation,
                    'prompt_length': len(prompt),
                    'word_count': len(prompt.split()),
                    'from_gepa_pareto': True
                }
                candidate = PromptCandidate(
                    prompt=prompt,
                    fitness=fitness,
                    metadata=metadata
                )
                candidates.append(candidate)
            except Exception as e:
                # Best-effort conversion: log and keep processing other entries.
                logger.error(f"Error converting Pareto entry #{idx+1}: {e}")
                continue
        return candidates

    def update_population(self, new_candidates: List[PromptCandidate]):
        """Update population with new evaluated candidates.

        Deduplicates by normalized prompt text (stripped of surrounding
        whitespace/quotes) and truncates to the top `population_size` by fitness.
        """
        self.population.extend(new_candidates)
        # Remove duplicates (first occurrence kept)
        seen_prompts = set()
        unique_population = []
        for p in self.population:
            normalized = p.prompt.strip().strip('"\'')
            if normalized not in seen_prompts:
                seen_prompts.add(normalized)
                unique_population.append(p)
        self.population = unique_population
        # Keep top population_size by fitness
        self.population.sort(key=lambda p: p.fitness, reverse=True)
        self.population = self.population[:self.population_size]
        if self.population:
            logger.debug(f"Population updated: {len(self.population)} candidates, best={self.population[0].fitness:.3f}")

    def _select_parents_for_crossover(self, pareto_front: List[PromptCandidate], k: int = 2) -> List[PromptCandidate]:
        """Select top-k parents for crossover (highest fitness first)."""
        sorted_front = sorted(pareto_front, key=lambda p: p.fitness, reverse=True)
        return sorted_front[:k]

    def _select_parent_for_mutation(self, pareto_front: List[PromptCandidate]) -> PromptCandidate:
        """Select a parent for mutation (fitness-proportionate).

        Fitnesses are floored at 0.01 so zero/negative scores still get a
        non-zero selection probability.
        """
        if len(pareto_front) == 1:
            return pareto_front[0]
        fitnesses = np.array([p.fitness for p in pareto_front])
        fitnesses = np.maximum(fitnesses, 0.01)
        probs = fitnesses / fitnesses.sum()
        idx = np.random.choice(len(pareto_front), p=probs)
        return pareto_front[idx]

    def _calculate_target_fitness(self, parents: List[PromptCandidate]) -> float:
        """Calculate target fitness for crossover using LLEGO formula: f* = f_max + α(f_max - f_min)

        The result is clamped to 1.0 (fitness is assumed to be a [0, 1] score).
        """
        fitnesses = [p.fitness for p in parents]
        f_max = max(fitnesses)
        f_min = min(fitnesses)
        target_fitness = f_max + self.crossover_op.alpha * (f_max - f_min)
        return min(target_fitness, 1.0)

    def get_best_candidate(self) -> Optional[PromptCandidate]:
        """Get current best prompt, or None if the population is empty."""
        if not self.population:
            return None
        return max(self.population, key=lambda p: p.fitness)

    def get_stats(self) -> Dict[str, Any]:
        """Get population statistics (size, best/avg/min fitness, std)."""
        if not self.population:
            return {"population_size": 0, "best_fitness": 0.0, "avg_fitness": 0.0}
        fitnesses = [p.fitness for p in self.population]
        return {
            "population_size": len(self.population),
            "best_fitness": max(fitnesses),
            "avg_fitness": np.mean(fitnesses),
            "min_fitness": min(fitnesses),
            "fitness_std": np.std(fitnesses)
        }