Spaces:
Paused
Paused
| """ | |
| Human Evaluation Generator Module | |
| Generate evaluation sets for blind testing by human evaluators. | |
| Creates paired comparisons and rating tasks. | |
| Example usage: | |
| generator = HumanEvalGenerator() | |
| eval_set = generator.generate_blind_test( | |
| questions=test_questions, | |
| responses_a=model_a_responses, | |
| responses_b=model_b_responses, | |
| ) | |
| """ | |
| import json | |
| import random | |
| import hashlib | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional | |
| from loguru import logger | |
@dataclass
class EvaluationItem:
    """A single item for blind A/B human evaluation.

    Holds the question, the two candidate responses as presented to the
    evaluator, and the hidden source labels used later to score which
    model won each comparison.
    """

    item_id: str
    question: str
    response_a: str
    response_b: str
    response_a_source: str  # Hidden from evaluator
    response_b_source: str  # Hidden from evaluator
    category: str
    metadata: dict = field(default_factory=dict)

    def to_evaluator_dict(self) -> dict:
        """Return the dict shown to evaluators (source info stripped)."""
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response_a": self.response_a,
            "response_b": self.response_b,
            "category": self.category,
        }

    def to_full_dict(self) -> dict:
        """Return the full dict including hidden sources (for the answer key)."""
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response_a": self.response_a,
            "response_b": self.response_b,
            "response_a_source": self.response_a_source,
            "response_b_source": self.response_b_source,
            "category": self.category,
            "metadata": self.metadata,
        }
@dataclass
class RatingItem:
    """A single item for absolute (non-comparative) rating.

    The response source is hidden from the evaluator and only appears in
    the answer key via to_full_dict().
    """

    item_id: str
    question: str
    response: str
    source: str  # Hidden from evaluator
    category: str
    criteria: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    def to_evaluator_dict(self) -> dict:
        """Return the dict shown to evaluators (source stripped)."""
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response": self.response,
            "category": self.category,
            "criteria": self.criteria,
        }

    def to_full_dict(self) -> dict:
        """Return the full dict including the hidden source.

        HumanEvalGenerator.save_evaluation_set writes the answer key by
        calling to_full_dict() on every item, so RatingItem must provide
        it just like EvaluationItem does.
        """
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response": self.response,
            "source": self.source,
            "category": self.category,
            "criteria": self.criteria,
            "metadata": self.metadata,
        }
