| import gradio as gr |
| import os |
| import yaml |
| import json |
| import random |
| from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names |
| from openai import OpenAI |
| from openevolve import run_evolution |
| from typing import Dict, List, Tuple, Optional |
| import tempfile |
| import shutil |
| import requests |
| import glob |
|
|
| |
| |
# Model identifiers offered in the "Select Model" dropdown of the UI.
# The first entry is used as the dropdown's default value.
FREE_MODELS = [
    "qwen/qwen-2.5-72b-instruct:free",
    "meta-llama/llama-3.3-70b-instruct:free",
    "google/gemma-3-27b-it:free",
    "mistralai/mistral-small-3.1-24b-instruct:free",
    "deepseek/deepseek-r1:free",
]
|
|
|
|
def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]:
    """
    Validate that the dataset exists and has the required fields.

    The check runs in three steps: an advisory lookup against the HuggingFace
    Hub REST API, a split-name check, and finally a streaming load of the
    first example to confirm both field names exist.

    Args:
        dataset_name: Dataset identifier on the HuggingFace Hub.
        split: Name of the split to validate.
        input_field: Field expected to hold the model input.
        target_field: Field expected to hold the expected output.

    Returns:
        Tuple of (is_valid, error_message)
    """
    try:
        if not dataset_name or dataset_name.strip() == "":
            return False, "❌ Dataset name cannot be empty"

        dataset_name = dataset_name.strip()

        # Send the token (when configured) with the Hub API request only.
        hf_token = os.environ.get("HF_TOKEN", None)
        headers = {}
        if hf_token:
            headers["Authorization"] = f"Bearer {hf_token}"

        api_url = f"https://huggingface.co/api/datasets/{dataset_name}"
        try:
            response = requests.get(api_url, headers=headers, timeout=10)
        except requests.RequestException as e:
            # The API lookup is only a fast pre-check; a network hiccup here
            # must not abort validation, the real load below decides.
            print(f"Warning: Could not reach HuggingFace API ({e}), attempting to load...")
        else:
            if response.status_code == 404:
                return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')"
            elif response.status_code != 200:
                print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...")

        print(f"Loading dataset {dataset_name} with split {split}...")

        # Best effort: if split names cannot be listed, fall through and let
        # load_dataset report the problem.
        try:
            available_splits = get_dataset_split_names(dataset_name)
            if split not in available_splits:
                return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}"
        except Exception as e:
            print(f"Could not get split names: {e}. Will try to load anyway...")

        # Streaming mode so only the first example has to be fetched.
        dataset = load_dataset(dataset_name, split=split, streaming=True)

        try:
            first_example = next(iter(dataset))
        except StopIteration:
            # Without this an empty split surfaced as a blank error message.
            return False, f"❌ Split '{split}' of dataset '{dataset_name}' contains no examples"
        available_fields = list(first_example.keys())

        if input_field not in available_fields:
            return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}"

        if target_field not in available_fields:
            return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}"

        return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found."

    except Exception as e:
        # Top-level boundary: turn any loader failure into a user-facing message.
        error_msg = str(e)
        if "404" in error_msg or "not found" in error_msg.lower():
            return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)"
        return False, f"❌ Error validating dataset: {error_msg}"
|
|
|
|
def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str,
                    initial_prompt: str) -> Tuple[bool, str]:
    """
    Run every pre-flight check required before an optimization can start.

    Checks are applied in order and the first failure wins: API key present,
    prompt contains the ``{input}`` placeholder, dataset name non-empty, and
    finally the dataset itself (delegated to ``validate_dataset``).

    Returns:
        Tuple of (is_valid, message)
    """
    if not os.environ.get("OPENAI_API_KEY"):
        return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets."

    has_placeholder = "{input}" in initial_prompt
    if not has_placeholder:
        return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs"

    cleaned_name = dataset_name.strip()
    if cleaned_name == "":
        return False, "❌ Dataset name cannot be empty"

    # The dataset check supplies the message for both success and failure.
    dataset_ok, detail = validate_dataset(cleaned_name, split, input_field, target_field)
    return (True, detail) if dataset_ok else (False, detail)
