# CAIA-evaluate / evaluator.py
# Author: zhejianzhang — commit bebca7c ("fix")
import json
import asyncio
import os
from statistics import mean
import traceback
from typing import List, Optional, Type, TypeVar
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from pydantic import BaseModel
from schemas import (
Answer, EnsembleEvaluateScore, EvaluateData, QuestionData, BenchmarkItem,
EvaluateTarget, AnswerEvaluateResult, ReasoningEvaluateResult, ReasoningStep, ToolUse, ToolUseEvaluateResult,
EvaluateScore
)
from openai import AsyncClient
from utils import count_tokens, truncate_text
# Generic return type for parse_str_to_format: any pydantic model class.
T = TypeVar("T", bound=BaseModel)

# Default model configuration. NOTE(review): not referenced anywhere in this
# file — presumably consumed by the code that constructs Evaluator instances
# for ensemble_evaluate; confirm against callers.
llm_config = {
    # Lightweight model used only to coerce free-form judge output into
    # structured pydantic schemas (see Evaluator.parse_str_to_format).
    "parse_llm_config": {
        "model_name": "gpt-4.1-mini-2025-04-14",
        "api_key": os.getenv("OPENAI_API_KEY", None),
        "model_params": {
            "temperature": 0  # deterministic parsing
        }
    },
    # Judge models for ensemble evaluation — one entry per ensemble member.
    # "model_params" are forwarded as **kwargs to chat.completions.create.
    "evaluate_llm_configs": [
        {
            # Reasoning model: takes "reasoning_effort" instead of "temperature".
            "model_name": "o3-2025-04-16",
            "api_key": os.getenv("OPENAI_API_KEY", None),
            "model_params": {
                "reasoning_effort": "medium"
            }
        },
        {
            "model_name": "gpt-4.1",
            "api_key": os.getenv("OPENAI_API_KEY", None),
            "base_url": "https://api.openai.com/v1",
            "model_params": {
                "temperature": 0.2
            }
        },
        {
            # Third-party judge; endpoint and key come from the environment.
            "model_name": "deepseek-r1-250120",
            "api_key": os.getenv("DEEPSEEK_API_KEY", None),
            "base_url": os.getenv("DEEPSEEK_BASE_URL", None),
            "model_params": {
                "temperature": 0.2
            }
        }
    ]
}
class Evaluator:
    """LLM-as-judge evaluator: scores an agent's final answer, reasoning trace
    and tool usage against the criteria attached to a BenchmarkItem."""

    def __init__(self,
                 dataset: Optional[List[BenchmarkItem]] = None,
                 api_key: Optional[str] = None,
                 model_name: str = "gpt-4.1",
                 base_url: Optional[str] = None,
                 parse_model: str = "gpt-4.1-mini",
                 parse_model_api_key: Optional[str] = None,
                 parse_model_base_url: Optional[str] = None,
                 **model_params):
        """Create an evaluator backed by a judge model and a parse model.

        Args:
            dataset: Benchmark items to evaluate against. Defaults to an empty
                list (``None`` is accepted for backward compatibility).
            api_key: API key for the judge model (required).
            model_name: Judge model identifier.
            base_url: Optional custom endpoint for the judge model.
            parse_model: Cheap model used to coerce judge output into schemas.
            parse_model_api_key: API key for the parse model (required).
            parse_model_base_url: Optional custom endpoint for the parse model.
            **model_params: Extra kwargs forwarded to the judge's
                chat.completions.create (e.g. temperature, reasoning_effort).

        Raises:
            ValueError: if either required API key is missing.
        """
        if not api_key or not parse_model_api_key:
            raise ValueError("api_key and parse_model_api_key are required")
        self.system_prompt = """
You are a helpful assistant that can evaluate the quality of a given answer.
"""
        # Bug fix: the original signature used a mutable default (`dataset=[]`),
        # which is shared across all instances; use a None sentinel instead.
        self.dataset = dataset if dataset is not None else []
        self.benchmark_data: List[BenchmarkItem] = []
        self.model_name = model_name
        self.base_url = base_url
        self.parse_model = parse_model
        self.model_params = model_params or {"temperature": 0.0}  # default parameters
        self.parse_client = AsyncClient(api_key=parse_model_api_key, base_url=parse_model_base_url)
        self.client = AsyncClient(api_key=api_key, base_url=self.base_url)
        # Tool outputs longer than this (in tokens) are summarized/truncated.
        self.tool_output_max_tokens = 2000
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=15))
    async def parse_str_to_format(self, string_output: Optional[str], target_data_class: Type[T]) -> Optional[T]:
        """Coerce a free-form LLM reply into an instance of *target_data_class*.

        Uses the cheaper parse model via OpenAI structured outputs
        (``beta.chat.completions.parse``) with temperature pinned to 0.

        Args:
            string_output: Raw judge output to re-parse; empty/None returns None.
            target_data_class: Pydantic model class describing the expected schema.

        Returns:
            The parsed model instance, or None when the input is empty, the
            parse yields nothing, or the API call fails.

        NOTE(review): the broad ``except`` below swallows every error and falls
        through to ``return None``, so the tenacity @retry decorator never sees
        an exception and therefore never retries — confirm whether retrying was
        actually intended here.
        """
        if not string_output:
            return None
        try:
            # For the parse model, always use default (deterministic) parameters,
            # regardless of self.model_params.
            response = await self.parse_client.beta.chat.completions.parse(
                model=self.parse_model,
                messages=[{"role": "user", "content": string_output}],
                response_format=target_data_class,
                temperature=0.0,
            )
            result = response.choices[0].message.parsed
            if result:
                return result
        except Exception as e:
            print(f"Error parsing string to format: {e}")
        return None
