""" Human Evaluation Generator Module Generate evaluation sets for blind testing by human evaluators. Creates paired comparisons and rating tasks. Example usage: generator = HumanEvalGenerator() eval_set = generator.generate_blind_test( questions=test_questions, responses_a=model_a_responses, responses_b=model_b_responses, ) """ import json import random import hashlib from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Optional from loguru import logger @dataclass class EvaluationItem: """A single item for human evaluation.""" item_id: str question: str response_a: str response_b: str response_a_source: str # Hidden from evaluator response_b_source: str # Hidden from evaluator category: str metadata: dict = field(default_factory=dict) def to_evaluator_dict(self) -> dict: """Return dict for evaluator (without source info).""" return { "item_id": self.item_id, "question": self.question, "response_a": self.response_a, "response_b": self.response_b, "category": self.category, } def to_full_dict(self) -> dict: """Return full dict with sources.""" return { "item_id": self.item_id, "question": self.question, "response_a": self.response_a, "response_b": self.response_b, "response_a_source": self.response_a_source, "response_b_source": self.response_b_source, "category": self.category, "metadata": self.metadata, } @dataclass class RatingItem: """A single item for absolute rating.""" item_id: str question: str response: str source: str # Hidden category: str criteria: list = field(default_factory=list) metadata: dict = field(default_factory=dict) def to_evaluator_dict(self) -> dict: """Return dict for evaluator.""" return { "item_id": self.item_id, "question": self.question, "response": self.response, "category": self.category, "criteria": self.criteria, } class HumanEvalGenerator: """ Generate evaluation sets for human evaluation. Creates: - Blind A/B comparisons - Absolute rating tasks - Multi-criteria evaluations Example: >>> generator = HumanEvalGenerator() >>> eval_set = generator.generate_blind_test(questions, resp_a, resp_b) >>> generator.save_evaluation_set(eval_set, "evaluation/") """ # Default evaluation criteria DEFAULT_CRITERIA = [ { "name": "voice_authenticity", "description": "How well does the response capture the CEO's authentic voice and communication style?", "scale": "1-5 (1=Not at all, 5=Perfectly authentic)", }, { "name": "helpfulness", "description": "How helpful and substantive is the response in addressing the question?", "scale": "1-5 (1=Not helpful, 5=Extremely helpful)", }, { "name": "clarity", "description": "How clear and well-organized is the response?", "scale": "1-5 (1=Confusing, 5=Crystal clear)", }, { "name": "professionalism", "description": "How appropriate is the response for professional business communication?", "scale": "1-5 (1=Inappropriate, 5=Highly professional)", }, ] # Question categories for balanced evaluation CATEGORIES = [ "opinion", "strategic", "factual", "personal_philosophy", "challenge", ] def __init__( self, criteria: Optional[list] = None, randomize: bool = True, seed: Optional[int] = None, ): """ Initialize the generator. Args: criteria: Custom evaluation criteria randomize: Whether to randomize response order seed: Random seed for reproducibility """ self.criteria = criteria or self.DEFAULT_CRITERIA self.randomize = randomize if seed is not None: random.seed(seed) def generate_blind_test( self, questions: list[str], responses_a: list[str], responses_b: list[str], source_a_name: str = "Model A", source_b_name: str = "Model B", categories: Optional[list[str]] = None, ) -> list[EvaluationItem]: """ Generate a blind A/B comparison test. Args: questions: List of questions responses_a: Responses from source A responses_b: Responses from source B source_a_name: Name of source A (hidden from evaluator) source_b_name: Name of source B (hidden from evaluator) categories: Optional category for each question Returns: List of EvaluationItem objects """ if len(questions) != len(responses_a) or len(questions) != len(responses_b): raise ValueError("Questions and responses must have same length") categories = categories or ["general"] * len(questions) items = [] for i, (q, ra, rb, cat) in enumerate(zip(questions, responses_a, responses_b, categories)): # Generate unique ID item_id = self._generate_id(q, ra, rb) # Randomize order if enabled if self.randomize and random.random() > 0.5: ra, rb = rb, ra source_a_name, source_b_name = source_b_name, source_a_name items.append(EvaluationItem( item_id=item_id, question=q, response_a=ra, response_b=rb, response_a_source=source_a_name, response_b_source=source_b_name, category=cat, metadata={"index": i}, )) if self.randomize: random.shuffle(items) return items def generate_rating_test( self, questions: list[str], responses: list[str], source_name: str = "Model", categories: Optional[list[str]] = None, criteria: Optional[list] = None, ) -> list[RatingItem]: """ Generate an absolute rating test. Args: questions: List of questions responses: List of responses source_name: Name of source (hidden from evaluator) categories: Optional category for each question criteria: Evaluation criteria to use Returns: List of RatingItem objects """ if len(questions) != len(responses): raise ValueError("Questions and responses must have same length") categories = categories or ["general"] * len(questions) criteria = criteria or self.criteria items = [] for i, (q, r, cat) in enumerate(zip(questions, responses, categories)): item_id = self._generate_id(q, r) items.append(RatingItem( item_id=item_id, question=q, response=r, source=source_name, category=cat, criteria=criteria, metadata={"index": i}, )) if self.randomize: random.shuffle(items) return items def generate_multi_model_test( self, questions: list[str], model_responses: dict[str, list[str]], categories: Optional[list[str]] = None, ) -> list[EvaluationItem]: """ Generate pairwise comparisons for multiple models. Args: questions: List of questions model_responses: Dict of {model_name: responses} categories: Optional categories Returns: List of pairwise EvaluationItem objects """ model_names = list(model_responses.keys()) if len(model_names) < 2: raise ValueError("Need at least 2 models for comparison") items = [] # Generate all pairs for i in range(len(model_names)): for j in range(i + 1, len(model_names)): name_a = model_names[i] name_b = model_names[j] pair_items = self.generate_blind_test( questions=questions, responses_a=model_responses[name_a], responses_b=model_responses[name_b], source_a_name=name_a, source_b_name=name_b, categories=categories, ) items.extend(pair_items) if self.randomize: random.shuffle(items) return items def save_evaluation_set( self, items: list, output_dir: str | Path, include_sources: bool = False, ) -> dict: """ Save evaluation set to files. Args: items: Evaluation items output_dir: Output directory include_sources: Include source info in evaluator file Returns: Dict with file paths """ output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Evaluator file (without source info) evaluator_data = { "metadata": { "created": timestamp, "num_items": len(items), "criteria": self.criteria, }, "items": [ item.to_evaluator_dict() for item in items ], } evaluator_path = output_dir / f"evaluation_set_{timestamp}.json" with open(evaluator_path, "w", encoding="utf-8") as f: json.dump(evaluator_data, f, indent=2, ensure_ascii=False) # Answer key (with source info) answer_key_data = { "metadata": { "created": timestamp, "num_items": len(items), }, "items": [ item.to_full_dict() for item in items ], } answer_key_path = output_dir / f"answer_key_{timestamp}.json" with open(answer_key_path, "w", encoding="utf-8") as f: json.dump(answer_key_data, f, indent=2, ensure_ascii=False) # Instructions file instructions = self._generate_instructions() instructions_path = output_dir / "evaluation_instructions.md" with open(instructions_path, "w", encoding="utf-8") as f: f.write(instructions) logger.info(f"Saved evaluation set to: {output_dir}") return { "evaluator_file": str(evaluator_path), "answer_key": str(answer_key_path), "instructions": str(instructions_path), } def load_results( self, results_path: str | Path, answer_key_path: str | Path, ) -> dict: """ Load and analyze evaluation results. Args: results_path: Path to evaluator results answer_key_path: Path to answer key Returns: Analysis results """ with open(results_path, "r") as f: results = json.load(f) with open(answer_key_path, "r") as f: answer_key = json.load(f) # Build lookup key_lookup = {item["item_id"]: item for item in answer_key["items"]} # Analyze model_wins = {} model_ratings = {} for result in results.get("responses", []): item_id = result.get("item_id") if item_id not in key_lookup: continue key = key_lookup[item_id] # For A/B comparisons if "preference" in result: pref = result["preference"] if pref == "A": winner = key.get("response_a_source", "A") elif pref == "B": winner = key.get("response_b_source", "B") else: winner = "tie" model_wins[winner] = model_wins.get(winner, 0) + 1 # For ratings if "ratings" in result: source = key.get("source", "unknown") if source not in model_ratings: model_ratings[source] = {c["name"]: [] for c in self.criteria} for criterion, rating in result["ratings"].items(): if criterion in model_ratings[source]: model_ratings[source][criterion].append(rating) # Calculate averages avg_ratings = {} for source, ratings in model_ratings.items(): avg_ratings[source] = { criterion: sum(scores) / len(scores) if scores else 0 for criterion, scores in ratings.items() } return { "comparison_wins": model_wins, "average_ratings": avg_ratings, "num_responses": len(results.get("responses", [])), } def _generate_id(self, *args) -> str: """Generate unique ID from content.""" content = "|".join(str(a)[:50] for a in args) return hashlib.md5(content.encode()).hexdigest()[:12] def _generate_instructions(self) -> str: """Generate evaluation instructions.""" criteria_text = "\n".join( f"- **{c['name']}**: {c['description']} ({c['scale']})" for c in self.criteria ) return f"""# Human Evaluation Instructions ## Overview You will be evaluating AI-generated responses to various questions. Your task is to assess the quality of these responses based on specific criteria. ## Evaluation Criteria {criteria_text} ## Guidelines 1. **Read Carefully**: Read each question and response thoroughly before evaluating. 2. **Be Consistent**: Apply the same standards across all evaluations. 3. **Consider Context**: The responses are meant to represent a CEO's communication style for a technology company. 4. **A/B Comparisons**: When comparing two responses, select the one that better addresses the question while maintaining an authentic CEO voice. 5. **Rating Scale**: Use the full range of the rating scale. Reserve 5s for truly exceptional responses and 1s for clearly deficient ones. ## What to Look For ### Voice Authenticity - Does it sound like a real CEO speaking? - Is the tone confident but not arrogant? - Does it reflect genuine experience and insight? ### Helpfulness - Does it actually answer the question? - Does it provide actionable insights? - Is it substantive rather than generic? ### Clarity - Is the response well-organized? - Are ideas expressed clearly? - Is it easy to follow the reasoning? ### Professionalism - Is it appropriate for business communication? - Does it maintain proper decorum? - Is it culturally appropriate? ## Notes - Take your time with each evaluation - If unsure, re-read and consider both options carefully - Your honest assessment is valuable Thank you for your participation! """ def main(): """CLI entry point for generating evaluation sets.""" import argparse parser = argparse.ArgumentParser( description="Generate human evaluation sets", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python human_eval_generator.py --questions q.json --responses-a a.json --responses-b b.json python human_eval_generator.py --questions q.json --responses r.json --rating-only """, ) parser.add_argument("--questions", required=True, help="Questions JSON file") parser.add_argument("--responses-a", help="Responses A JSON file (for A/B test)") parser.add_argument("--responses-b", help="Responses B JSON file (for A/B test)") parser.add_argument("--responses", help="Responses JSON file (for rating test)") parser.add_argument("--output", default="evaluation/", help="Output directory") parser.add_argument("--rating-only", action="store_true", help="Generate rating test only") parser.add_argument("--seed", type=int, help="Random seed") args = parser.parse_args() # Load questions with open(args.questions, "r") as f: questions_data = json.load(f) questions = [q["question"] if isinstance(q, dict) else q for q in questions_data] generator = HumanEvalGenerator(seed=args.seed) if args.rating_only: # Rating test with open(args.responses, "r") as f: responses_data = json.load(f) responses = [r["response"] if isinstance(r, dict) else r for r in responses_data] items = generator.generate_rating_test(questions, responses) files = generator.save_evaluation_set(items, args.output) else: # A/B test with open(args.responses_a, "r") as f: resp_a_data = json.load(f) with open(args.responses_b, "r") as f: resp_b_data = json.load(f) resp_a = [r["response"] if isinstance(r, dict) else r for r in resp_a_data] resp_b = [r["response"] if isinstance(r, dict) else r for r in resp_b_data] items = generator.generate_blind_test(questions, resp_a, resp_b) files = generator.save_evaluation_set(items, args.output) print(f"\nGenerated evaluation set:") for name, path in files.items(): print(f" {name}: {path}") return 0 if __name__ == "__main__": exit(main())