|
|
|
|
def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int,
                    model: str, input_field: str, target_field: str) -> Dict:
    """Score a prompt template on a sample of a dataset with the chosen model.

    Up to ``num_samples`` examples are drawn from the split; for each one the
    ``{input}`` placeholder is filled in, the model is queried through the
    OpenRouter endpoint, and the reply counts as correct when the lower-cased
    target occurs in it (with a small sentiment-synonym fallback).

    Returns:
        Dict with ``accuracy`` (percent), ``correct``, ``total`` and
        ``results``; ``errors`` is added when individual samples failed and
        ``error`` when nothing could be evaluated at all.
    """

    def _failure(message):
        # Shape shared by every "could not evaluate" return value.
        return {
            "error": message,
            "accuracy": 0,
            "correct": 0,
            "total": 0,
            "results": []
        }

    def _clip(text):
        # Keep result previews short.
        return text[:100] + "..." if len(text) > 100 else text

    # (accepted target labels, words that count as a match in the prediction)
    sentiment_synonyms = (
        (("1", "positive", "pos"), ("positive", "good", "great")),
        (("0", "negative", "neg"), ("negative", "bad", "poor")),
    )

    try:
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return _failure("OPENAI_API_KEY not set in environment")

        dataset = load_dataset(dataset_name, split=split, streaming=False)

        # Random subset when the split is larger than requested, else everything.
        if len(dataset) > num_samples:
            chosen = random.sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in chosen]
        else:
            samples = list(dataset)[:num_samples]

        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        results = []
        errors = []

        for position, sample in enumerate(samples, start=1):
            try:
                input_str = str(sample.get(input_field, ""))
                target_text = str(sample.get(target_field, ""))

                formatted_prompt = prompt.replace("{input}", input_str)

                response = client.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": formatted_prompt}
                    ],
                    temperature=0.1,
                    max_tokens=500,
                )

                prediction = response.choices[0].message.content.strip()

                target_norm = target_text.lower().strip()
                pred_norm = prediction.lower()

                # Substring match first, then the sentiment-synonym fallback.
                matched = target_norm in pred_norm
                if not matched:
                    for labels, keywords in sentiment_synonyms:
                        if target_norm in labels:
                            matched = any(word in pred_norm for word in keywords)
                            break

                if matched:
                    correct += 1
                total += 1

                results.append({
                    "input": _clip(input_str),
                    "target": target_text,
                    "prediction": _clip(prediction),
                    "correct": matched
                })

            except Exception as e:
                print(f"Error evaluating sample {position}: {e}")
                errors.append(f"Sample {position}: {str(e)}")
                # Give up once more than half of the samples have failed.
                if len(errors) > len(samples) // 2:
                    print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation")
                    break

        accuracy = (correct / total * 100) if total > 0 else 0

        outcome = {
            "accuracy": accuracy,
            "correct": correct,
            "total": total,
            "results": results
        }

        if errors:
            outcome["errors"] = errors
            if total == 0:
                outcome["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3])

        return outcome

    except Exception as e:
        return _failure(str(e))
|
|
|
|
def collect_prompt_history(output_dir: str) -> List[Dict]:
    """
    Collect all prompts discovered during evolution.

    Reads every ``program_*.txt`` file directly inside ``output_dir`` (in
    sorted filename order). Files that cannot be read are skipped.

    Returns a list of dicts with: {prompt, id, file}, where ``id`` is the
    filename without the ``program_`` prefix and ``.txt`` suffix and ``file``
    is the path that was read. Returns an empty list on any unexpected error.
    """
    try:
        prompts = []

        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))

        for pfile in program_files:
            try:
                with open(pfile, 'r') as f:
                    prompt_content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # Best effort: one unreadable file must not hide the others.
                print(f"Skipping unreadable prompt file {pfile}: {e}")
                continue

            prog_id = os.path.basename(pfile).replace("program_", "").replace(".txt", "")

            prompts.append({
                "prompt": prompt_content,
                "id": prog_id,
                "file": pfile
            })

        return prompts
    except Exception as e:
        print(f"Error collecting prompt history: {e}")
        return []
