| import gradio as gr |
| import os |
| import yaml |
| import json |
| import random |
| from datasets import load_dataset |
| from openai import OpenAI |
| from openevolve import run_evolution |
| from typing import Dict, List, Tuple |
| import tempfile |
| import shutil |
|
|
| |
| FREE_MODELS = [ |
| "google/gemini-2.0-flash-001:free", |
| "google/gemini-flash-1.5-8b:free", |
| "meta-llama/llama-3.2-3b-instruct:free", |
| "meta-llama/llama-3.2-1b-instruct:free", |
| "microsoft/phi-3-mini-128k-instruct:free", |
| "microsoft/phi-3-medium-128k-instruct:free", |
| "qwen/qwen-2-7b-instruct:free", |
| "mistralai/mistral-7b-instruct:free", |
| ] |
|
|
| |
| SAMPLE_DATASETS = { |
| "Question Answering": [ |
| "hotpot_qa", |
| "squad", |
| "trivia_qa", |
| ], |
| "Sentiment Analysis": [ |
| "imdb", |
| "yelp_review_full", |
| "emotion", |
| ], |
| "Text Classification": [ |
| "ag_news", |
| "dbpedia_14", |
| "SetFit/sst5", |
| ], |
| "Math Reasoning": [ |
| "gsm8k", |
| "math_qa", |
| ], |
| } |
|
|
|
|
| def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int, |
| api_key: str, model: str, input_field: str, target_field: str) -> Dict: |
| """Evaluate a prompt on a dataset using the selected model.""" |
| try: |
| |
| dataset = load_dataset(dataset_name, split=split, streaming=False) |
|
|
| |
| if len(dataset) > num_samples: |
| indices = random.sample(range(len(dataset)), num_samples) |
| samples = [dataset[i] for i in indices] |
| else: |
| samples = list(dataset)[:num_samples] |
|
|
| |
| client = OpenAI( |
| base_url="https://openrouter.ai/api/v1", |
| api_key=api_key, |
| ) |
|
|
| correct = 0 |
| total = 0 |
| results = [] |
|
|
| for sample in samples: |
| try: |
| |
| input_text = sample.get(input_field, "") |
| if isinstance(input_text, dict): |
| input_text = str(input_text) |
|
|
| target = sample.get(target_field, "") |
| if isinstance(target, dict): |
| target = str(target) |
|
|
| |
| formatted_prompt = prompt.replace("{input}", str(input_text)) |
|
|
| |
| response = client.chat.completions.create( |
| model=model, |
| messages=[ |
| {"role": "system", "content": "You are a helpful assistant."}, |
| {"role": "user", "content": formatted_prompt} |
| ], |
| temperature=0.1, |
| max_tokens=500, |
| ) |
|
|
| prediction = response.choices[0].message.content.strip() |
|
|
| |
| is_correct = str(target).lower().strip() in prediction.lower() |
| if is_correct: |
| correct += 1 |
| total += 1 |
|
|
| results.append({ |
| "input": str(input_text)[:100] + "...", |
| "target": str(target), |
| "prediction": prediction[:100] + "...", |
| "correct": is_correct |
| }) |
|
|
| except Exception as e: |
| print(f"Error evaluating sample: {e}") |
| continue |
|
|
| accuracy = (correct / total * 100) if total > 0 else 0 |
|
|
| return { |
| "accuracy": accuracy, |
| "correct": correct, |
| "total": total, |
| "results": results |
| } |
|
|
| except Exception as e: |
| return { |
| "error": str(e), |
| "accuracy": 0, |
| "correct": 0, |
| "total": 0, |
| "results": [] |
| } |
|
|
|
|
| def create_evaluator_file(dataset_name: str, split: str, model: str, |
| input_field: str, target_field: str, work_dir: str): |
| """Create an evaluator.py file for OpenEvolve.""" |
| evaluator_code = f''' |
| import os |
| import random |
| from datasets import load_dataset |
| from openai import OpenAI |
| |
| def evaluate(prompt: str) -> float: |
| """Evaluate a prompt and return a score between 0 and 1.""" |
| try: |
| # Load dataset |
| dataset = load_dataset("{dataset_name}", split="{split}", streaming=False) |
| |
| # Sample 100 random examples |
| num_samples = min(100, len(dataset)) |
| if len(dataset) > num_samples: |
| indices = random.sample(range(len(dataset)), num_samples) |
| samples = [dataset[i] for i in indices] |
| else: |
| samples = list(dataset)[:num_samples] |
| |
| # Initialize OpenAI client |
| api_key = os.environ.get("OPENAI_API_KEY") |
| client = OpenAI( |
| base_url="https://openrouter.ai/api/v1", |
| api_key=api_key, |
| ) |
| |
| correct = 0 |
| total = 0 |
| |
| for sample in samples: |
| try: |
| # Get input and target |
| input_text = sample.get("{input_field}", "") |
| if isinstance(input_text, dict): |
| input_text = str(input_text) |
| |
| target = sample.get("{target_field}", "") |
| if isinstance(target, dict): |
| target = str(target) |
| |
| # Format the prompt |
| formatted_prompt = prompt.replace("{{input}}", str(input_text)) |
| |
| # Call the model |
| response = client.chat.completions.create( |
| model="{model}", |
| messages=[ |
| {{"role": "system", "content": "You are a helpful assistant."}}, |
| {{"role": "user", "content": formatted_prompt}} |
| ], |
| temperature=0.1, |
| max_tokens=500, |
| ) |
| |
| prediction = response.choices[0].message.content.strip() |
| |
| # Simple evaluation |
| is_correct = str(target).lower().strip() in prediction.lower() |
| if is_correct: |
| correct += 1 |
| total += 1 |
| |
| except Exception as e: |
| print(f"Error evaluating sample: {{e}}") |
| continue |
| |
| # Return score between 0 and 1 |
| return (correct / total) if total > 0 else 0.0 |
| |
| except Exception as e: |
| print(f"Error in evaluation: {{e}}") |
| return 0.0 |
| ''' |
|
|
| evaluator_path = os.path.join(work_dir, "evaluator.py") |
| with open(evaluator_path, "w") as f: |
| f.write(evaluator_code) |
|
|
| return evaluator_path |
|
|
|
|
| def create_config_file(model: str, work_dir: str): |
| """Create a config.yaml file for OpenEvolve.""" |
| config = { |
| "llm": { |
| "api_base": "https://openrouter.ai/api/v1", |
| "model": model, |
| "temperature": 0.7, |
| "max_tokens": 4096, |
| }, |
| "evolution": { |
| "max_iterations": 10, |
| "population_size": 10, |
| "num_islands": 1, |
| "elite_ratio": 0.1, |
| "explore_ratio": 0.3, |
| "exploit_ratio": 0.6, |
| }, |
| "evaluation": { |
| "timeout": 1800, |
| } |
| } |
|
|
| config_path = os.path.join(work_dir, "config.yaml") |
| with open(config_path, "w") as f: |
| yaml.dump(config, f) |
|
|
| return config_path |
|
|
|
|
| def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str, |
| model: str, api_key: str, input_field: str, target_field: str, |
| progress=gr.Progress()) -> Tuple[str, str, str]: |
| """Run OpenEvolve to optimize the prompt.""" |
|
|
| if not api_key: |
| return "Error: OpenAI API Key is required", "", "" |
|
|
| |
| os.environ["OPENAI_API_KEY"] = api_key |
|
|
| progress(0, desc="Setting up...") |
|
|
| |
| work_dir = tempfile.mkdtemp(prefix="openevolve_") |
|
|
| try: |
| |
| initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt") |
| with open(initial_prompt_path, "w") as f: |
| f.write(initial_prompt) |
|
|
| |
| progress(0.1, desc="Creating evaluator...") |
| evaluator_path = create_evaluator_file(dataset_name, dataset_split, model, |
| input_field, target_field, work_dir) |
|
|
| |
| progress(0.2, desc="Creating configuration...") |
| config_path = create_config_file(model, work_dir) |
|
|
| |
| progress(0.3, desc="Running initial evaluation...") |
| initial_eval = evaluate_prompt( |
| initial_prompt, dataset_name, dataset_split, 100, |
| api_key, model, input_field, target_field |
| ) |
|
|
| initial_results = f""" |
| ### Initial Prompt Evaluation |
| |
| **Prompt:** |
| ``` |
| {initial_prompt} |
| ``` |
| |
| **Results:** |
| - Accuracy: {initial_eval['accuracy']:.2f}% |
| - Correct: {initial_eval['correct']}/{initial_eval['total']} |
| |
| **Sample Results:** |
| """ |
| for i, result in enumerate(initial_eval['results'][:5], 1): |
| initial_results += f"\n{i}. Input: {result['input']}\n" |
| initial_results += f" Target: {result['target']}\n" |
| initial_results += f" Prediction: {result['prediction']}\n" |
| initial_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n" |
|
|
| |
| progress(0.4, desc="Running OpenEvolve (this may take several minutes)...") |
|
|
| output_dir = os.path.join(work_dir, "output") |
| os.makedirs(output_dir, exist_ok=True) |
|
|
| try: |
| |
| result = run_evolution( |
| initial_program_path=initial_prompt_path, |
| evaluator_path=evaluator_path, |
| config_path=config_path, |
| output_dir=output_dir, |
| verbose=True |
| ) |
|
|
| progress(0.8, desc="Evaluating best prompt...") |
|
|
| |
| best_prompt_path = os.path.join(output_dir, "best_program.txt") |
| if os.path.exists(best_prompt_path): |
| with open(best_prompt_path, "r") as f: |
| best_prompt = f.read() |
| else: |
| best_prompt = initial_prompt |
|
|
| |
| final_eval = evaluate_prompt( |
| best_prompt, dataset_name, dataset_split, 100, |
| api_key, model, input_field, target_field |
| ) |
|
|
| final_results = f""" |
| ### Evolved Prompt Evaluation |
| |
| **Prompt:** |
| ``` |
| {best_prompt} |
| ``` |
| |
| **Results:** |
| - Accuracy: {final_eval['accuracy']:.2f}% |
| - Correct: {final_eval['correct']}/{final_eval['total']} |
| - Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:.2f}% |
| |
| **Sample Results:** |
| """ |
| for i, result in enumerate(final_eval['results'][:5], 1): |
| final_results += f"\n{i}. Input: {result['input']}\n" |
| final_results += f" Target: {result['target']}\n" |
| final_results += f" Prediction: {result['prediction']}\n" |
| final_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n" |
|
|
| summary = f""" |
| ## Optimization Complete! |
| |
| ### Summary |
| - Initial Accuracy: {initial_eval['accuracy']:.2f}% |
| - Final Accuracy: {final_eval['accuracy']:.2f}% |
| - Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:.2f}% |
| - Dataset: {dataset_name} |
| - Model: {model} |
| - Samples Evaluated: 100 |
| - Iterations: 10 |
| """ |
|
|
| progress(1.0, desc="Complete!") |
|
|
| return summary, initial_results, final_results |
|
|
| except Exception as e: |
| return f"Error during evolution: {str(e)}", initial_results, "" |
|
|
| finally: |
| |
| try: |
| shutil.rmtree(work_dir) |
| except: |
| pass |
|
|
|
|
| |
| with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo: |
| gr.Markdown(""" |
| # 🧬 OpenEvolve Prompt Optimizer |
| |
| Automatically evolve and optimize your prompts using evolutionary algorithms! |
| |
| This space uses [OpenEvolve](https://github.com/codelion/openevolve) to iteratively improve prompts |
| by testing them on real datasets and evolving better versions. |
| |
| ## How it works: |
| 1. Enter an initial prompt (use `{input}` as a placeholder for dataset inputs) |
| 2. Select a HuggingFace dataset to test on |
| 3. Choose a free model from OpenRouter |
| 4. Click "Optimize Prompt" to evolve better versions |
| 5. Compare initial vs. evolved performance! |
| """) |
|
|
| with gr.Row(): |
| with gr.Column(): |
| gr.Markdown("### Configuration") |
|
|
| api_key = gr.Textbox( |
| label="OpenAI API Key (for OpenRouter)", |
| type="password", |
| placeholder="sk-or-v1-...", |
| info="Get your free key at https://openrouter.ai/keys" |
| ) |
|
|
| model = gr.Dropdown( |
| choices=FREE_MODELS, |
| value=FREE_MODELS[0], |
| label="Select Model", |
| info="Free models available on OpenRouter" |
| ) |
|
|
| dataset_name = gr.Textbox( |
| label="HuggingFace Dataset", |
| value="imdb", |
| placeholder="e.g., imdb, hotpot_qa, gsm8k", |
| info="Any dataset from HuggingFace Hub" |
| ) |
|
|
| dataset_split = gr.Textbox( |
| label="Dataset Split", |
| value="test", |
| placeholder="e.g., train, test, validation" |
| ) |
|
|
| input_field = gr.Textbox( |
| label="Input Field Name", |
| value="text", |
| placeholder="e.g., text, question, context", |
| info="The field containing inputs to process" |
| ) |
|
|
| target_field = gr.Textbox( |
| label="Target Field Name", |
| value="label", |
| placeholder="e.g., label, answer, target", |
| info="The field containing expected outputs" |
| ) |
|
|
| initial_prompt = gr.TextArea( |
| label="Initial Prompt", |
| value="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:", |
| lines=6, |
| info="Use {input} as placeholder for dataset inputs" |
| ) |
|
|
| optimize_btn = gr.Button("🚀 Optimize Prompt", variant="primary", size="lg") |
|
|
| with gr.Row(): |
| with gr.Column(): |
| summary = gr.Markdown(label="Summary") |
|
|
| with gr.Row(): |
| with gr.Column(): |
| initial_results = gr.Markdown(label="Initial Results") |
| with gr.Column(): |
| final_results = gr.Markdown(label="Evolved Results") |
|
|
| gr.Markdown(""" |
| ### Example Datasets & Fields: |
| |
| | Dataset | Split | Input Field | Target Field | Task | |
| |---------|-------|-------------|--------------|------| |
| | imdb | test | text | label | Sentiment Analysis | |
| | hotpot_qa | validation | question | answer | Question Answering | |
| | emotion | test | text | label | Emotion Classification | |
| | gsm8k | test | question | answer | Math Reasoning | |
| | ag_news | test | text | label | News Classification | |
| |
| ### Notes: |
| - Evolution runs for 10 iterations with 1 island |
| - Each evaluation uses 100 random samples from the dataset |
| - The process may take 5-15 minutes depending on the dataset and model |
| - Make sure your API key has sufficient credits for the requests |
| """) |
|
|
| optimize_btn.click( |
| fn=optimize_prompt, |
| inputs=[initial_prompt, dataset_name, dataset_split, model, api_key, |
| input_field, target_field], |
| outputs=[summary, initial_results, final_results] |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|