# ai_exec/src/evaluation/human_eval_generator.py
# (originally uploaded by Chaitanya-aitf, commit 45ee481, "Upload 38 files")
"""
Human Evaluation Generator Module
Generate evaluation sets for blind testing by human evaluators.
Creates paired comparisons and rating tasks.
Example usage:
generator = HumanEvalGenerator()
eval_set = generator.generate_blind_test(
questions=test_questions,
responses_a=model_a_responses,
responses_b=model_b_responses,
)
"""
import json
import random
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from loguru import logger
@dataclass
class EvaluationItem:
    """A single item for human evaluation (blind A/B comparison)."""
    item_id: str
    question: str
    response_a: str
    response_b: str
    response_a_source: str  # Hidden from evaluator
    response_b_source: str  # Hidden from evaluator
    category: str
    metadata: dict = field(default_factory=dict)

    # Attribute names exposed in each serialized view, in output order.
    _EVALUATOR_FIELDS = ("item_id", "question", "response_a", "response_b", "category")
    _FULL_FIELDS = (
        "item_id",
        "question",
        "response_a",
        "response_b",
        "response_a_source",
        "response_b_source",
        "category",
        "metadata",
    )

    def to_evaluator_dict(self) -> dict:
        """Build the blinded view given to evaluators (source labels omitted)."""
        return {name: getattr(self, name) for name in self._EVALUATOR_FIELDS}

    def to_full_dict(self) -> dict:
        """Build the complete record, including hidden source labels, for the answer key."""
        return {name: getattr(self, name) for name in self._FULL_FIELDS}
@dataclass
class RatingItem:
    """A single item for absolute rating."""
    item_id: str
    question: str
    response: str
    source: str  # Hidden
    category: str
    criteria: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

    def to_evaluator_dict(self) -> dict:
        """Return dict for evaluator (without source info)."""
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response": self.response,
            "category": self.category,
            "criteria": self.criteria,
        }

    def to_full_dict(self) -> dict:
        """Return the full dict including the hidden source, for the answer key.

        Needed because ``HumanEvalGenerator.save_evaluation_set`` calls
        ``to_full_dict()`` on every item when writing the answer key;
        without this method, saving a rating test raised AttributeError.
        """
        return {
            "item_id": self.item_id,
            "question": self.question,
            "response": self.response,
            "source": self.source,
            "category": self.category,
            "criteria": self.criteria,
            "metadata": self.metadata,
        }