|
|
|
|
def parse_evolution_history(output_dir: str) -> str:
    """
    Parse evolution history from OpenEvolve output directory.

    Looks for ``generation_*.txt`` files (or, failing that, ``evolution.log``),
    a ``scores.json`` score table and ``program_*.txt`` prompt variants, and
    renders whatever is present. Unreadable or malformed artifacts are skipped
    individually. When none of the generation/log/score artifacts exist, a
    static description of the run configuration plus a file count is emitted.

    Returns a markdown string with visualization of the evolution process.
    If parsing fails as a whole, a short fallback message containing the
    error text is returned instead of raising.
    """

    def _read_text(path):
        """Return the file's text, or None when it cannot be read."""
        try:
            with open(path, 'r') as f:
                return f.read()
        except (OSError, UnicodeDecodeError) as e:
            print(f"Could not read {path}: {e}")
            return None

    def _excerpt(text, limit):
        """Truncate text to ``limit`` characters, marking truncation with '...'."""
        return f"{text[:limit]}{'...' if len(text) > limit else ''}"

    try:
        evolution_viz = "## 🧬 Evolution Progress\n\n"

        generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt")))
        log_file = os.path.join(output_dir, "evolution.log")

        if generation_files:
            evolution_viz += "### Generation-by-Generation Progress\n\n"
            for gen_file in generation_files:
                gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "")
                content = _read_text(gen_file)
                if content is None:
                    continue
                evolution_viz += f"**Generation {gen_num}:**\n```\n{_excerpt(content, 200)}\n```\n\n"

        # The log is only consulted when no per-generation files exist.
        elif os.path.exists(log_file):
            evolution_viz += "### Evolution Log\n\n"
            log_content = _read_text(log_file)
            if log_content is not None:
                # Only the tail of the log is shown.
                evolution_viz += f"```\n{log_content[-1000:]}\n```\n\n"

        scores_file = os.path.join(output_dir, "scores.json")
        if os.path.exists(scores_file):
            try:
                with open(scores_file, 'r') as f:
                    scores = json.load(f)
                # Build every row before emitting anything, so a malformed
                # entry cannot leave a half-rendered table in the output.
                rows = [
                    f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n"
                    for gen in scores
                ]
            except (OSError, ValueError, KeyError, TypeError) as e:
                print(f"Could not parse {scores_file}: {e}")
            else:
                evolution_viz += "### Score Progression\n\n"
                evolution_viz += "| Generation | Best Score | Avg Score | Population |\n"
                evolution_viz += "|------------|-----------|-----------|------------|\n"
                evolution_viz += "".join(rows)
                evolution_viz += "\n"

        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
        if program_files:
            evolution_viz += "### Explored Variants\n\n"
            evolution_viz += f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n"

            if len(program_files) > 3:
                # First, middle and second-to-last file in sorted order.
                sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]]
                evolution_viz += "**Sample Intermediate Prompts:**\n\n"
                for idx, pfile in enumerate(sample_files, 1):
                    prompt_content = _read_text(pfile)
                    if prompt_content is None:
                        continue
                    evolution_viz += f"**Variant {idx}:**\n```\n{_excerpt(prompt_content, 150)}\n```\n\n"

        # Nothing detailed to show: describe the fixed run configuration instead.
        if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file):
            evolution_viz += "### Evolution Complete\n\n"
            evolution_viz += "OpenEvolve ran 10 iterations of evolutionary optimization using:\n"
            evolution_viz += "- **Population Size**: 10 prompts per generation\n"
            evolution_viz += "- **Selection Strategy**: 10% elite, 30% explore, 60% exploit\n"
            evolution_viz += "- **Islands**: 1 population with mutation and crossover\n"
            evolution_viz += "- **Evaluation**: 100 samples per prompt variant\n\n"

            all_files = os.listdir(output_dir)
            evolution_viz += f"Generated {len(all_files)} files during evolution process.\n\n"

        return evolution_viz

    except Exception as e:
        return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n"
