import asyncio
import csv
import itertools
import os
import random
import re
import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import numpy as np
from tqdm.asyncio import tqdm as aio_tqdm

try:
    import matplotlib.pyplot as plt
except ImportError:  # plotting is optional; the plotting helpers check for None before use
    plt = None

from evoagentx.agents import CustomizeAgent
from evoagentx.benchmark.bigbenchhard import BIGBenchHard
from evoagentx.core.logging import logger
from evoagentx.models import OpenAILLMConfig
from evoagentx.optimizers.engine.base import BaseOptimizer
from evoagentx.optimizers.engine.registry import ParamRegistry
|
|
|
|
|
|
|
|
class EvopromptOptimizer(BaseOptimizer): |
|
|
""" |
|
|
Base class for evolutionary prompt optimization algorithms. |
|
|
|
|
|
This optimizer uses evolutionary algorithms to improve prompts in multi-agent workflows. |
|
|
It supports both node-based and combination-based evolution strategies. |
|
|
""" |
|
|
def __init__(self, |
|
|
registry: ParamRegistry, |
|
|
program: Callable, |
|
|
population_size: int, |
|
|
iterations: int, |
|
|
llm_config: OpenAILLMConfig, |
|
|
concurrency_limit: int = 10, |
|
|
combination_sample_size: Optional[int] = None,
|
|
enable_logging: bool = True, |
|
|
log_dir: Optional[str] = None,
|
|
enable_early_stopping: bool = True, |
|
|
early_stopping_patience: int = 3): |
|
|
""" |
|
|
Initialize the EvoPrompt optimizer. |
|
|
|
|
|
Args: |
|
|
registry: Parameter registry for tracking prompt nodes |
|
|
program: The program/workflow to optimize |
|
|
population_size: Size of the evolution population |
|
|
iterations: Number of evolution iterations |
|
|
llm_config: Configuration for the LLM used in evolution |
|
|
concurrency_limit: Maximum concurrent API calls |
|
|
combination_sample_size: Sample size for combination evaluation |
|
|
enable_logging: Whether to enable detailed logging |
|
|
log_dir: Directory for saving logs |
|
|
enable_early_stopping: Whether to enable early stopping |
|
|
early_stopping_patience: Number of generations to wait before stopping |
|
|
""" |
|
|
super().__init__(registry=registry, program=program) |
|
|
|
|
|
|
|
|
self.population_size = population_size |
|
|
self.iterations = iterations |
|
|
self.llm_config = llm_config |
|
|
self.semaphore = asyncio.Semaphore(concurrency_limit) |
|
|
self.combination_sample_size = combination_sample_size |
|
|
|
|
|
|
|
|
self.enable_logging = enable_logging |
|
|
self.log_dir_base = log_dir |
|
|
self.log_dir = None |
|
|
|
|
|
|
|
|
self.enable_early_stopping = enable_early_stopping |
|
|
self.early_stopping_patience = early_stopping_patience |
|
|
self._best_score_so_far = -float('inf') |
|
|
self._generations_without_improvement = 0 |
|
|
|
|
|
|
|
|
self._eval_cache = {} |
|
|
self.node_populations: Dict[str, List[str]] = {} |
|
|
self.node_scores: Dict[str, List[float]] = {} |
|
|
self.best_scores_per_gen: Dict[str, Dict[str, float]] = {} |
|
|
self.avg_scores_per_gen: Dict[str, Dict[str, float]] = {} |
|
|
self.best_combo_scores_per_gen: Dict[str, float] = {} |
|
|
self.avg_combo_scores_per_gen: Dict[str, float] = {} |
|
|
|
|
|
|
|
|
self.paraphrase_agent = CustomizeAgent( |
|
|
name="ParaphraseAgent", |
|
|
description="An agent that paraphrases a given instruction.", |
|
|
prompt="""Task: Generate a semantically equivalent but differently worded version of the user-provided instruction. |
|
|
|
|
|
Now, please process the following instruction: |
|
|
Input: {instruction} |
|
|
|
|
|
Please provide the paraphrased version in the following format: |
|
|
|
|
|
## paraphrased_instruction |
|
|
[Your paraphrased version here]""", |
|
|
llm_config=self.llm_config, |
|
|
inputs=[ |
|
|
{"name": "instruction", "type": "string", "description": "The instruction to paraphrase."}, |
|
|
], |
|
|
outputs=[ |
|
|
{"name": "paraphrased_instruction", "type": "string", "description": "The paraphrased instruction."} |
|
|
], |
|
|
parse_mode="title" |
|
|
) |
|
|
|
|
|
def _setup_logging_directory(self, benchmark: BIGBenchHard): |
|
|
""" |
|
|
Set up logging directory for evolution tracking. |
|
|
|
|
|
Args: |
|
|
benchmark: The benchmark instance containing task information |
|
|
""" |
|
|
if not self.enable_logging or self.log_dir: |
|
|
return |
|
|
|
|
|
task_name = benchmark.task if hasattr(benchmark, 'task') else 'unknown_task' |
|
|
|
|
|
if self.log_dir_base is None: |
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
algo_name = self.__class__.__name__.replace("Optimizer", "") |
|
|
self.log_dir = f"node_evolution_logs_{algo_name}_{self.llm_config.model}_{task_name}_{timestamp}" |
|
|
else: |
|
|
self.log_dir = self.log_dir_base |
|
|
|
|
|
os.makedirs(self.log_dir, exist_ok=True) |
|
|
logger.info(f"Logging enabled. Log files will be saved to: {self.log_dir}") |
|
|
|
|
|
def _log_generation_summary(self, generation: int, operation: str = "Evolution"): |
|
|
""" |
|
|
Log detailed summary of each generation's population and scores. |
|
|
|
|
|
Args: |
|
|
generation: The current generation number |
|
|
operation: Type of operation (Evolution, Initial, etc.) |
|
|
""" |
|
|
if not self.enable_logging: |
|
|
return |
|
|
|
|
|
filename = f"generation_{generation:02d}_{operation.lower()}.csv" |
|
|
filepath = os.path.join(self.log_dir, filename) |
|
|
|
|
|
with open(filepath, 'w', newline='', encoding='utf-8') as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerow(['Node_Name', 'Individual_ID', 'Prompt_Text', 'Fitness_Score', 'Status', 'Rank_in_Node', 'Generation', 'Timestamp']) |
|
|
timestamp = datetime.now().isoformat() |
|
|
|
|
|
for node_name in self.node_populations.keys(): |
|
|
node_pop = self.node_populations.get(node_name, []) |
|
|
node_scores = self.node_scores.get(node_name, []) |
|
|
|
|
|
if not node_pop: |
|
|
continue |
|
|
|
|
|
sorted_indices = sorted(range(len(node_scores)), key=lambda i: node_scores[i], reverse=True) |
|
|
|
|
|
for rank, idx in enumerate(sorted_indices, 1): |
|
|
prompt = node_pop[idx] |
|
|
score = node_scores[idx] |
|
|
status = "Best" if rank == 1 else "Survivor" if rank <= self.population_size else "Eliminated" |
|
|
|
|
|
writer.writerow([ |
|
|
node_name, f"{node_name}_{idx}", prompt[:200] + "..." if len(prompt) > 200 else prompt, |
|
|
f"{score:.6f}", status, rank, generation, timestamp |
|
|
]) |
|
|
|
|
|
def _log_detailed_evaluation(self, generation: int, combinations: List[Dict[str, str]], |
|
|
combination_scores: List[float]): |
|
|
if not self.enable_logging: |
|
|
return |
|
|
|
|
|
filename = f"combo_evaluation_gen_{generation:02d}.csv" |
|
|
filepath = os.path.join(self.log_dir, filename) |
|
|
|
|
|
with open(filepath, 'w', newline='', encoding='utf-8') as f: |
|
|
writer = csv.writer(f) |
|
|
node_names = list(combinations[0].keys()) if combinations else [] |
|
|
header = ['Combination_ID', 'Average_Score'] |
|
|
for node_name in node_names: |
|
|
header.append(f'{node_name}_Prompt_Preview') |
|
|
header.extend(['Generation', 'Timestamp']) |
|
|
writer.writerow(header) |
|
|
|
|
|
timestamp = datetime.now().isoformat() |
|
|
|
|
|
for combo_id, (combination, avg_score) in enumerate(zip(combinations, combination_scores)): |
|
|
try: |
|
|
row = [f"combo_{combo_id}", f"{avg_score:.6f}"] |
|
|
for node_name in node_names: |
|
|
prompt = combination[node_name] |
|
|
row.append(prompt[:50] + "..." if len(prompt) > 50 else prompt) |
|
|
row.extend([generation, timestamp]) |
|
|
writer.writerow(row) |
|
|
except Exception as e: |
|
|
logger.error(f"Error logging evaluation for combination {combo_id}: {e}") |
|
|
|
|
|
def _create_single_metric_plot(self, metric_name: str, generations: List[int], |
|
|
best_scores: List[float], avg_scores: List[float], |
|
|
algorithm_name: str, plot_dir: str): |
|
|
fig, ax = plt.subplots(figsize=(12, 7)) |
|
|
ax.plot(generations, best_scores, marker='o', linestyle='-', linewidth=2, markersize=8, label='Best Score') |
|
|
ax.plot(generations, avg_scores, marker='x', linestyle='--', linewidth=2, markersize=8, label='Average Score') |
|
|
|
|
|
title = f"Performance for '{metric_name}' ({algorithm_name})" |
|
|
ax.set_title(title, fontsize=16, weight='bold') |
|
|
ax.set_xlabel('Generation', fontsize=12) |
|
|
ax.set_ylabel('Fitness Score', fontsize=12) |
|
|
ax.set_xticks(generations) |
|
|
ax.set_xticklabels([f"Gen {g}" for g in generations], rotation=45, ha="right") |
|
|
ax.legend(loc='best', fontsize=10) |
|
|
ax.grid(True, which='both', linestyle='--', linewidth=0.5) |
|
|
|
|
|
plt.tight_layout() |
|
|
|
|
|
safe_metric_name = re.sub(r'[^a-zA-Z0-9_-]', '_', metric_name) |
|
|
filename = f"performance_plot_{safe_metric_name}.png" |
|
|
filepath = os.path.join(plot_dir, filename) |
|
|
|
|
|
try: |
|
|
plt.savefig(filepath, dpi=200, bbox_inches='tight') |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to save individual plot for {metric_name}: {e}") |
|
|
finally: |
|
|
plt.close(fig) |
|
|
|
|
|
def _plot_and_save_performance_graph(self, algorithm_name: str): |
|
|
if not self.enable_logging or plt is None: |
|
|
if plt is None: |
|
|
logger.warning("Matplotlib not found, skipping plot generation.") |
|
|
return |
|
|
if not self.best_scores_per_gen and not self.best_combo_scores_per_gen: |
|
|
logger.warning("No performance data to plot.") |
|
|
return |
|
|
|
|
|
plt.style.use('seaborn-v0_8-whitegrid') |
|
|
all_gen_keys = set(self.best_scores_per_gen.keys()) | set(self.best_combo_scores_per_gen.keys()) |
|
|
generations = sorted([int(re.search(r'\d+', gen).group()) for gen in all_gen_keys if re.search(r'\d+', gen)]) |
|
|
|
|
|
fig_combined, ax_combined = plt.subplots(figsize=(16, 9)) |
|
|
|
|
|
if self.best_combo_scores_per_gen: |
|
|
combo_best = [self.best_combo_scores_per_gen.get(f"Gen_{g}") for g in generations] |
|
|
combo_avg = [self.avg_combo_scores_per_gen.get(f"Gen_{g}") for g in generations] |
|
|
ax_combined.plot(generations, combo_best, marker='*', linestyle='-', linewidth=2.5, markersize=10, label='Best Combination Score (Overall)') |
|
|
ax_combined.plot(generations, combo_avg, marker='D', linestyle='--', linewidth=2.5, markersize=8, label='Average Combination Score (Overall)') |
|
|
|
|
|
all_node_metrics = set() |
|
|
for gen_data in self.best_scores_per_gen.values(): |
|
|
all_node_metrics.update(gen_data.keys()) |
|
|
|
|
|
for metric in sorted(list(all_node_metrics)): |
|
|
best_scores = [self.best_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations] |
|
|
avg_scores = [self.avg_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations] |
|
|
ax_combined.plot(generations, best_scores, marker='o', linestyle='-', alpha=0.7, label=f'Best Score ({metric})') |
|
|
ax_combined.plot(generations, avg_scores, marker='x', linestyle='--', alpha=0.7, label=f'Average Score ({metric})') |
|
|
|
|
|
ax_combined.set_title(f'Overall Performance Evolution ({algorithm_name})', fontsize=18, weight='bold') |
|
|
ax_combined.set_xlabel('Generation', fontsize=14) |
|
|
ax_combined.set_ylabel('Fitness Score', fontsize=14) |
|
|
ax_combined.set_xticks(generations) |
|
|
ax_combined.set_xticklabels([f"Gen {g}" for g in generations], rotation=45, ha="right") |
|
|
handles, labels = ax_combined.get_legend_handles_labels() |
|
|
combo_indices = [i for i, label in enumerate(labels) if 'Combination' in label] |
|
|
node_indices = [i for i, label in enumerate(labels) if 'Combination' not in label] |
|
|
ax_combined.legend([handles[i] for i in combo_indices + node_indices], |
|
|
[labels[i] for i in combo_indices + node_indices], |
|
|
loc='best', fontsize=10) |
|
|
ax_combined.grid(True, which='both', linestyle='--', linewidth=0.5) |
|
|
plt.tight_layout() |
|
|
|
|
|
combined_filepath = os.path.join(self.log_dir, "performance_summary_OVERALL.png") |
|
|
try: |
|
|
plt.savefig(combined_filepath, dpi=300, bbox_inches='tight') |
|
|
logger.info(f"Overall performance plot saved to: {combined_filepath}") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to save overall performance plot: {e}") |
|
|
finally: |
|
|
plt.close(fig_combined) |
|
|
|
|
|
individual_plot_dir = os.path.join(self.log_dir, 'individual_plots') |
|
|
os.makedirs(individual_plot_dir, exist_ok=True) |
|
|
|
|
|
for metric in sorted(list(all_node_metrics)): |
|
|
best_scores = [self.best_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations] |
|
|
avg_scores = [self.avg_scores_per_gen.get(f"Gen_{g}", {}).get(metric) for g in generations] |
|
|
self._create_single_metric_plot(metric, generations, best_scores, avg_scores, algorithm_name, individual_plot_dir) |
|
|
|
|
|
if self.best_combo_scores_per_gen: |
|
|
combo_best = [self.best_combo_scores_per_gen.get(f"Gen_{g}") for g in generations] |
|
|
combo_avg = [self.avg_combo_scores_per_gen.get(f"Gen_{g}") for g in generations] |
|
|
self._create_single_metric_plot("Combination", generations, combo_best, combo_avg, algorithm_name, individual_plot_dir) |
|
|
|
|
|
logger.info(f"Individual performance plots saved to: {individual_plot_dir}") |
|
|
|
|
|
def _log_optimization_summary(self, algorithm_name: str, best_config: Dict[str, str], |
|
|
test_accuracy: Optional[float] = None):
|
|
if not self.enable_logging: |
|
|
return |
|
|
|
|
|
filename = f"optimization_summary_{algorithm_name.lower()}.csv" |
|
|
filepath = os.path.join(self.log_dir, filename) |
|
|
|
|
|
with open(filepath, 'w', newline='', encoding='utf-8') as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerow(['Metric', 'Value', 'Timestamp']) |
|
|
timestamp = datetime.now().isoformat() |
|
|
|
|
|
writer.writerow(['Algorithm', algorithm_name, timestamp]) |
|
|
writer.writerow(['Population_Size', self.population_size, timestamp]) |
|
|
writer.writerow(['Iterations', self.iterations, timestamp]) |
|
|
writer.writerow(['Combination_Sample_Size', self.combination_sample_size, timestamp]) |
|
|
writer.writerow(['Early_Stopping_Enabled', self.enable_early_stopping, timestamp]) |
|
|
if self.enable_early_stopping: |
|
|
writer.writerow(['Early_Stopping_Patience', self.early_stopping_patience, timestamp]) |
|
|
|
|
|
if test_accuracy is not None: |
|
|
writer.writerow(['Final_Test_Accuracy', f"{test_accuracy:.6f}", timestamp]) |
|
|
|
|
|
for node_name, prompt in best_config.items(): |
|
|
writer.writerow([f'Best_{node_name}', prompt, timestamp]) |
|
|
|
|
|
for gen_name in self.best_scores_per_gen.keys(): |
|
|
for metric_name, best_score in self.best_scores_per_gen[gen_name].items(): |
|
|
writer.writerow([f'{gen_name}_{metric_name}_Best', f"{best_score:.6f}", timestamp]) |
|
|
|
|
|
if gen_name in self.avg_scores_per_gen: |
|
|
for metric_name, avg_score in self.avg_scores_per_gen[gen_name].items(): |
|
|
writer.writerow([f'{gen_name}_{metric_name}_Avg', f"{avg_score:.6f}", timestamp]) |
|
|
|
|
|
self._plot_and_save_performance_graph(algorithm_name) |
|
|
|
|
|
async def _log_evaluation_details(self, benchmark: BIGBenchHard, dataset: List[Dict], |
|
|
predictions: List[str], scores: List[float], eval_mode: str, |
|
|
accuracy: float, correct_count: int, total_count: int): |
|
|
if not self.enable_logging: |
|
|
return |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
filename = f"evaluation_testset_{eval_mode}_{timestamp}.csv" |
|
|
filepath = os.path.join(self.log_dir, filename) |
|
|
|
|
|
logger.info(f"Logging detailed evaluation results to {filepath}") |
|
|
|
|
|
with open(filepath, 'w', newline='', encoding='utf-8') as f: |
|
|
writer = csv.writer(f) |
|
|
|
|
|
writer.writerow(['Metric', 'Value']) |
|
|
writer.writerow(['Overall_Accuracy', f"{accuracy:.6f}"]) |
|
|
writer.writerow(['Correct_Count', correct_count]) |
|
|
writer.writerow(['Total_Count', total_count]) |
|
|
writer.writerow([]) |
|
|
|
|
|
|
|
|
writer.writerow(['example_id', 'input_text', 'prediction', 'ground_truth', 'score']) |
|
|
|
|
|
for i, example in enumerate(dataset): |
|
|
example_id = benchmark._get_id(example) |
|
|
input_text = example.get("input", "") |
|
|
label = benchmark.get_label(example) |
|
|
|
|
|
writer.writerow([ |
|
|
example_id, |
|
|
input_text[:200] + "..." if len(input_text) > 200 else input_text, |
|
|
predictions[i], |
|
|
label, |
|
|
scores[i] |
|
|
]) |
|
|
|
|
|
def _log_generation(self, generation: int, combos_with_scores: List[tuple]): |
|
|
""" |
|
|
Log generation data for combination-based evolution. |
|
|
""" |
|
|
if not self.enable_logging: |
|
|
return |
|
|
|
|
|
filename = f"combo_generation_{generation:02d}_log.csv" |
|
|
filepath = os.path.join(self.log_dir, filename) |
|
|
|
|
|
with open(filepath, 'w', newline='', encoding='utf-8') as f: |
|
|
writer = csv.writer(f) |
|
|
header = ['Combination_ID', 'Combination_Score', 'Node_Name', 'Prompt_Text', 'Generation', 'Timestamp'] |
|
|
writer.writerow(header) |
|
|
timestamp = datetime.now().isoformat() |
|
|
|
|
|
sorted_combos = sorted(combos_with_scores, key=lambda x: x[1], reverse=True) |
|
|
|
|
|
for combo_rank, (combination, avg_score) in enumerate(sorted_combos): |
|
|
combo_id = f"combo_rank_{combo_rank + 1}" |
|
|
for node_name, prompt_text in combination.items(): |
|
|
writer.writerow([ |
|
|
combo_id, |
|
|
f"{avg_score:.6f}", |
|
|
node_name, |
|
|
prompt_text[:200] + "..." if len(prompt_text) > 200 else prompt_text, |
|
|
generation, |
|
|
timestamp |
|
|
]) |
|
|
|
|
|
async def _evaluate_combination_list(self, combinations: List[Dict], benchmark: BIGBenchHard, dev_set: list) -> List[float]: |
|
|
if not combinations: |
|
|
return [] |
|
|
eval_dev_set = dev_set[:50] if len(dev_set) > 50 else dev_set |
|
|
all_scores = [] |
|
|
pbar = aio_tqdm(total=len(combinations), desc="Evaluating batch", leave=False) |
|
|
for combo in combinations: |
|
|
tasks = [self._evaluate_combination_on_example(combo, benchmark, ex) for ex in eval_dev_set] |
|
|
example_scores = await asyncio.gather(*tasks) |
|
|
avg_score = sum(example_scores) / len(example_scores) if example_scores else 0.0 |
|
|
all_scores.append(avg_score) |
|
|
pbar.update(1) |
|
|
pbar.close() |
|
|
return all_scores |
|
|
|
|
|
def _generate_combinations(self, node_populations: Dict[str, List[str]]) -> List[Dict[str, str]]: |
|
|
node_names = list(node_populations.keys()) |
|
|
node_prompts = [node_populations[node] for node in node_names] |
|
|
total_possible = np.prod([len(p) for p in node_prompts if p]) if all(p for p in node_prompts) else 0 |
|
|
|
|
|
if total_possible == 0: |
|
|
logger.warning("Cannot generate combinations, one or more node populations are empty.") |
|
|
return [] |
|
|
|
|
|
if self.combination_sample_size is None: |
|
|
target_size = min(self.population_size, int(total_possible), 200) |
|
|
else: |
|
|
target_size = min(self.combination_sample_size, int(total_possible)) |
|
|
|
|
|
logger.info(f"Total possible combinations: {total_possible}, sampling: {target_size}") |
|
|
|
|
|
if target_size >= total_possible: |
|
|
all_combinations = [] |
|
|
for combination in itertools.product(*node_prompts): |
|
|
combo_dict = {node_names[i]: combination[i] for i in range(len(node_names))} |
|
|
all_combinations.append(combo_dict) |
|
|
return all_combinations |
|
|
|
|
|
sampled_combinations = [] |
|
|
sampled_keys = set() |
|
|
max_attempts = target_size * 5 |
|
|
attempts = 0 |
|
|
|
|
|
while len(sampled_combinations) < target_size and attempts < max_attempts: |
|
|
combination = {name: random.choice(prompts) for name, prompts in node_populations.items()} |
|
|
combo_key = tuple(sorted(combination.items())) |
|
|
if combo_key not in sampled_keys: |
|
|
sampled_combinations.append(combination) |
|
|
sampled_keys.add(combo_key) |
|
|
attempts += 1 |
|
|
|
|
|
logger.info(f"Generated {len(sampled_combinations)} unique combinations") |
|
|
return sampled_combinations |
|
|
|
|
|
async def _evaluate_combination_on_example(self, combination: Dict[str, str], |
|
|
benchmark: BIGBenchHard, example: Dict) -> float: |
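# Apply the candidate prompt combination, run the program on one example in a worker thread,
# score the prediction with exact match ("em"), restore the original config, and cache the result.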
|
|
combo_key = tuple(sorted(combination.items())) |
|
|
example_key = str(hash(str(example))) |
|
|
cache_key = hash((combo_key, example_key)) |
|
|
|
|
|
if not hasattr(self, '_eval_cache'): |
|
|
self._eval_cache = {} |
|
|
|
|
|
if cache_key in self._eval_cache: |
|
|
return self._eval_cache[cache_key] |
|
|
|
|
|
async with self.semaphore: |
|
|
try: |
|
|
original_config = self.get_current_cfg() |
|
|
self.apply_cfg(combination) |
|
|
inputs = {k: v for k, v in example.items() if k in benchmark.get_input_keys()} |
|
|
prediction, _ = await asyncio.to_thread(self.program, **inputs) |
|
|
label = benchmark.get_label(example) |
|
|
score_dict = benchmark.evaluate(prediction, label) |
|
|
score = score_dict.get("em", 0.0) |
|
|
self.apply_cfg(original_config) |
|
|
self._eval_cache[cache_key] = score |
|
|
if len(self._eval_cache) > 5000: |
|
|
keys_to_del = list(self._eval_cache.keys())[:1000] |
|
|
for key in keys_to_del: |
|
|
del self._eval_cache[key] |
|
|
return score |
|
|
except Exception as e: |
|
|
logger.error(f"Error evaluating combination: {e}") |
|
|
return 0.0 |
|
|
|
|
|
async def _evaluate_combinations_and_update_node_scores(self, combinations: List[Dict[str, str]], |
|
|
benchmark: BIGBenchHard, dev_set: list) -> List[float]: |
|
|
eval_dev_set = dev_set[:50] if len(dev_set) > 50 else dev_set |
|
|
combination_scores = [] |
|
|
print(f"Evaluating {len(combinations)} combinations on {len(eval_dev_set)} examples...") |
|
|
combo_pbar = aio_tqdm(total=len(combinations), desc="Evaluating Combinations") |
|
|
for combination in combinations: |
|
|
tasks = [self._evaluate_combination_on_example(combination, benchmark, ex) for ex in eval_dev_set] |
|
|
example_scores = await asyncio.gather(*tasks) |
|
|
avg_score = sum(example_scores) / len(example_scores) if example_scores else 0.0 |
|
|
combination_scores.append(avg_score) |
|
|
combo_pbar.update(1) |
|
|
combo_pbar.close() |
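# Credit assignment: each node prompt's fitness becomes the mean score of every sampled combination it appeared in.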
|
|
|
|
|
for node_name in self.node_populations.keys(): |
|
|
self.node_scores[node_name] = [0.0] * len(self.node_populations[node_name]) |
|
|
for prompt_idx, prompt in enumerate(self.node_populations[node_name]): |
|
|
participating_scores = [ |
|
|
combo_score for combo_idx, combo_score in enumerate(combination_scores) |
|
|
if combinations[combo_idx].get(node_name) == prompt |
|
|
] |
|
|
if participating_scores: |
|
|
self.node_scores[node_name][prompt_idx] = sum(participating_scores) / len(participating_scores) |
|
|
else: |
|
|
self.node_scores[node_name][prompt_idx] = 0.0 |
|
|
return combination_scores |
|
|
|
|
|
async def _perform_paraphrase(self, prompt: str) -> str: |
|
|
async with self.semaphore: |
|
|
output = await asyncio.to_thread( |
|
|
self.paraphrase_agent, |
|
|
inputs={"instruction": prompt} |
|
|
) |
|
|
return output.content.paraphrased_instruction.strip() |
|
|
|
|
|
async def _perform_evolution(self, agent: Callable, inputs: Dict[str, str]) -> str: |
|
|
async with self.semaphore: |
|
|
output = await asyncio.to_thread(agent, inputs=inputs) |
|
|
if hasattr(output.content, 'evolved_prompt'): |
|
|
return output.content.evolved_prompt.strip() |
|
|
return str(output.content).strip() |
|
|
|
|
|
async def _initialize_node_populations(self, initial_config: Dict[str, Any]):
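# Build one population per tracked node: sample or copy a provided list of prompts,
# or grow a single seed prompt with LLM paraphrases until population_size is reached.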
|
|
for node_name, initial_value in initial_config.items(): |
|
|
node_population = [] |
|
|
if isinstance(initial_value, list): |
|
|
provided_size = len(initial_value) |
|
|
if self.population_size < provided_size: |
|
|
logger.info(f"Node '{node_name}': Provided population ({provided_size}) is larger than target size ({self.population_size}). Randomly sampling.") |
|
|
node_population = random.sample(initial_value, self.population_size) |
|
|
elif self.population_size == provided_size: |
|
|
logger.info(f"Node '{node_name}': Provided population size ({provided_size}) matches target size. Using directly.") |
|
|
node_population = list(initial_value) |
|
|
else: |
|
|
logger.info(f"Node '{node_name}': Target population size ({self.population_size}) is larger than provided ({provided_size}). Expanding.") |
|
|
node_population = list(initial_value) |
|
|
num_to_generate = self.population_size - provided_size |
|
|
source_prompts_for_generation = random.choices(initial_value, k=num_to_generate) |
|
|
paraphrase_tasks = [self._perform_paraphrase(prompt) for prompt in source_prompts_for_generation] |
|
|
new_prompts = await aio_tqdm.gather( |
|
|
*paraphrase_tasks, desc=f"Expanding population for {node_name}" |
|
|
) |
|
|
node_population.extend(new_prompts) |
|
|
elif isinstance(initial_value, str): |
|
|
logger.info(f"Node '{node_name}': Generating population from a single initial prompt.") |
|
|
node_population = [initial_value] |
|
|
if self.population_size > 1: |
|
|
num_to_generate = self.population_size - 1 |
|
|
paraphrase_tasks = [self._perform_paraphrase(initial_value) for _ in range(num_to_generate)] |
|
|
new_prompts = await aio_tqdm.gather( |
|
|
*paraphrase_tasks, desc=f"Generating initial population for {node_name}" |
|
|
) |
|
|
node_population.extend(new_prompts) |
|
|
else: |
|
|
raise TypeError(f"Unsupported type for tracked parameter '{node_name}': {type(initial_value)}. Must be str or list.") |
|
|
self.node_populations[node_name] = node_population |
|
|
self.node_scores[node_name] = [0.0] * self.population_size |
|
|
|
|
|
async def evaluate(self, benchmark: BIGBenchHard, eval_mode: str = "test") -> Dict[str, float]: |
|
|
""" |
|
|
Evaluates the optimized program on a specified dataset. |
|
|
|
|
|
Args: |
|
|
benchmark (BIGBenchHard): The benchmark instance containing the data. |
|
|
eval_mode (str): The evaluation mode, either "test" or "dev". |
|
|
|
|
|
Returns: |
|
|
Dict[str, float]: A dictionary containing evaluation metrics. |
|
|
""" |
|
|
logger.info(f"--- Evaluating optimized program on '{eval_mode}' set ---") |
|
|
|
|
|
dataset = benchmark.get_test_data() if eval_mode == "test" else benchmark.get_dev_data() |
|
|
if not dataset: |
|
|
logger.warning(f"No data found for '{eval_mode}' set. Returning empty results.") |
|
|
return {} |
|
|
|
|
|
async def evaluate_example(example: Dict) -> tuple[float, str]: |
|
|
prediction, _ = await asyncio.to_thread(self.program, input=example["input"]) |
|
|
score_dict = benchmark.evaluate(prediction, benchmark.get_label(example)) |
|
|
score = score_dict.get("em", 0.0) |
|
|
return score, prediction |
|
|
|
|
|
tasks = [evaluate_example(ex) for ex in dataset] |
|
|
results = await aio_tqdm.gather(*tasks, desc=f"Evaluating on {eval_mode.capitalize()} Set") |
|
|
|
|
|
scores, predictions = zip(*results) if results else ([], []) |
|
|
|
|
|
correct_count = sum(scores) |
|
|
total_count = len(dataset) |
|
|
accuracy = correct_count / total_count if total_count > 0 else 0.0 |
|
|
|
|
|
logger.info(f"{eval_mode.capitalize()} Set Accuracy: {accuracy:.4f} ({int(correct_count)}/{total_count})") |
|
|
|
|
|
if self.enable_logging: |
|
|
await self._log_evaluation_details( |
|
|
benchmark, dataset, predictions, scores, eval_mode, |
|
|
accuracy, int(correct_count), total_count |
|
|
) |
|
|
|
|
|
return {"accuracy": accuracy} |
|
|
|
|
|
|
|
|
|
|
|
class GAOptimizer(EvopromptOptimizer): |
|
|
""" |
|
|
Genetic Algorithm optimizer for prompt evolution. |
|
|
|
|
|
This optimizer uses genetic algorithm operations (crossover, mutation, selection) |
|
|
to evolve prompts. It supports both node-based and combination-based evolution. |
|
|
""" |
|
|
|
|
|
def __init__(self, *args, full_evaluation: bool = False, **kwargs): |
|
|
""" |
|
|
Initialize the GA optimizer. |
|
|
|
|
|
Args: |
|
|
full_evaluation: If True, use node-based evolution with full (makeup) evaluation; if False, use combination-based evolution
|
|
*args: Arguments passed to parent class |
|
|
**kwargs: Keyword arguments passed to parent class |
|
|
""" |
|
|
super().__init__(*args, **kwargs) |
|
|
self.full_evaluation = full_evaluation |
|
|
|
|
|
|
|
|
mode_str = "node-based (full evaluation)" if self.full_evaluation else "combination-based"
|
|
logger.info(f"GAOptimizer initialized with '{mode_str}' mode.") |
|
|
|
|
|
|
|
|
self.ga_agent = CustomizeAgent( |
|
|
name="ga_agent", |
|
|
description="An agent that evolves a new prompt from two parent prompts.", |
|
|
prompt="""Please follow the instructions step-by-step to generate a better prompt. |
|
|
|
|
|
1. Crossover the following prompts to generate a new prompt: |
|
|
Prompt 1: {parent1} |
|
|
Prompt 2: {parent2} |
|
|
|
|
|
2. Mutate the prompt generated in Step 1 and generate a final evolved prompt. Strictly preserve the original XML tags structure. |
|
|
|
|
|
Now process the given prompts and provide your output in the following format: |
|
|
|
|
|
## evolved_prompt |
|
|
[Your evolved prompt here]""", |
|
|
llm_config=self.llm_config, |
|
|
inputs=[ |
|
|
{"name": "parent1", "type": "string", "description": "The first parent prompt."}, |
|
|
{"name": "parent2", "type": "string", "description": "The second parent prompt."} |
|
|
], |
|
|
outputs=[ |
|
|
{"name": "evolved_prompt", "type": "string", "description": "The evolved prompt with XML tags preserved."} |
|
|
], |
|
|
parse_mode="title" |
|
|
) |
|
|
|
|
|
async def _perform_node_evolution(self, node_name: str, node_population: List[str], |
|
|
node_scores: List[float] = None, |
|
|
evolution_agent: Callable = None) -> List[str]: |
|
|
probabilities = None |
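# Fitness-proportional (roulette-wheel) parent selection: sampling probability is proportional
# to a prompt's score; falls back to uniform sampling when scores are missing or sum to zero.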
|
|
if node_scores: |
|
|
total_fitness = sum(node_scores) |
|
|
if total_fitness > 0: |
|
|
probabilities = [s / total_fitness for s in node_scores] |
|
|
|
|
|
agent = evolution_agent or self.ga_agent |
|
|
|
|
|
num_children_to_create = len(node_population) |
|
|
evolution_tasks = [] |
|
|
for _ in range(num_children_to_create): |
|
|
parents = random.choices(node_population, weights=probabilities, k=2) if probabilities else random.choices(node_population, k=2) |
|
|
task = self._perform_evolution(agent=agent, inputs={"parent1": parents[0], "parent2": parents[1]}) |
|
|
evolution_tasks.append(task) |
|
|
|
|
|
new_children = await aio_tqdm.gather(*evolution_tasks, desc=f"Evolving {node_name}") |
|
|
return new_children |
|
|
|
|
|
async def optimize(self, benchmark: BIGBenchHard) -> tuple[Dict[str, str], dict, dict]: |
|
|
self._setup_logging_directory(benchmark) |
|
|
initial_config = self.get_current_cfg() |
|
|
if not initial_config: |
|
|
raise ValueError("Registry is empty.") |
|
|
await self._initialize_node_populations(initial_config) |
|
|
dev_set = benchmark.get_dev_data() |
|
|
if not dev_set: |
|
|
raise ValueError("Benchmark has no development set.") |
|
|
|
|
|
self._best_score_so_far = -float('inf') |
|
|
self._generations_without_improvement = 0 |
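# Early-stopping state: a generation counts as an improvement only if its best combination score
# beats the best seen so far by more than 1e-6; patience counts consecutive non-improving generations.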
|
|
|
|
|
if self.full_evaluation: |
|
|
logger.info("--- Starting Node-Based Evolution with Makeup Evaluation (full_evaluation=True) ---") |
|
|
|
|
|
print("--- Step 1: Initial evaluation of node combinations... ---") |
|
|
combinations = self._generate_combinations(self.node_populations) |
|
|
combination_scores = await self._evaluate_combinations_and_update_node_scores(combinations, benchmark, dev_set) |
|
|
|
|
|
self._log_generation_summary(0, "Initial") |
|
|
self._log_detailed_evaluation(0, combinations, combination_scores) |
|
|
|
|
|
self.best_scores_per_gen["Gen_0"] = {name: max(scores) if scores else 0 for name, scores in self.node_scores.items()} |
|
|
self.avg_scores_per_gen["Gen_0"] = {name: np.mean(scores) if scores else 0 for name, scores in self.node_scores.items()} |
|
|
|
|
|
if combination_scores: |
|
|
initial_best_combo_score = max(combination_scores) |
|
|
self._best_score_so_far = initial_best_combo_score |
|
|
self.best_combo_scores_per_gen["Gen_0"] = initial_best_combo_score |
|
|
self.avg_combo_scores_per_gen["Gen_0"] = np.mean(combination_scores) |
|
|
logger.info(f"Early stopping baseline set to initial best combination score: {self._best_score_so_far:.4f}") |
|
|
|
|
|
for t in range(self.iterations): |
|
|
generation_start_time = time.time() |
|
|
print(f"\n--- Generation {t + 1}/{self.iterations} ---") |
|
|
|
|
|
children_populations = {} |
|
|
for node_name in self.node_populations.keys(): |
|
|
children = await self._perform_node_evolution( |
|
|
node_name, self.node_populations[node_name], self.node_scores[node_name], self.ga_agent |
|
|
) |
|
|
children_populations[node_name] = children |
|
|
|
|
|
current_populations = { |
|
|
name: self.node_populations[name] + children_populations[name] |
|
|
for name in self.node_populations.keys() |
|
|
} |
|
|
self.node_populations = current_populations |
|
|
|
|
|
print(f"Performing main evaluation for {len(list(current_populations.values())[0])} individuals in each node...") |
|
|
combinations = self._generate_combinations(self.node_populations) |
|
|
combination_scores = await self._evaluate_combinations_and_update_node_scores(combinations, benchmark, dev_set) |
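# Makeup evaluation: any prompt whose score is still 0.0 and that appeared in no sampled
# combination gets one dedicated evaluation so survivor selection is not biased against it.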
|
|
|
|
|
prompts_needing_makeup = [] |
|
|
for node_name, scores in self.node_scores.items(): |
|
|
for idx, score in enumerate(scores): |
|
|
if score == 0.0: |
|
|
prompt_to_check = self.node_populations[node_name][idx] |
|
|
is_in_combos = any(c.get(node_name) == prompt_to_check for c in combinations) |
|
|
if not is_in_combos: |
|
|
prompts_needing_makeup.append((node_name, idx, prompt_to_check)) |
|
|
|
|
|
if prompts_needing_makeup: |
|
|
print(f"--- Performing makeup evaluation for {len(prompts_needing_makeup)} unsampled individuals... ---") |
|
|
makeup_combinations = [] |
|
|
for node_name, idx, prompt in prompts_needing_makeup: |
|
|
makeup_combo = {name: random.choice(pop) for name, pop in self.node_populations.items()} |
|
|
makeup_combo[node_name] = prompt |
|
|
makeup_combinations.append(makeup_combo) |
|
|
|
|
|
makeup_scores = await self._evaluate_combination_list(makeup_combinations, benchmark, dev_set) |
|
|
|
|
|
for i, (node_name, idx, prompt) in enumerate(prompts_needing_makeup): |
|
|
self.node_scores[node_name][idx] = makeup_scores[i] |
|
|
logger.info(f"Updated score for '{prompt[:30]}...' to {makeup_scores[i]:.4f} after makeup eval.") |
|
|
|
|
|
print("--- Selecting survivors for the next generation... ---") |
|
|
survivor_populations = {} |
|
|
survivor_scores = {} |
|
|
for node_name in self.node_populations.keys(): |
|
|
population = self.node_populations[node_name] |
|
|
scores = self.node_scores[node_name] |
|
|
sorted_pairs = sorted(zip(scores, population), key=lambda x: x[0], reverse=True) |
|
|
selected_pairs = sorted_pairs[:self.population_size] |
|
|
|
|
|
if selected_pairs: |
|
|
selected_scores, selected_population = zip(*selected_pairs) |
|
|
survivor_scores[node_name] = list(selected_scores) |
|
|
survivor_populations[node_name] = list(selected_population) |
|
|
else: |
|
|
survivor_scores[node_name], survivor_populations[node_name] = [], [] |
|
|
print(f"Node {node_name}: Selected top {len(survivor_populations[node_name])} from {len(population)} individuals") |
|
|
|
|
|
self.node_populations = survivor_populations |
|
|
self.node_scores = survivor_scores |
|
|
|
|
|
generation_time = time.time() - generation_start_time |
|
|
print(f"Generation {t + 1} completed in {generation_time:.2f}s") |
|
|
self._log_generation_summary(t + 1, "Evolution") |
|
|
if combination_scores: |
|
|
self._log_detailed_evaluation(t + 1, combinations, combination_scores) |
|
|
|
|
|
gen_name = f"Gen_{t + 1}" |
|
|
self.best_scores_per_gen[gen_name] = {name: max(scores) if scores else 0 for name, scores in self.node_scores.items()} |
|
|
self.avg_scores_per_gen[gen_name] = {name: np.mean(scores) if scores else 0 for name, scores in self.node_scores.items()} |
|
|
|
|
|
best_combo_score_this_gen = max(combination_scores) if combination_scores else -float('inf') |
|
|
self.best_combo_scores_per_gen[gen_name] = best_combo_score_this_gen |
|
|
self.avg_combo_scores_per_gen[gen_name] = np.mean(combination_scores) if combination_scores else 0.0 |
|
|
|
|
|
if self.enable_early_stopping: |
|
|
if best_combo_score_this_gen > self._best_score_so_far + 1e-6: |
|
|
self._best_score_so_far = best_combo_score_this_gen |
|
|
self._generations_without_improvement = 0 |
|
|
logger.info(f"Early stopping: New best combination score found: {self._best_score_so_far:.4f}.") |
|
|
else: |
|
|
self._generations_without_improvement += 1 |
|
|
logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s).") |
|
|
|
|
|
if self._generations_without_improvement >= self.early_stopping_patience: |
|
|
logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---") |
|
|
break |
|
|
|
|
|
else: |
|
|
logger.info("--- Starting Combo-Based Evolution (full_evaluation=False) ---") |
|
|
|
|
|
print("--- Step 1: Creating and evaluating initial combination population... ---") |
|
|
initial_combinations = self._generate_combinations(self.node_populations) |
|
|
initial_scores = await self._evaluate_combination_list(initial_combinations, benchmark, dev_set) |
|
|
|
|
|
combo_population_with_scores = sorted(zip(initial_combinations, initial_scores), key=lambda x: x[1], reverse=True) |
|
|
combo_population_with_scores = combo_population_with_scores[:self.population_size] |
|
|
|
|
|
gen_0_scores = [score for _, score in combo_population_with_scores] |
|
|
if gen_0_scores: |
|
|
best_gen_score = max(gen_0_scores) |
|
|
avg_gen_score = np.mean(gen_0_scores) |
|
|
self.best_combo_scores_per_gen["Gen_0"] = best_gen_score |
|
|
self.avg_combo_scores_per_gen["Gen_0"] = avg_gen_score |
|
|
self._best_score_so_far = best_gen_score |
|
|
print(f"Generation 0 complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}") |
|
|
logger.info(f"Early stopping baseline set to: {self._best_score_so_far:.4f}") |
|
|
|
|
|
self._log_generation(0, combo_population_with_scores) |
|
|
|
|
|
for t in range(self.iterations): |
|
|
print(f"\n--- Generation {t + 1}/{self.iterations} (Combo Evolution) ---") |
|
|
|
|
|
parent_prompts_for_node = {name: [] for name in initial_config.keys()} |
|
|
for combo, _ in combo_population_with_scores: |
|
|
for node_name, prompt in combo.items(): |
|
|
parent_prompts_for_node[node_name].append(prompt) |
|
|
|
|
|
children_populations = {} |
|
|
for node_name, prompts in parent_prompts_for_node.items(): |
|
|
children_populations[node_name] = await self._perform_node_evolution(node_name, prompts) |
|
|
|
|
|
print("Evaluating new child combinations...") |
|
|
child_combinations = self._generate_combinations(children_populations) |
|
|
child_scores = await self._evaluate_combination_list(child_combinations, benchmark, dev_set) |
|
|
child_combos_with_scores = list(zip(child_combinations, child_scores)) |
|
|
|
|
|
print("Selecting best combinations from parents and children...") |
|
|
combined_population = combo_population_with_scores + child_combos_with_scores |
|
|
|
|
|
sorted_combos = sorted(combined_population, key=lambda x: x[1], reverse=True) |
|
|
combo_population_with_scores = sorted_combos[:self.population_size] |
|
|
|
|
|
self._log_generation(t + 1, combo_population_with_scores) |
|
|
current_scores = [score for _, score in combo_population_with_scores] |
|
|
best_gen_score = max(current_scores) if current_scores else 0 |
|
|
avg_gen_score = np.mean(current_scores) if current_scores else 0 |
|
|
|
|
|
gen_name = f"Gen_{t + 1}" |
|
|
self.best_combo_scores_per_gen[gen_name] = best_gen_score |
|
|
self.avg_combo_scores_per_gen[gen_name] = avg_gen_score |
|
|
print(f"Generation {t + 1} complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}") |
|
|
|
|
|
if self.enable_early_stopping: |
|
|
if best_gen_score > self._best_score_so_far + 1e-6: |
|
|
self._best_score_so_far = best_gen_score |
|
|
self._generations_without_improvement = 0 |
|
|
logger.info(f"Early stopping: New best combination score found: {self._best_score_so_far:.4f}. Patience counter reset.") |
|
|
else: |
|
|
self._generations_without_improvement += 1 |
|
|
logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s). Patience: {self.early_stopping_patience}.") |
|
|
|
|
|
if self._generations_without_improvement >= self.early_stopping_patience: |
|
|
logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---") |
|
|
break |
|
|
|
|
|
print("\n--- Evolution complete ---") |
|
|
if self.full_evaluation: |
|
|
best_config = { |
|
|
name: self.node_populations[name][np.argmax(self.node_scores[name])] |
|
|
for name in self.node_populations.keys() if self.node_populations.get(name) and self.node_scores.get(name) |
|
|
} |
|
|
else: |
|
|
best_config, _ = max(combo_population_with_scores, key=lambda x: x[1]) if combo_population_with_scores else ({}, 0) |
|
|
|
|
|
self._log_optimization_summary("GA", best_config) |
|
|
self.apply_cfg(best_config) |
|
|
logger.info("Optimization finished! The best configuration has been applied to the program.") |
|
|
|
|
|
return best_config, self.best_combo_scores_per_gen, self.avg_combo_scores_per_gen
|
|
|
|
|
|
|
|
class DEOptimizer(EvopromptOptimizer): |
|
|
""" |
|
|
Differential Evolution optimizer for prompt evolution. |
|
|
|
|
|
This optimizer uses differential evolution strategy for prompt optimization, |
|
|
including mutation, crossover, and selection operations based on DE principles. |
|
|
""" |
|
|
|
|
|
def __init__(self, *args, **kwargs): |
|
|
""" |
|
|
Initialize the DE optimizer. |
|
|
|
|
|
Args: |
|
|
*args: Arguments passed to parent class |
|
|
**kwargs: Keyword arguments passed to parent class |
|
|
""" |
|
|
super().__init__(*args, **kwargs) |
|
|
|
|
|
|
|
|
self.de_agent = CustomizeAgent( |
|
|
name="DE_Agent", |
|
|
description="Generates a new trial prompt using the Differential Evolution strategy.", |
|
|
prompt="""Please follow the instructions step-by-step to generate a better prompt using Differential Evolution strategy. |
|
|
|
|
|
1. Identify the different parts between these two donor prompts: |
|
|
Donor Prompt 1: {donor1} |
|
|
Donor Prompt 2: {donor2} |
|
|
|
|
|
2. Randomly mutate the different parts identified above. |
|
|
|
|
|
3. Combine the mutated parts with the best prompt, selectively replacing its content: |
|
|
Best Prompt: {best_prompt} |
|
|
|
|
|
4. Crossover the result from Step 3 with the current prompt to generate the final evolved prompt. Strictly preserve the original XML tags structure: |
|
|
Current Prompt: {current_prompt} |
|
|
|
|
|
Please provide the final evolved prompt in the following format: |
|
|
|
|
|
## evolved_prompt |
|
|
[Your evolved prompt here]""", |
|
|
llm_config=self.llm_config, |
|
|
inputs=[ |
|
|
{"name": "current_prompt", "type": "string", "description": "The base prompt to be mutated, p_i."}, |
|
|
{"name": "donor1", "type": "string", "description": "The first donor prompt, p_r1."}, |
|
|
{"name": "donor2", "type": "string", "description": "The second donor prompt, p_r2."}, |
|
|
{"name": "best_prompt", "type": "string", "description": "The best prompt found so far in the population, p_best."}, |
|
|
], |
|
|
outputs=[ |
|
|
{"name": "evolved_prompt", "type": "string", "description": "The evolved prompt with XML tags preserved."} |
|
|
], |
|
|
parse_mode="title" |
|
|
) |
|
|
|
|
|
async def _evolve_and_select_one( |
|
|
self, |
|
|
target_combo_with_score: tuple, |
|
|
full_pop_with_scores: List[tuple], |
|
|
benchmark: BIGBenchHard, |
|
|
dev_set: list |
|
|
) -> tuple: |
|
|
""" |
|
|
Evolve a single combination using differential evolution and select the better one. |
|
|
|
|
|
Args: |
|
|
target_combo_with_score: The target combination and its score |
|
|
full_pop_with_scores: The full population with scores |
|
|
benchmark: The benchmark for evaluation |
|
|
dev_set: Development set for evaluation |
|
|
|
|
|
Returns: |
|
|
Tuple of the better combination (target or trial) and its score |
|
|
""" |
|
|
target_combo, target_score = target_combo_with_score |
|
|
best_combo, _ = max(full_pop_with_scores, key=lambda x: x[1]) |
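# DE mutation sources: the population-best combination plus two donors (p_r1, p_r2) drawn from the rest
# of the population; fall back to sampling with replacement when the donor pool is too small.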
|
|
|
|
|
|
|
|
donor_pool = [c for c in full_pop_with_scores if c[0] != target_combo] |
|
|
if len(donor_pool) < 2: |
|
|
donors = random.choices(full_pop_with_scores, k=2) |
|
|
else: |
|
|
donors = random.sample(donor_pool, 2) |
|
|
donor1_combo, _ = donors[0] |
|
|
donor2_combo, _ = donors[1] |
|
|
|
|
|
|
|
|
evolution_tasks = [] |
|
|
node_names = list(target_combo.keys()) |
|
|
for node_name in node_names: |
|
|
task = self._perform_evolution( |
|
|
agent=self.de_agent, |
|
|
inputs={ |
|
|
"current_prompt": target_combo[node_name], |
|
|
"donor1": donor1_combo[node_name], |
|
|
"donor2": donor2_combo[node_name], |
|
|
"best_prompt": best_combo[node_name] |
|
|
} |
|
|
) |
|
|
evolution_tasks.append(task) |
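# Assemble the trial combination node-by-node from the evolved components, evaluate it,
# then keep whichever of (trial, target) scores higher: DE's greedy selection step.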
|
|
|
|
|
|
|
|
evolved_components = await asyncio.gather(*evolution_tasks) |
|
|
trial_combo = {name: comp for name, comp in zip(node_names, evolved_components)} |
|
|
trial_scores = await self._evaluate_combination_list([trial_combo], benchmark, dev_set) |
|
|
trial_score = trial_scores[0] |
|
|
|
|
|
|
|
|
return (trial_combo, trial_score) if trial_score > target_score else (target_combo, target_score) |
|
|
|
|
|
async def optimize(self, benchmark: BIGBenchHard) -> tuple[Dict[str, str], dict, dict]: |
|
|
self._setup_logging_directory(benchmark) |
|
|
initial_config = self.get_current_cfg() |
|
|
if not initial_config: |
|
|
raise ValueError("Registry is empty.") |
|
|
logger.info("Optimizing with DEOptimizer (Pipelined Combination Evolution).") |
|
|
await self._initialize_node_populations(initial_config) |
|
|
dev_set = benchmark.get_dev_data() |
|
|
if not dev_set: |
|
|
raise ValueError("Benchmark has no development set.") |
|
|
|
|
|
self._best_score_so_far = -float('inf') |
|
|
self._generations_without_improvement = 0 |
|
|
|
|
|
print("--- Step 1: Creating and evaluating initial combination population... ---") |
|
|
initial_combinations = self._generate_combinations(self.node_populations) |
|
|
initial_scores = await self._evaluate_combination_list(initial_combinations, benchmark, dev_set) |
|
|
combo_pop_with_scores = list(zip(initial_combinations, initial_scores)) |
|
|
|
|
|
self._log_generation(0, combo_pop_with_scores) |
|
|
initial_best = max(initial_scores) if initial_scores else 0 |
|
|
initial_avg = np.mean(initial_scores) if initial_scores else 0 |
|
|
self.best_combo_scores_per_gen["Gen_0"] = initial_best |
|
|
self.avg_combo_scores_per_gen["Gen_0"] = initial_avg |
|
|
print(f"Initial population - Best score: {initial_best:.4f}, Avg score: {initial_avg:.4f}") |
|
|
|
|
|
if initial_scores: |
|
|
self._best_score_so_far = initial_best |
|
|
|
|
|
for t in range(self.iterations): |
|
|
print(f"\n--- Generation {t + 1}/{self.iterations} ---") |
|
|
evolution_pipeline_tasks = [ |
|
|
self._evolve_and_select_one(combo_with_score, combo_pop_with_scores, benchmark, dev_set) |
|
|
for combo_with_score in combo_pop_with_scores |
|
|
] |
|
|
pbar = aio_tqdm(total=len(evolution_pipeline_tasks), desc=f"Pipelined Evolution Gen {t+1}") |
|
|
next_gen_pop_with_scores = [] |
|
|
for future in asyncio.as_completed(evolution_pipeline_tasks): |
|
|
result = await future |
|
|
next_gen_pop_with_scores.append(result) |
|
|
pbar.update(1) |
|
|
pbar.close() |
|
|
combo_pop_with_scores = next_gen_pop_with_scores |
|
|
|
|
|
self._log_generation(t + 1, combo_pop_with_scores) |
|
|
current_scores = [score for _, score in combo_pop_with_scores] |
|
|
best_gen_score = max(current_scores) if current_scores else 0 |
|
|
avg_gen_score = np.mean(current_scores) if current_scores else 0 |
|
|
|
|
|
gen_name = f"Gen_{t + 1}" |
|
|
self.best_combo_scores_per_gen[gen_name] = best_gen_score |
|
|
self.avg_combo_scores_per_gen[gen_name] = avg_gen_score |
|
|
print(f"Generation {t + 1} complete. Best score: {best_gen_score:.4f}, Avg score: {avg_gen_score:.4f}") |
|
|
|
|
|
if self.enable_early_stopping: |
|
|
if best_gen_score > self._best_score_so_far + 1e-6: |
|
|
self._best_score_so_far = best_gen_score |
|
|
self._generations_without_improvement = 0 |
|
|
logger.info(f"Early stopping: New best score found: {self._best_score_so_far:.4f}. Patience counter reset.") |
|
|
else: |
|
|
self._generations_without_improvement += 1 |
|
|
logger.info(f"Early stopping: No improvement for {self._generations_without_improvement} generation(s). Patience: {self.early_stopping_patience}.") |
|
|
|
|
|
if self._generations_without_improvement >= self.early_stopping_patience: |
|
|
logger.warning(f"\n--- EARLY STOPPING TRIGGERED at generation {t + 1} ---") |
|
|
logger.warning(f"No improvement in best score for {self.early_stopping_patience} consecutive generations.") |
|
|
break |
|
|
|
|
|
print("\n--- Combination-Level Evolution complete ---") |
|
|
best_combination, best_score = max(combo_pop_with_scores, key=lambda x: x[1]) if combo_pop_with_scores else ({}, 0) |
|
|
logger.info(f"Optimization finished! Best combination found with score {best_score:.4f}.") |
|
|
|
|
|
self._log_optimization_summary("DE", best_combination) |
|
|
self.apply_cfg(best_combination) |
|
|
return best_combination, self.best_combo_scores_per_gen, self.avg_combo_scores_per_gen |
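

# ---------------------------------------------------------------------------
# Hedged usage sketch (illustrative only, not part of the library API).
# It shows how one of the optimizers above might be wired up, assuming you
# already have: a ParamRegistry tracking the prompt parameters of `program`,
# a `program(**inputs)` callable that returns (prediction, extra), a
# BIGBenchHard benchmark instance, and an OpenAILLMConfig. How those four
# objects are constructed is project-specific and deliberately omitted here;
# the keyword arguments below mirror EvopromptOptimizer.__init__.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo(registry: ParamRegistry, program: Callable,
                    benchmark: BIGBenchHard, llm_config: OpenAILLMConfig) -> None:
        optimizer = GAOptimizer(
            registry=registry,        # tracked prompt nodes to evolve
            program=program,          # workflow callable: program(**inputs) -> (prediction, extra)
            population_size=6,
            iterations=5,
            llm_config=llm_config,
            concurrency_limit=10,
            full_evaluation=False,    # combination-based evolution; True enables node-based mode
        )
        best_config, best_per_gen, avg_per_gen = await optimizer.optimize(benchmark)
        metrics = await optimizer.evaluate(benchmark, eval_mode="test")
        logger.info(f"Best config: {best_config}")
        logger.info(f"Test metrics: {metrics}")

    # Once registry/program/benchmark/llm_config exist, run with:
    #   asyncio.run(_demo(registry, program, benchmark, llm_config))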