class HumanEvalGenerator:
"""
Generate evaluation sets for human evaluation.
Creates:
- Blind A/B comparisons
- Absolute rating tasks
- Multi-criteria evaluations
Example:
>>> generator = HumanEvalGenerator()
>>> eval_set = generator.generate_blind_test(questions, resp_a, resp_b)
>>> generator.save_evaluation_set(eval_set, "evaluation/")
"""
# Default evaluation criteria
DEFAULT_CRITERIA = [
{
"name": "voice_authenticity",
"description": "How well does the response capture the CEO's authentic voice and communication style?",
"scale": "1-5 (1=Not at all, 5=Perfectly authentic)",
},
{
"name": "helpfulness",
"description": "How helpful and substantive is the response in addressing the question?",
"scale": "1-5 (1=Not helpful, 5=Extremely helpful)",
},
{
"name": "clarity",
"description": "How clear and well-organized is the response?",
"scale": "1-5 (1=Confusing, 5=Crystal clear)",
},
{
"name": "professionalism",
"description": "How appropriate is the response for professional business communication?",
"scale": "1-5 (1=Inappropriate, 5=Highly professional)",
},
]
# Question categories for balanced evaluation
CATEGORIES = [
"opinion",
"strategic",
"factual",
"personal_philosophy",
"challenge",
]
def __init__(
self,
criteria: Optional[list] = None,
randomize: bool = True,
seed: Optional[int] = None,
):
"""
Initialize the generator.
Args:
criteria: Custom evaluation criteria
randomize: Whether to randomize response order
seed: Random seed for reproducibility
"""
self.criteria = criteria or self.DEFAULT_CRITERIA
self.randomize = randomize
if seed is not None:
random.seed(seed)
def generate_blind_test(
self,
questions: list[str],
responses_a: list[str],
responses_b: list[str],
source_a_name: str = "Model A",
source_b_name: str = "Model B",
categories: Optional[list[str]] = None,
) -> list[EvaluationItem]:
"""
Generate a blind A/B comparison test.
Args:
questions: List of questions
responses_a: Responses from source A
responses_b: Responses from source B
source_a_name: Name of source A (hidden from evaluator)
source_b_name: Name of source B (hidden from evaluator)
categories: Optional category for each question
Returns:
List of EvaluationItem objects
"""
if len(questions) != len(responses_a) or len(questions) != len(responses_b):
raise ValueError("Questions and responses must have same length")
categories = categories or ["general"] * len(questions)
items = []
for i, (q, ra, rb, cat) in enumerate(zip(questions, responses_a, responses_b, categories)):
# Generate unique ID
item_id = self._generate_id(q, ra, rb)
# Randomize order if enabled
if self.randomize and random.random() > 0.5:
ra, rb = rb, ra
source_a_name, source_b_name = source_b_name, source_a_name
items.append(EvaluationItem(
item_id=item_id,
question=q,
response_a=ra,
response_b=rb,
response_a_source=source_a_name,
response_b_source=source_b_name,
category=cat,
metadata={"index": i},
))
if self.randomize:
random.shuffle(items)
return items
def generate_rating_test(
self,
questions: list[str],
responses: list[str],
source_name: str = "Model",
categories: Optional[list[str]] = None,
criteria: Optional[list] = None,
) -> list[RatingItem]:
"""
Generate an absolute rating test.
Args:
questions: List of questions
responses: List of responses
source_name: Name of source (hidden from evaluator)
categories: Optional category for each question
criteria: Evaluation criteria to use
Returns:
List of RatingItem objects
"""
if len(questions) != len(responses):
raise ValueError("Questions and responses must have same length")
categories = categories or ["general"] * len(questions)
criteria = criteria or self.criteria
items = []
for i, (q, r, cat) in enumerate(zip(questions, responses, categories)):
item_id = self._generate_id(q, r)
items.append(RatingItem(
item_id=item_id,
question=q,
response=r,
source=source_name,
category=cat,
criteria=criteria,
metadata={"index": i},
))
if self.randomize:
random.shuffle(items)
return items
def generate_multi_model_test(
self,
questions: list[str],
model_responses: dict[str, list[str]],
categories: Optional[list[str]] = None,
) -> list[EvaluationItem]:
"""
Generate pairwise comparisons for multiple models.
Args:
questions: List of questions
model_responses: Dict of {model_name: responses}
categories: Optional categories
Returns:
List of pairwise EvaluationItem objects
"""
model_names = list(model_responses.keys())
if len(model_names) < 2:
raise ValueError("Need at least 2 models for comparison")
items = []
# Generate all pairs
for i in range(len(model_names)):
for j in range(i + 1, len(model_names)):
name_a = model_names[i]
name_b = model_names[j]
pair_items = self.generate_blind_test(
questions=questions,
responses_a=model_responses[name_a],
responses_b=model_responses[name_b],
source_a_name=name_a,
source_b_name=name_b,
categories=categories,
)
items.extend(pair_items)
if self.randomize:
random.shuffle(items)
return items
def save_evaluation_set(
self,
items: list,
output_dir: str | Path,
include_sources: bool = False,
) -> dict:
"""
Save evaluation set to files.
Args:
items: Evaluation items
output_dir: Output directory
include_sources: Include source info in evaluator file
Returns:
Dict with file paths
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Evaluator file (without source info)
evaluator_data = {
"metadata": {
"created": timestamp,
"num_items": len(items),
"criteria": self.criteria,
},
"items": [
item.to_evaluator_dict() for item in items
],
}
evaluator_path = output_dir / f"evaluation_set_{timestamp}.json"
with open(evaluator_path, "w", encoding="utf-8") as f:
json.dump(evaluator_data, f, indent=2, ensure_ascii=False)
# Answer key (with source info)
answer_key_data = {
"metadata": {
"created": timestamp,
"num_items": len(items),
},
"items": [
item.to_full_dict() for item in items
],
}
answer_key_path = output_dir / f"answer_key_{timestamp}.json"
with open(answer_key_path, "w", encoding="utf-8") as f:
json.dump(answer_key_data, f, indent=2, ensure_ascii=False)
# Instructions file
instructions = self._generate_instructions()
instructions_path = output_dir / "evaluation_instructions.md"
with open(instructions_path, "w", encoding="utf-8") as f:
f.write(instructions)
logger.info(f"Saved evaluation set to: {output_dir}")
return {
"evaluator_file": str(evaluator_path),
"answer_key": str(answer_key_path),
"instructions": str(instructions_path),
}
def load_results(
self,
results_path: str | Path,
answer_key_path: str | Path,
) -> dict:
"""
Load and analyze evaluation results.
Args:
results_path: Path to evaluator results
answer_key_path: Path to answer key
Returns:
Analysis results
"""
with open(results_path, "r") as f:
results = json.load(f)
with open(answer_key_path, "r") as f:
answer_key = json.load(f)
# Build lookup
key_lookup = {item["item_id"]: item for item in answer_key["items"]}
# Analyze
model_wins = {}
model_ratings = {}
for result in results.get("responses", []):
item_id = result.get("item_id")
if item_id not in key_lookup:
continue
key = key_lookup[item_id]
# For A/B comparisons
if "preference" in result:
pref = result["preference"]
if pref == "A":
winner = key.get("response_a_source", "A")
elif pref == "B":
winner = key.get("response_b_source", "B")
else:
winner = "tie"
model_wins[winner] = model_wins.get(winner, 0) + 1
# For ratings
if "ratings" in result:
source = key.get("source", "unknown")
if source not in model_ratings:
model_ratings[source] = {c["name"]: [] for c in self.criteria}
for criterion, rating in result["ratings"].items():
if criterion in model_ratings[source]:
model_ratings[source][criterion].append(rating)
# Calculate averages
avg_ratings = {}
for source, ratings in model_ratings.items():
avg_ratings[source] = {
criterion: sum(scores) / len(scores) if scores else 0
for criterion, scores in ratings.items()
}
return {
"comparison_wins": model_wins,
"average_ratings": avg_ratings,
"num_responses": len(results.get("responses", [])),
}
def _generate_id(self, *args) -> str:
"""Generate unique ID from content."""
content = "|".join(str(a)[:50] for a in args)
return hashlib.md5(content.encode()).hexdigest()[:12]
def _generate_instructions(self) -> str:
"""Generate evaluation instructions."""
criteria_text = "\n".join(
f"- **{c['name']}**: {c['description']} ({c['scale']})"
for c in self.criteria
)
return f"""# Human Evaluation Instructions
## Overview
You will be evaluating AI-generated responses to various questions. Your task is to assess the quality of these responses based on specific criteria.
## Evaluation Criteria
{criteria_text}
## Guidelines
1. **Read Carefully**: Read each question and response thoroughly before evaluating.
2. **Be Consistent**: Apply the same standards across all evaluations.
3. **Consider Context**: The responses are meant to represent a CEO's communication style for a technology company.
4. **A/B Comparisons**: When comparing two responses, select the one that better addresses the question while maintaining an authentic CEO voice.
5. **Rating Scale**: Use the full range of the rating scale. Reserve 5s for truly exceptional responses and 1s for clearly deficient ones.
## What to Look For
### Voice Authenticity
- Does it sound like a real CEO speaking?
- Is the tone confident but not arrogant?
- Does it reflect genuine experience and insight?
### Helpfulness
- Does it actually answer the question?
- Does it provide actionable insights?
- Is it substantive rather than generic?
### Clarity
- Is the response well-organized?
- Are ideas expressed clearly?
- Is it easy to follow the reasoning?
### Professionalism
- Is it appropriate for business communication?
- Does it maintain proper decorum?
- Is it culturally appropriate?
## Notes
- Take your time with each evaluation
- If unsure, re-read and consider both options carefully
- Your honest assessment is valuable
Thank you for your participation!
"""
def main():
    """CLI entry point for generating evaluation sets.

    Returns:
        int: Process exit code (0 on success).
    """
    import argparse
    parser = argparse.ArgumentParser(
        description="Generate human evaluation sets",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python human_eval_generator.py --questions q.json --responses-a a.json --responses-b b.json
  python human_eval_generator.py --questions q.json --responses r.json --rating-only
""",
    )
    parser.add_argument("--questions", required=True, help="Questions JSON file")
    parser.add_argument("--responses-a", help="Responses A JSON file (for A/B test)")
    parser.add_argument("--responses-b", help="Responses B JSON file (for A/B test)")
    parser.add_argument("--responses", help="Responses JSON file (for rating test)")
    parser.add_argument("--output", default="evaluation/", help="Output directory")
    parser.add_argument("--rating-only", action="store_true", help="Generate rating test only")
    parser.add_argument("--seed", type=int, help="Random seed")
    args = parser.parse_args()
    # Validate mode-dependent arguments up front so the user gets a usage
    # message instead of a TypeError from open(None).
    if args.rating_only:
        if not args.responses:
            parser.error("--rating-only requires --responses")
    elif not (args.responses_a and args.responses_b):
        parser.error("A/B test requires both --responses-a and --responses-b")
    # Load questions; entries may be plain strings or {"question": ...} dicts.
    with open(args.questions, "r", encoding="utf-8") as f:
        questions_data = json.load(f)
    questions = [q["question"] if isinstance(q, dict) else q for q in questions_data]
    generator = HumanEvalGenerator(seed=args.seed)
    if args.rating_only:
        # Rating test
        with open(args.responses, "r", encoding="utf-8") as f:
            responses_data = json.load(f)
        responses = [r["response"] if isinstance(r, dict) else r for r in responses_data]
        items = generator.generate_rating_test(questions, responses)
    else:
        # A/B test
        with open(args.responses_a, "r", encoding="utf-8") as f:
            resp_a_data = json.load(f)
        with open(args.responses_b, "r", encoding="utf-8") as f:
            resp_b_data = json.load(f)
        resp_a = [r["response"] if isinstance(r, dict) else r for r in resp_a_data]
        resp_b = [r["response"] if isinstance(r, dict) else r for r in resp_b_data]
        items = generator.generate_blind_test(questions, resp_a, resp_b)
    files = generator.save_evaluation_set(items, args.output)
    print("\nGenerated evaluation set:")
    for name, path in files.items():
        print(f"  {name}: {path}")
    return 0
if __name__ == "__main__":
    # raise SystemExit instead of exit(): the exit() builtin is a
    # site-module convenience and is not available under `python -S`.
    raise SystemExit(main())