|
|
|
|
def create_evaluator_file(dataset_name: str, split: str, model: str,
                          input_field: str, target_field: str, work_dir: str):
    """Create an evaluator.py file for OpenEvolve with staged/cascading evaluation.

    The generated module exposes ``evaluate(prompt)``, which scores a prompt
    on 20 random samples and, only if at least half are correct, on up to 80
    further distinct samples.

    Args:
        dataset_name: HuggingFace dataset the generated evaluator loads.
        split: Dataset split the generated evaluator loads.
        model: Model identifier the generated evaluator queries.
        input_field: Sample field substituted for ``{input}`` in the prompt.
        target_field: Sample field holding the expected answer.
        work_dir: Directory in which ``evaluator.py`` is written.

    Returns:
        Path of the written ``evaluator.py``.
    """
    # User-supplied values are embedded with !r so they become valid, escaped
    # Python string literals; pasting them between quotes would let a quote or
    # backslash in a name break (or inject code into) the generated module.
    evaluator_code = f'''
import os
import random
from datasets import load_dataset
from openai import OpenAI

def evaluate(prompt: str) -> dict:
    """
    Evaluate a prompt using 2-stage cascading evaluation to save API calls.

    Stage 1: Evaluate with 20 samples
        - If accuracy >= 0.5, proceed to Stage 2
        - If accuracy < 0.5, return early (no point wasting 80 more samples)

    Stage 2: Evaluate with 80 more samples (total 100)
        - Combine results for final score

    Returns dict with combined_score (0-1), accuracy, correct, and total.
    """
    try:
        # NOTE(review): OpenEvolve appears to call evaluators with the path of
        # the candidate file rather than its text - confirm. Accept both: when
        # the argument names an existing file, read the prompt from it.
        if os.path.isfile(prompt):
            with open(prompt, "r") as prompt_file:
                prompt = prompt_file.read()

        # Load dataset
        dataset = load_dataset({dataset_name!r}, split={split!r}, streaming=False)

        # Initialize OpenAI client
        api_key = os.environ.get("OPENAI_API_KEY")
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        def evaluate_samples(samples, correct_so_far=0, total_so_far=0):
            """Helper function to evaluate a batch of samples."""
            correct = correct_so_far
            total = total_so_far

            for sample in samples:
                try:
                    # Get input and target
                    input_text = sample.get({input_field!r}, "")
                    if isinstance(input_text, dict):
                        input_text = str(input_text)

                    target = sample.get({target_field!r}, "")
                    if isinstance(target, dict):
                        target = str(target)

                    # Format the prompt
                    formatted_prompt = prompt.replace("{{input}}", str(input_text))

                    # Call the model
                    response = client.chat.completions.create(
                        model={model!r},
                        messages=[
                            {{"role": "system", "content": "You are a helpful assistant."}},
                            {{"role": "user", "content": formatted_prompt}}
                        ],
                        temperature=0.1,
                        max_tokens=500,
                    )

                    prediction = response.choices[0].message.content.strip()

                    # Smart evaluation - handle both exact match and semantic match
                    target_str = str(target).lower().strip()
                    pred_lower = prediction.lower()

                    # Check exact match first
                    is_correct = target_str in pred_lower

                    # If not exact match, check for semantic equivalents (e.g., "1" = "positive")
                    if not is_correct:
                        # Common sentiment mappings
                        if target_str in ["1", "positive", "pos"]:
                            is_correct = any(word in pred_lower for word in ["positive", "good", "great"])
                        elif target_str in ["0", "negative", "neg"]:
                            is_correct = any(word in pred_lower for word in ["negative", "bad", "poor"])

                    if is_correct:
                        correct += 1
                    total += 1

                except Exception as e:
                    print(f"Error evaluating sample: {{e}}")
                    continue

            return correct, total

        # STAGE 1: Evaluate with 20 samples first
        stage1_size = 20
        stage1_samples_count = min(stage1_size, len(dataset))

        if len(dataset) > stage1_samples_count:
            stage1_indices = random.sample(range(len(dataset)), stage1_samples_count)
        else:
            # Small dataset: Stage 1 uses every sample
            stage1_indices = list(range(len(dataset)))
        stage1_samples = [dataset[i] for i in stage1_indices]

        print(f"[Stage 1/2] Evaluating with {{len(stage1_samples)}} samples...")
        correct, total = evaluate_samples(stage1_samples)
        stage1_score = (correct / total) if total > 0 else 0.0

        print(f"[Stage 1/2] Score: {{stage1_score:.3f}} ({{correct}}/{{total}})")

        # Early exit if Stage 1 score is below threshold
        if stage1_score < 0.5:
            print(f"[Stage 1/2] Score below 0.5 threshold - skipping Stage 2 (saved 80 API calls)")
            return {{
                "combined_score": stage1_score,
                "accuracy": stage1_score,
                "correct": correct,
                "total": total,
                "stage": "stage1_early_exit"
            }}

        # STAGE 2: Continue with 80 more samples
        print(f"[Stage 2/2] Score >= 0.5 - proceeding with 80 more samples...")
        stage2_size = 80

        # Get different samples from Stage 1
        remaining_indices = sorted(set(range(len(dataset))) - set(stage1_indices))
        stage2_samples_count = min(stage2_size, len(remaining_indices))

        if stage2_samples_count > 0:
            stage2_indices = random.sample(remaining_indices, stage2_samples_count)
            stage2_samples = [dataset[i] for i in stage2_indices]

            correct, total = evaluate_samples(stage2_samples, correct, total)
            final_score = (correct / total) if total > 0 else stage1_score

            print(f"[Stage 2/2] Final score: {{final_score:.3f}} ({{correct}}/{{total}})")
            return {{
                "combined_score": final_score,
                "accuracy": final_score,
                "correct": correct,
                "total": total,
                "stage": "stage2_complete"
            }}
        else:
            print(f"[Stage 2/2] Not enough samples in dataset for Stage 2")
            return {{
                "combined_score": stage1_score,
                "accuracy": stage1_score,
                "correct": correct,
                "total": total,
                "stage": "stage1_complete"
            }}

    except Exception as e:
        print(f"Error in evaluation: {{e}}")
        return {{
            "combined_score": 0.0,
            "accuracy": 0.0,
            "correct": 0,
            "total": 0,
            "error": str(e)
        }}
'''

    evaluator_path = os.path.join(work_dir, "evaluator.py")
    # Python source files are UTF-8 by default, so write them that way
    # regardless of the platform locale.
    with open(evaluator_path, "w", encoding="utf-8") as f:
        f.write(evaluator_code)

    return evaluator_path