| class HumanEvalGenerator: | |
| """ | |
| Generate evaluation sets for human evaluation. | |
| Creates: | |
| - Blind A/B comparisons | |
| - Absolute rating tasks | |
| - Multi-criteria evaluations | |
| Example: | |
| >>> generator = HumanEvalGenerator() | |
| >>> eval_set = generator.generate_blind_test(questions, resp_a, resp_b) | |
| >>> generator.save_evaluation_set(eval_set, "evaluation/") | |
| """ | |
| # Default evaluation criteria | |
| DEFAULT_CRITERIA = [ | |
| { | |
| "name": "voice_authenticity", | |
| "description": "How well does the response capture the CEO's authentic voice and communication style?", | |
| "scale": "1-5 (1=Not at all, 5=Perfectly authentic)", | |
| }, | |
| { | |
| "name": "helpfulness", | |
| "description": "How helpful and substantive is the response in addressing the question?", | |
| "scale": "1-5 (1=Not helpful, 5=Extremely helpful)", | |
| }, | |
| { | |
| "name": "clarity", | |
| "description": "How clear and well-organized is the response?", | |
| "scale": "1-5 (1=Confusing, 5=Crystal clear)", | |
| }, | |
| { | |
| "name": "professionalism", | |
| "description": "How appropriate is the response for professional business communication?", | |
| "scale": "1-5 (1=Inappropriate, 5=Highly professional)", | |
| }, | |
| ] | |
| # Question categories for balanced evaluation | |
| CATEGORIES = [ | |
| "opinion", | |
| "strategic", | |
| "factual", | |
| "personal_philosophy", | |
| "challenge", | |
| ] | |
| def __init__( | |
| self, | |
| criteria: Optional[list] = None, | |
| randomize: bool = True, | |
| seed: Optional[int] = None, | |
| ): | |
| """ | |
| Initialize the generator. | |
| Args: | |
| criteria: Custom evaluation criteria | |
| randomize: Whether to randomize response order | |
| seed: Random seed for reproducibility | |
| """ | |
| self.criteria = criteria or self.DEFAULT_CRITERIA | |
| self.randomize = randomize | |
| if seed is not None: | |
| random.seed(seed) | |
| def generate_blind_test( | |
| self, | |
| questions: list[str], | |
| responses_a: list[str], | |
| responses_b: list[str], | |
| source_a_name: str = "Model A", | |
| source_b_name: str = "Model B", | |
| categories: Optional[list[str]] = None, | |
| ) -> list[EvaluationItem]: | |
| """ | |
| Generate a blind A/B comparison test. | |
| Args: | |
| questions: List of questions | |
| responses_a: Responses from source A | |
| responses_b: Responses from source B | |
| source_a_name: Name of source A (hidden from evaluator) | |
| source_b_name: Name of source B (hidden from evaluator) | |
| categories: Optional category for each question | |
| Returns: | |
| List of EvaluationItem objects | |
| """ | |
| if len(questions) != len(responses_a) or len(questions) != len(responses_b): | |
| raise ValueError("Questions and responses must have same length") | |
| categories = categories or ["general"] * len(questions) | |
| items = [] | |
| for i, (q, ra, rb, cat) in enumerate(zip(questions, responses_a, responses_b, categories)): | |
| # Generate unique ID | |
| item_id = self._generate_id(q, ra, rb) | |
| # Randomize order if enabled | |
| if self.randomize and random.random() > 0.5: | |
| ra, rb = rb, ra | |
| source_a_name, source_b_name = source_b_name, source_a_name | |
| items.append(EvaluationItem( | |
| item_id=item_id, | |
| question=q, | |
| response_a=ra, | |
| response_b=rb, | |
| response_a_source=source_a_name, | |
| response_b_source=source_b_name, | |
| category=cat, | |
| metadata={"index": i}, | |
| )) | |
| if self.randomize: | |
| random.shuffle(items) | |
| return items | |
| def generate_rating_test( | |
| self, | |
| questions: list[str], | |
| responses: list[str], | |
| source_name: str = "Model", | |
| categories: Optional[list[str]] = None, | |
| criteria: Optional[list] = None, | |
| ) -> list[RatingItem]: | |
| """ | |
| Generate an absolute rating test. | |
| Args: | |
| questions: List of questions | |
| responses: List of responses | |
| source_name: Name of source (hidden from evaluator) | |
| categories: Optional category for each question | |
| criteria: Evaluation criteria to use | |
| Returns: | |
| List of RatingItem objects | |
| """ | |
| if len(questions) != len(responses): | |
| raise ValueError("Questions and responses must have same length") | |
| categories = categories or ["general"] * len(questions) | |
| criteria = criteria or self.criteria | |
| items = [] | |
| for i, (q, r, cat) in enumerate(zip(questions, responses, categories)): | |
| item_id = self._generate_id(q, r) | |
| items.append(RatingItem( | |
| item_id=item_id, | |
| question=q, | |
| response=r, | |
| source=source_name, | |
| category=cat, | |
| criteria=criteria, | |
| metadata={"index": i}, | |
| )) | |
| if self.randomize: | |
| random.shuffle(items) | |
| return items | |
| def generate_multi_model_test( | |
| self, | |
| questions: list[str], | |
| model_responses: dict[str, list[str]], | |
| categories: Optional[list[str]] = None, | |
| ) -> list[EvaluationItem]: | |
| """ | |
| Generate pairwise comparisons for multiple models. | |
| Args: | |
| questions: List of questions | |
| model_responses: Dict of {model_name: responses} | |
| categories: Optional categories | |
| Returns: | |
| List of pairwise EvaluationItem objects | |
| """ | |
| model_names = list(model_responses.keys()) | |
| if len(model_names) < 2: | |
| raise ValueError("Need at least 2 models for comparison") | |
| items = [] | |
| # Generate all pairs | |
| for i in range(len(model_names)): | |
| for j in range(i + 1, len(model_names)): | |
| name_a = model_names[i] | |
| name_b = model_names[j] | |
| pair_items = self.generate_blind_test( | |
| questions=questions, | |
| responses_a=model_responses[name_a], | |
| responses_b=model_responses[name_b], | |
| source_a_name=name_a, | |
| source_b_name=name_b, | |
| categories=categories, | |
| ) | |
| items.extend(pair_items) | |
| if self.randomize: | |
| random.shuffle(items) | |
| return items | |
| def save_evaluation_set( | |
| self, | |
| items: list, | |
| output_dir: str | Path, | |
| include_sources: bool = False, | |
| ) -> dict: | |
| """ | |
| Save evaluation set to files. | |
| Args: | |
| items: Evaluation items | |
| output_dir: Output directory | |
| include_sources: Include source info in evaluator file | |
| Returns: | |
| Dict with file paths | |
| """ | |
| output_dir = Path(output_dir) | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # Evaluator file (without source info) | |
| evaluator_data = { | |
| "metadata": { | |
| "created": timestamp, | |
| "num_items": len(items), | |
| "criteria": self.criteria, | |
| }, | |
| "items": [ | |
| item.to_evaluator_dict() for item in items | |
| ], | |
| } | |
| evaluator_path = output_dir / f"evaluation_set_{timestamp}.json" | |
| with open(evaluator_path, "w", encoding="utf-8") as f: | |
| json.dump(evaluator_data, f, indent=2, ensure_ascii=False) | |
| # Answer key (with source info) | |
| answer_key_data = { | |
| "metadata": { | |
| "created": timestamp, | |
| "num_items": len(items), | |
| }, | |
| "items": [ | |
| item.to_full_dict() for item in items | |
| ], | |
| } | |
| answer_key_path = output_dir / f"answer_key_{timestamp}.json" | |
| with open(answer_key_path, "w", encoding="utf-8") as f: | |
| json.dump(answer_key_data, f, indent=2, ensure_ascii=False) | |
| # Instructions file | |
| instructions = self._generate_instructions() | |
| instructions_path = output_dir / "evaluation_instructions.md" | |
| with open(instructions_path, "w", encoding="utf-8") as f: | |
| f.write(instructions) | |
| logger.info(f"Saved evaluation set to: {output_dir}") | |
| return { | |
| "evaluator_file": str(evaluator_path), | |
| "answer_key": str(answer_key_path), | |
| "instructions": str(instructions_path), | |
| } | |
| def load_results( | |
| self, | |
| results_path: str | Path, | |
| answer_key_path: str | Path, | |
| ) -> dict: | |
| """ | |
| Load and analyze evaluation results. | |
| Args: | |
| results_path: Path to evaluator results | |
| answer_key_path: Path to answer key | |
| Returns: | |
| Analysis results | |
| """ | |
| with open(results_path, "r") as f: | |
| results = json.load(f) | |
| with open(answer_key_path, "r") as f: | |
| answer_key = json.load(f) | |
| # Build lookup | |
| key_lookup = {item["item_id"]: item for item in answer_key["items"]} | |
| # Analyze | |
| model_wins = {} | |
| model_ratings = {} | |
| for result in results.get("responses", []): | |
| item_id = result.get("item_id") | |
| if item_id not in key_lookup: | |
| continue | |
| key = key_lookup[item_id] | |
| # For A/B comparisons | |
| if "preference" in result: | |
| pref = result["preference"] | |
| if pref == "A": | |
| winner = key.get("response_a_source", "A") | |
| elif pref == "B": | |
| winner = key.get("response_b_source", "B") | |
| else: | |
| winner = "tie" | |
| model_wins[winner] = model_wins.get(winner, 0) + 1 | |
| # For ratings | |
| if "ratings" in result: | |
| source = key.get("source", "unknown") | |
| if source not in model_ratings: | |
| model_ratings[source] = {c["name"]: [] for c in self.criteria} | |
| for criterion, rating in result["ratings"].items(): | |
| if criterion in model_ratings[source]: | |
| model_ratings[source][criterion].append(rating) | |
| # Calculate averages | |
| avg_ratings = {} | |
| for source, ratings in model_ratings.items(): | |
| avg_ratings[source] = { | |
| criterion: sum(scores) / len(scores) if scores else 0 | |
| for criterion, scores in ratings.items() | |
| } | |
| return { | |
| "comparison_wins": model_wins, | |
| "average_ratings": avg_ratings, | |
| "num_responses": len(results.get("responses", [])), | |
| } | |
| def _generate_id(self, *args) -> str: | |
| """Generate unique ID from content.""" | |
| content = "|".join(str(a)[:50] for a in args) | |
| return hashlib.md5(content.encode()).hexdigest()[:12] | |
| def _generate_instructions(self) -> str: | |
| """Generate evaluation instructions.""" | |
| criteria_text = "\n".join( | |
| f"- **{c['name']}**: {c['description']} ({c['scale']})" | |
| for c in self.criteria | |
| ) | |
| return f"""# Human Evaluation Instructions | |
| ## Overview | |
| You will be evaluating AI-generated responses to various questions. Your task is to assess the quality of these responses based on specific criteria. | |
| ## Evaluation Criteria | |
| {criteria_text} | |
| ## Guidelines | |
| 1. **Read Carefully**: Read each question and response thoroughly before evaluating. | |
| 2. **Be Consistent**: Apply the same standards across all evaluations. | |
| 3. **Consider Context**: The responses are meant to represent a CEO's communication style for a technology company. | |
| 4. **A/B Comparisons**: When comparing two responses, select the one that better addresses the question while maintaining an authentic CEO voice. | |
| 5. **Rating Scale**: Use the full range of the rating scale. Reserve 5s for truly exceptional responses and 1s for clearly deficient ones. | |
| ## What to Look For | |
| ### Voice Authenticity | |
| - Does it sound like a real CEO speaking? | |
| - Is the tone confident but not arrogant? | |
| - Does it reflect genuine experience and insight? | |
| ### Helpfulness | |
| - Does it actually answer the question? | |
| - Does it provide actionable insights? | |
| - Is it substantive rather than generic? | |
| ### Clarity | |
| - Is the response well-organized? | |
| - Are ideas expressed clearly? | |
| - Is it easy to follow the reasoning? | |
| ### Professionalism | |
| - Is it appropriate for business communication? | |
| - Does it maintain proper decorum? | |
| - Is it culturally appropriate? | |
| ## Notes | |
| - Take your time with each evaluation | |
| - If unsure, re-read and consider both options carefully | |
| - Your honest assessment is valuable | |
| Thank you for your participation! | |
| """ | |
def main():
    """CLI entry point for generating evaluation sets.

    Returns:
        Process exit code (0 on success). Argument errors exit via
        argparse with the standard usage message and code 2.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate human evaluation sets",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python human_eval_generator.py --questions q.json --responses-a a.json --responses-b b.json
  python human_eval_generator.py --questions q.json --responses r.json --rating-only
""",
    )
    parser.add_argument("--questions", required=True, help="Questions JSON file")
    parser.add_argument("--responses-a", help="Responses A JSON file (for A/B test)")
    parser.add_argument("--responses-b", help="Responses B JSON file (for A/B test)")
    parser.add_argument("--responses", help="Responses JSON file (for rating test)")
    parser.add_argument("--output", default="evaluation/", help="Output directory")
    parser.add_argument("--rating-only", action="store_true", help="Generate rating test only")
    parser.add_argument("--seed", type=int, help="Random seed")
    args = parser.parse_args()

    # Validate mode-specific arguments up front so the user gets a clear
    # usage error instead of a TypeError from open(None) below.
    if args.rating_only:
        if not args.responses:
            parser.error("--rating-only requires --responses")
    elif not (args.responses_a and args.responses_b):
        parser.error("A/B test requires both --responses-a and --responses-b")

    # Load questions; entries may be plain strings or {"question": ...} dicts
    with open(args.questions, "r") as f:
        questions_data = json.load(f)
    questions = [q["question"] if isinstance(q, dict) else q for q in questions_data]

    generator = HumanEvalGenerator(seed=args.seed)

    if args.rating_only:
        # Rating test
        with open(args.responses, "r") as f:
            responses_data = json.load(f)
        responses = [r["response"] if isinstance(r, dict) else r for r in responses_data]
        items = generator.generate_rating_test(questions, responses)
        files = generator.save_evaluation_set(items, args.output)
    else:
        # A/B test
        with open(args.responses_a, "r") as f:
            resp_a_data = json.load(f)
        with open(args.responses_b, "r") as f:
            resp_b_data = json.load(f)
        resp_a = [r["response"] if isinstance(r, dict) else r for r in resp_a_data]
        resp_b = [r["response"] if isinstance(r, dict) else r for r in resp_b_data]
        items = generator.generate_blind_test(questions, resp_a, resp_b)
        files = generator.save_evaluation_set(items, args.output)

    print("\nGenerated evaluation set:")
    for name, path in files.items():
        print(f"  {name}: {path}")
    return 0


if __name__ == "__main__":
    import sys

    # sys.exit is preferred over the site-provided exit() builtin, which
    # may be absent when run with python -S.
    sys.exit(main())