async def summarize_tool_use_output(self, question:str, tool_use_list:List[ToolUse]) -> list[ToolUse]:
"""If the tool use output is too long, summarize the tool use output to keep the important information"""
system_prompt = f"""
You are a helpful assistant that can summarize the tool use output. Your output format should be in the following format:"In order to solve <Task>, Invoked <tool_name> with <tool_input> and got <summarized_tool_output>"
NOTE:
1. Ignore the noise in the tool_output, only keep the important information that might help to solve/improve the possibility of solving the task.
2. If the tool_output is not related to the question, just summarize the tool_output to "No relevant information Found"
"""
async def process_tool_use(tool_use: ToolUse) -> ToolUse:
if count_tokens(tool_use.tool_output, self.parse_model) > self.tool_output_max_tokens:
user_prompt = f"""
Question: {question}
Tool use:
{tool_use.to_prompt()}
"""
response = await self.parse_client.chat.completions.create(
model=self.parse_model,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
**self.model_params
)
content = response.choices[0].message.content
if content:
tool_use.tool_output = content
else:
tool_use.tool_output = truncate_text(tool_use.tool_output, self.parse_model, self.tool_output_max_tokens)
return tool_use
# 并行处理所有tool_use
tasks = [process_tool_use(tool_use) for tool_use in tool_use_list]
tool_use_list = await asyncio.gather(*tasks)
return tool_use_list
async def evaluate_reasoning(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ReasoningEvaluateResult]]:
system_prompt = """You are a professional evaluator for AI assistants in the crypto domain. You need to score the assistant's reasoning ability based on the given evaluation criteria and reasoning process. Please follow these steps during evaluation:
1. Review the reasoning steps and understand whether each step's logic is relevant to the task and helps solve the problem.
2. If there are no explicit reasoning steps, treat tool calls as an alternative form of reasoning steps and consider the reasoning process represented by the tool usage.
3. Assess the completeness and rigor of the reasoning chain, judging whether each step is reasonable and accurate, and whether there are logical flaws or missing steps.
4. Consider the information references and tool calls in the reasoning process, judge whether the information sources are sufficient, whether the tool usage is appropriate, and analyze the connections and dependencies between each step in the reasoning chain.
5. According to the evaluation criteria, give a score for each criterion, with the score ranging from 0 to the maximum points for that criterion.
"""
reasoning_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.REASONING]
if not reasoning_items:
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated Reasoning Steps:
```
{"\n".join([step.to_prompt() for step in output_answer.reasoning_steps])}
```
In addition, the following function calls are also part of the reasoning steps. The choose of the tool use and the arguments should be taken into account:
```
{"\n".join([step.to_prompt(ignore_output=True) for step in output_answer.function_calls])}
```
Evaluation Rules:\n"""
for item in reasoning_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the reasoning steps based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
# print(prompt)
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
content = response.choices[0].message.content
result = await self.parse_str_to_format(content, ReasoningEvaluateResult)
if not result:
retry_count += 1
continue
if sum([item.score for item in result.items]) > sum([item.points for item in reasoning_items]):
retry_count += 1
continue
return sum([item.score for item in result.items]), result
except Exception as e:
print(f"Error evaluating reasoning (attempt {retry_count + 1}/{max_retries}): {e}")
retry_count += 1
if retry_count == max_retries:
return 0.0, None
await asyncio.sleep(1) # 添加重试间隔
return 0.0, None
async def evaluate_tool_use(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ToolUseEvaluateResult]]:
system_prompt = """You are a professional crypto AI assistant evaluator. You need to score the assistant's tools using ability according to the given criterias and the tool use output. When evaluating, you should follow the following steps:
1. Take a brief look at the tool using, descriptions and input args, to make sure the tool using is correct/related to solving the task.
2. Evaluate each step of the tool use to estimate the efficiency and accuracy of the tool use.
3. Consider the continuity of tool calls: The return result of the previous tool call may affect the input arguments of the next tool call.
"""
tool_use_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
if not tool_use_items:
print(f"No tool use items for task {benchmark_item.task_id}")
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated tool use:
```
{"\n".join([step.to_prompt() for step in output_answer.function_calls])}
```
Evaluation Rules:
"""
for item in tool_use_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the tool use based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
content = response.choices[0].message.content
result = await self.parse_str_to_format(content, ToolUseEvaluateResult)
if not result:
retry_count += 1
continue
if sum([item.score for item in result.items]) > sum([item.points for item in tool_use_items]):
retry_count += 1
continue
return sum([item.score for item in result.items]), result
except Exception as e:
print(f"Error evaluating tool use (attempt {retry_count + 1}/{max_retries}): {traceback.format_exc()}")
retry_count += 1
if retry_count == max_retries:
return 0.0, None
await asyncio.sleep(1) # 添加重试间隔
return 0.0, None
async def evaluate_answer(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[AnswerEvaluateResult]]:
system_prompt = """You are a professional evaluator for crypto AI assistant answers. You need to score the AI assistant's final answer according to the given evaluation criteria. Please follow these steps during evaluation:
1. Carefully read the task question and the AI assistant's final output, and determine whether the answer accurately and completely solves the task requirements and conforms to basic common sense.
2. Check whether the facts, data, and reasoning process in the answer are correct, and whether there are logical errors, numerical errors, or fabricated facts.
3. For specific numerical values, allow a certain range of error. If the criteria do not specify the error range, use a ±5% margin.
4. For each evaluation criterion, give a score for each item, with the score ranging from 0 to the full score for that criterion.
Please strictly follow the evaluation criteria to provide objective and fair scoring, and briefly explain your reasoning for the scores."""
evaluate_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.ANSWER]
if not evaluate_items:
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated output:
```
{output_answer.to_prompt()}
```
Evaluation Rules:
"""
for item in evaluate_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the output answer based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
# print(prompt)
max_retry = 3
for _ in range(max_retry):
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
result = await self.parse_str_to_format(response.choices[0].message.content, AnswerEvaluateResult)
if not result:
continue
if result.score > sum([item.points for item in evaluate_items]):
continue
return result.score, result
except Exception as e:
print(f"Error evaluating answer: {e}")
continue
return 0.0, None
async def a_evaluate(self, task_id:str, answer:Answer, to_evaluate_item: BenchmarkItem) -> EvaluateScore | None:
import asyncio
tasks = [
self.evaluate_answer(answer, to_evaluate_item),
self.evaluate_reasoning(answer, to_evaluate_item),
self.evaluate_tool_use(answer, to_evaluate_item),
]
[(answer_score, answer_evaulate_result), (reasoning_score, reasoning_evaulate_result), (tool_use_score, tool_use_evaulate_result)] = await asyncio.gather(*tasks)
analysis = await self.analyze_evaulate_result(answer_evaulate_result, reasoning_evaulate_result, tool_use_evaulate_result, to_evaluate_item)
return analysis
    async def analyze_evaulate_result(self,
                                      answer_evaulate_result: AnswerEvaluateResult,
                                      reasoning_evaulate_result: ReasoningEvaluateResult,
                                      tool_use_evaulate_result: ToolUseEvaluateResult,
                                      to_evaluate_item: BenchmarkItem) -> EvaluateScore:
        """Combine the three sub-evaluation results into one EvaluateScore,
        including a human-readable breakdown in ``evaluate_detail``.

        All three result arguments must be non-None; this method dereferences
        them unconditionally.
        """
        # NOTE(review): assumes exactly one ANSWER-target criterion exists —
        # raises IndexError otherwise; confirm this dataset invariant.
        benchmark_answer_item = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.ANSWER][0]
        benchmark_reasoning_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.REASONING]
        benchmark_tool_use_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
        # Build the per-criterion score breakdown text.
        detail = ""
        detail += f"Answer score: {answer_evaulate_result.score} / {benchmark_answer_item.points}\n"
        detail += f"Reason: {answer_evaulate_result.reason}\n"
        detail += f"Reasoning score: {sum([item.score for item in reasoning_evaulate_result.items])} / {sum([item.points for item in benchmark_reasoning_items])}\n"
        for item in reasoning_evaulate_result.items:
            # NOTE(review): item.step is treated as a 1-based index into
            # benchmark_reasoning_items — verify the judge always emits steps
            # within range and in benchmark order, else this raises IndexError
            # or mismatches points.
            detail += f"Reasoning step {item.step}: {item.reason} score: {item.score} / {benchmark_reasoning_items[item.step-1].points}\n"
        detail += f"Tool use score: {sum([item.score for item in tool_use_evaulate_result.items])} / {sum([item.points for item in benchmark_tool_use_items])}\n"
        for item in tool_use_evaulate_result.items:
            detail += f"{item.reason}\n"
        return EvaluateScore(
            model_name=self.model_name,
            answer_score=answer_evaulate_result.score,
            answer_total_score=benchmark_answer_item.points,
            reasoning_score=sum([item.score for item in reasoning_evaulate_result.items]),
            reasoning_total_score=sum([item.points for item in benchmark_reasoning_items]),
            tool_use_score=sum([item.score for item in tool_use_evaulate_result.items]),
            tool_use_total_score=sum([item.points for item in benchmark_tool_use_items]),
            # Grand total = answer + reasoning + tool-use earned points.
            total_score=answer_evaulate_result.score + sum([item.score for item in reasoning_evaulate_result.items]) + sum([item.score for item in tool_use_evaulate_result.items]),
            evaluate_detail=detail,
            task_id=to_evaluate_item.task_id,
            level=to_evaluate_item.level or 1,  # default to level 1 when unset
            category=to_evaluate_item.category
        )
async def ensemble_evaluate(evaulator_list: list[Evaluator], answer: Answer, to_evaluate_item: BenchmarkItem) -> EnsembleEvaluateScore:
    """Run every judge in *evaulator_list* concurrently on the same answer and
    average their scores into one EnsembleEvaluateScore.

    Judges that fail (a_evaluate returns None) are dropped from the average.

    Raises:
        ValueError: if every judge failed — previously this surfaced as an
            opaque statistics.StatisticsError from mean([]) (which is itself a
            ValueError subclass, so callers catching ValueError still work).
    """
    raw_results = await asyncio.gather(
        *[evaluator.a_evaluate(to_evaluate_item.task_id, answer, to_evaluate_item) for evaluator in evaulator_list]
    )
    # Filter once; the original re-applied `if item` inside every mean() call.
    results: list[EvaluateScore] = [item for item in raw_results if item]
    if not results:
        raise ValueError(f"All evaluators failed for task {to_evaluate_item.task_id}")
    return EnsembleEvaluateScore(
        task_id=to_evaluate_item.task_id,
        answer_total_score=mean(item.answer_total_score for item in results),
        reasoning_total_score=mean(item.reasoning_total_score for item in results),
        tool_use_total_score=mean(item.tool_use_total_score for item in results),
        total_score=mean(item.total_score for item in results),
        evaluate_detail="no detail",
        answer_score=mean(item.answer_score for item in results),
        reasoning_score=mean(item.reasoning_score for item in results),
        tool_use_score=mean(item.tool_use_score for item in results),
        level=to_evaluate_item.level or 1,  # default to level 1 when unset
        category=to_evaluate_item.category,
        model_name="ensemble result"
    )