|
|
|
|
def create_config_file(model: str, work_dir: str):
    """Write the OpenEvolve ``config.yaml`` into ``work_dir`` and return its path.

    ``model`` becomes the primary LLM; every other setting is a fixed value
    defined below.
    """
    config_path = os.path.join(work_dir, "config.yaml")

    llm_section = {
        "primary_model": model,
        "api_base": "https://openrouter.ai/api/v1",
        "temperature": 0.7,
    }
    evolution_section = {
        "population_size": 10,
        "num_islands": 1,
        "elite_ratio": 0.1,
        "explore_ratio": 0.3,
        "exploit_ratio": 0.6,
    }
    evaluator_section = {
        "timeout": None,
        "cascade_evaluation": False,
        "parallel_evaluations": 1,
        "distributed": False,
    }
    settings = {
        "llm": llm_section,
        "max_iterations": 10,
        "evolution": evolution_section,
        "evaluator": evaluator_section,
    }

    with open(config_path, "w") as handle:
        yaml.dump(settings, handle)

    return config_path
|
|
|
|
def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str,
                    model: str, input_field: str, target_field: str,
                    progress=gr.Progress()) -> Tuple[str, str, str, str, List[str], int, int]:
    """Run OpenEvolve to optimize the prompt.

    Validates the inputs, evaluates the initial prompt on 20 samples, runs the
    evolution inside a temporary working directory, evaluates the best prompt
    found on 20 samples and builds the markdown reports.

    Args:
        initial_prompt: Prompt template containing the ``{input}`` placeholder.
        dataset_name: HuggingFace dataset name.
        dataset_split: Dataset split to evaluate on.
        model: Model identifier used for evaluation and evolution.
        input_field: Dataset field holding the input text.
        target_field: Dataset field holding the expected output.
        progress: Gradio progress tracker.

    Returns:
        Tuple of (summary markdown, initial results markdown, evolution
        markdown, final results markdown, list of prompts, index of the
        prompt to show first, number of prompts). On failure the first
        element carries the error message.
    """

    def _format_samples(sample_results):
        """Render up to five per-sample outcomes as numbered text lines."""
        text = ""
        for i, result in enumerate(sample_results[:5], 1):
            text += f"\n{i}. Input: {result['input']}\n"
            text += f" Target: {result['target']}\n"
            text += f" Prediction: {result['prediction']}\n"
            text += " ✓ Correct\n" if result['correct'] else " ✗ Incorrect\n"
        return text

    progress(0, desc="Validating inputs...")

    # Normalize once so validation, the generated evaluator and both
    # evaluations all work with exactly the same values.
    dataset_name = (dataset_name or "").strip()
    dataset_split = (dataset_split or "").strip()
    input_field = (input_field or "").strip()
    target_field = (target_field or "").strip()

    is_valid, validation_message = validate_inputs(
        dataset_name, dataset_split, input_field, target_field, initial_prompt
    )

    if not is_valid:
        return f"## Validation Failed\n\n{validation_message}", "", "", "", [], 0, 0

    progress(0.05, desc=f"Validation passed: {validation_message}")

    work_dir = tempfile.mkdtemp(prefix="openevolve_")

    try:
        initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
        with open(initial_prompt_path, "w") as f:
            f.write(initial_prompt)

        progress(0.1, desc="Creating evaluator...")
        evaluator_path = create_evaluator_file(dataset_name, dataset_split, model,
                                               input_field, target_field, work_dir)

        progress(0.15, desc="Creating configuration...")
        config_path = create_config_file(model, work_dir)

        progress(0.2, desc="Running initial evaluation on 20 samples...")
        initial_eval = evaluate_prompt(
            initial_prompt, dataset_name, dataset_split, 20,
            model, input_field, target_field
        )

        if "error" in initial_eval:
            return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", "", "", [initial_prompt], 0, 1

        if initial_eval["total"] == 0:
            return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", "", "", [initial_prompt], 0, 1

        initial_results = f"""
### Initial Prompt Evaluation

**Prompt:**
```
{initial_prompt}
```

**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}

**Sample Results:**
"""
        initial_results += _format_samples(initial_eval['results'])

        progress(0.3, desc="Starting OpenEvolve optimization (10 iterations with staged evaluation)...")

        output_dir = os.path.join(work_dir, "output")
        os.makedirs(output_dir, exist_ok=True)

        try:
            import openevolve.controller.parallel_controller as pc_module

            OriginalProcessController = pc_module.ProcessParallelController

            # In-process stand-in that runs submitted work synchronously in
            # the calling thread; it replaces OpenEvolve's process-based
            # controller only for the duration of this run.
            class NoOpProcessController:
                def __init__(self, *args, **kwargs):
                    self.num_workers = 1

                def __enter__(self):
                    return self

                def __exit__(self, *args):
                    pass

                def submit(self, func, *args, **kwargs):
                    return func(*args, **kwargs)

                def map(self, func, *args):
                    return [func(*a) for a in zip(*args)]

            pc_module.ProcessParallelController = NoOpProcessController

            try:
                run_evolution(
                    initial_program=initial_prompt_path,
                    evaluator=evaluator_path,
                    config=config_path,
                    output_dir=output_dir
                )
            finally:
                # Always undo the monkeypatch, even when evolution fails.
                pc_module.ProcessParallelController = OriginalProcessController

            progress(0.80, desc="Parsing evolution history...")

            evolution_viz = parse_evolution_history(output_dir)

            progress(0.85, desc="Evaluating best evolved prompt on 20 samples...")

            # Fall back to the initial prompt when no best program was written.
            best_prompt_path = os.path.join(output_dir, "best_program.txt")
            if os.path.exists(best_prompt_path):
                with open(best_prompt_path, "r") as f:
                    best_prompt = f.read()
            else:
                best_prompt = initial_prompt

            final_eval = evaluate_prompt(
                best_prompt, dataset_name, dataset_split, 20,
                model, input_field, target_field
            )

            final_results = f"""
### Evolved Prompt Evaluation

**Prompt:**
```
{best_prompt}
```

**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
- Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%

**Sample Results:**
"""
            final_results += _format_samples(final_eval['results'])
            if "error" in final_eval:
                # A failed final evaluation would otherwise look like 0% accuracy.
                final_results += f"\n⚠️ Final evaluation reported an error: {final_eval['error']}\n"

            summary = f"""
## 🎉 Optimization Complete!

### Summary
- **Dataset**: {dataset_name} ({dataset_split} split)
- **Model**: {model}
- **Initial/Final Eval**: 20 samples each
- **Evolution Eval**: Staged (20 → 100 if score ≥ 0.5)
- **Iterations**: 10

### Results
- **Initial Accuracy**: {initial_eval['accuracy']:.2f}%
- **Final Accuracy**: {final_eval['accuracy']:.2f}%
- **Improvement**: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%

{validation_message}
"""

            progress(1.0, desc="Complete!")

            # History shown in the browser: initial prompt, every collected
            # variant, and the best prompt if it is not already listed.
            all_prompts = [initial_prompt]
            prompt_history = collect_prompt_history(output_dir)
            for p in prompt_history:
                all_prompts.append(p["prompt"])

            if best_prompt not in all_prompts:
                all_prompts.append(best_prompt)

            return summary, initial_results, evolution_viz, final_results, all_prompts, 0, len(all_prompts)

        except Exception as e:
            return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, "", "", [initial_prompt], 0, 1

    finally:
        # Everything returned above is already held in memory as strings, so
        # the temporary working directory can be removed instead of leaking
        # one directory per run.
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
| |
# Gradio UI: configuration inputs, result panels, a prompt-history browser
# and the event wiring that connects them to optimize_prompt.
with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🧬 OpenEvolve Prompt Optimizer

    Automatically evolve and optimize your prompts using evolutionary algorithms!

    This space uses [OpenEvolve](https://github.com/algorithmicsuperintelligence/openevolve) to iteratively improve prompts
    by testing them on real datasets and evolving better versions.

    ## How it works:
    1. Enter an initial prompt (use `{input}` as a placeholder for dataset inputs)
    2. Enter the full HuggingFace dataset name (e.g., `stanfordnlp/imdb`, `gsm8k`)
    3. Specify the dataset split and field names
    4. Choose a free model from OpenRouter
    5. Click "Optimize Prompt" - the system will validate everything first!
    6. Watch the evolution progress in real-time
    7. Compare initial vs. evolved performance!

    **Note**: API key is read from `OPENAI_API_KEY` environment variable (set in Space secrets)
    """)

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Configuration")

            model = gr.Dropdown(
                choices=FREE_MODELS,
                value=FREE_MODELS[0],
                label="Select Model",
                info="Choose from 5 curated free models on OpenRouter (24B to 671B parameters)"
            )

            dataset_name = gr.Textbox(
                label="HuggingFace Dataset (Full Name)",
                value="stanfordnlp/imdb",
                placeholder="e.g., stanfordnlp/imdb, openai/gsm8k, SetFit/sst5",
                info="Full dataset name from HuggingFace Hub (org/dataset-name or dataset-name)"
            )

            dataset_split = gr.Textbox(
                label="Dataset Split",
                value="test",
                placeholder="e.g., train, test, validation"
            )

            input_field = gr.Textbox(
                label="Input Field Name",
                value="text",
                placeholder="e.g., text, question, context",
                info="The field containing inputs to process"
            )

            target_field = gr.Textbox(
                label="Target Field Name",
                value="label",
                placeholder="e.g., label, answer, target",
                info="The field containing expected outputs"
            )

            initial_prompt = gr.TextArea(
                label="Initial Prompt",
                value="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:",
                lines=6,
                info="Use {input} as placeholder for dataset inputs"
            )

    with gr.Row():
        with gr.Column():
            optimize_btn = gr.Button("🚀 Validate & Optimize Prompt", variant="primary", size="lg")

    gr.Markdown("---")
    gr.Markdown("## 📊 Results")

    with gr.Row():
        with gr.Column():
            summary = gr.Markdown("Click 'Validate & Optimize Prompt' to start optimization...", visible=True)

    with gr.Row():
        with gr.Column():
            initial_results = gr.Markdown("### Initial Results\nWill appear here after validation...", visible=True)
        with gr.Column():
            final_results = gr.Markdown("### Final Results\nWill appear here after optimization...", visible=True)

    with gr.Row():
        with gr.Column():
            evolution_progress = gr.Markdown("### Evolution Progress\nEvolution progress will appear here during optimization...", visible=True)

    gr.Markdown("---")
    gr.Markdown("## 📜 Prompt History Browser")
    gr.Markdown("Browse through all prompts discovered during evolution (initial → intermediate → final)")

    with gr.Row():
        with gr.Column(scale=8):
            prompt_display = gr.TextArea(
                label="",
                lines=10,
                interactive=False,
                placeholder="Prompts will appear here after optimization completes...",
                show_label=False
            )
        with gr.Column(scale=2):
            prompt_counter = gr.Markdown("**Prompt**: -/-")
            prev_btn = gr.Button("⬅️ Previous", size="sm")
            next_btn = gr.Button("Next ➡️", size="sm")
            gr.Markdown("**Prompt Types:**\n- First = Initial\n- Middle = Intermediate\n- Last = Final Best")

    # Per-session state backing the history browser.
    prompt_history_state = gr.State([])
    current_prompt_index = gr.State(0)
    # One shared component for the prompt count, so the value written by
    # optimize_prompt is the same one later passed to update_prompt_display.
    prompt_total_state = gr.State(0)

    gr.Markdown("---")
    with gr.Accordion("📚 Documentation & Examples", open=False):
        gr.Markdown("""
        ### Example Datasets & Fields:

        | Dataset | Split | Input Field | Target Field | Task |
        |---------|-------|-------------|--------------|------|
        | stanfordnlp/imdb | test | text | label | Sentiment Analysis |
        | rajpurkar/squad | validation | question | answers | Question Answering |
        | dair-ai/emotion | test | text | label | Emotion Classification |
        | openai/gsm8k | test | question | answer | Math Reasoning |
        | fancyzhx/ag_news | test | text | label | News Classification |

        ### About This Demo Space:

        **This is a demonstration space** showcasing OpenEvolve's prompt optimization capabilities.
        The interface shows you how the system works, but **you'll need to set up your own instance to run optimizations**.

        ### How to Run This Yourself:

        1. **Clone this Space**: Click "⋮" (three dots) at top-right → "Duplicate this Space"
        2. **Set Environment Variables** in your cloned Space's settings:
           - `OPENAI_API_KEY`: Your OpenRouter API key (get free key at [openrouter.ai/keys](https://openrouter.ai/keys))
           - `HF_TOKEN`: (Optional) HuggingFace token for private datasets
        3. **Configure Your Optimization**:
           - Dataset: Use full name format (e.g., `stanfordnlp/imdb` or `openai/gsm8k`)
           - Fields: Specify exact field names from the dataset schema
           - Model: Choose from 5 curated free models (larger models = better results but slower/rate-limited)
        4. **Run & Monitor**:
           - All inputs are validated before starting
           - Evolution uses staged evaluation (20 samples first, then 80 more if promising)
           - Saves API calls by early-stopping poor prompts (< 50% accuracy)
           - Watch evolution progress visualization in real-time

        ### About OpenEvolve:
        OpenEvolve is an open-source evolutionary optimization framework. Learn more at:
        - [GitHub Repository](https://github.com/algorithmicsuperintelligence/openevolve)
        - [Documentation](https://github.com/algorithmicsuperintelligence/openevolve#readme)
        """)

    def _prompt_view(prompts, idx):
        """Return (prompt text, counter markdown, clamped index) for ``idx``."""
        if not prompts:
            return "", "**Prompt**: -/-", 0
        idx = max(0, min(idx, len(prompts) - 1))
        counter_text = f"**Prompt**: {idx + 1}/{len(prompts)}"
        if idx == 0:
            counter_text += " (Initial)"
        elif idx == len(prompts) - 1:
            counter_text += " (Final Best)"
        else:
            counter_text += " (Intermediate)"
        return prompts[idx], counter_text, idx

    def show_previous_prompt(prompts, current_idx):
        """Step one prompt back, stopping at the first one."""
        return _prompt_view(prompts, current_idx - 1)

    def show_next_prompt(prompts, current_idx):
        """Step one prompt forward, stopping at the last one."""
        return _prompt_view(prompts, current_idx + 1)

    def update_prompt_display(prompts, idx, total):
        """Refresh the browser after a run; ``total`` is accepted but unused."""
        text, counter_text, _ = _prompt_view(prompts, idx)
        return text, counter_text

    optimize_result = optimize_btn.click(
        fn=optimize_prompt,
        inputs=[initial_prompt, dataset_name, dataset_split, model,
                input_field, target_field],
        outputs=[summary, initial_results, evolution_progress, final_results,
                 prompt_history_state, current_prompt_index, prompt_total_state]
    )

    # Once optimization has finished, show the first prompt in the browser.
    optimize_result.then(
        fn=update_prompt_display,
        inputs=[prompt_history_state, current_prompt_index, prompt_total_state],
        outputs=[prompt_display, prompt_counter]
    )

    prev_btn.click(
        fn=show_previous_prompt,
        inputs=[prompt_history_state, current_prompt_index],
        outputs=[prompt_display, prompt_counter, current_prompt_index]
    )

    next_btn.click(
        fn=show_next_prompt,
        inputs=[prompt_history_state, current_prompt_index],
        outputs=[prompt_display, prompt_counter, current_prompt_index]
    )
|
|
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|