Spaces:
Runtime error
Runtime error
File size: 18,969 Bytes
f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 bebca7c f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 e23f952 f3e6f32 7c26356 f3e6f32 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
import json
import asyncio
import os
from statistics import mean
import traceback
from typing import List, Optional, Type, TypeVar
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from pydantic import BaseModel
from schemas import (
Answer, EnsembleEvaluateScore, EvaluateData, QuestionData, BenchmarkItem,
EvaluateTarget, AnswerEvaluateResult, ReasoningEvaluateResult, ReasoningStep, ToolUse, ToolUseEvaluateResult,
EvaluateScore
)
from openai import AsyncClient
from utils import count_tokens, truncate_text
T = TypeVar("T", bound=BaseModel)
llm_config = {
"parse_llm_config": {
"model_name": "gpt-4.1-mini-2025-04-14",
"api_key": os.getenv("OPENAI_API_KEY", None),
"model_params": {
"temperature": 0
}
},
"evaluate_llm_configs": [
{
"model_name": "o3-2025-04-16",
"api_key": os.getenv("OPENAI_API_KEY", None),
"model_params": {
"reasoning_effort": "medium"
}
},
{
"model_name": "gpt-4.1",
"api_key": os.getenv("OPENAI_API_KEY", None),
"base_url": "https://api.openai.com/v1",
"model_params": {
"temperature": 0.2
}
},
{
"model_name": "deepseek-r1-250120",
"api_key": os.getenv("DEEPSEEK_API_KEY", None),
"base_url": os.getenv("DEEPSEEK_BASE_URL", None),
"model_params": {
"temperature": 0.2
}
}
]
}
class Evaluator:
def __init__(self,
dataset:List[BenchmarkItem] = [],
api_key:Optional[str] = None,
model_name:str = "gpt-4.1",
base_url:Optional[str] = None,
parse_model:str = "gpt-4.1-mini",
parse_model_api_key:Optional[str] = None,
parse_model_base_url:Optional[str] = None,
**model_params):
if not api_key or not parse_model_api_key:
raise ValueError("api_key and parse_model_api_key are required")
self.system_prompt = """
You are a helpful assistant that can evaluate the quality of a given answer.
"""
# self.dataset_path = dataset_path
self.dataset = dataset
self.benchmark_data:List[BenchmarkItem] = []
self.model_name = model_name
self.base_url = base_url
self.parse_model = parse_model
self.model_params = model_params or {"temperature": 0.0} # 默认参数
self.parse_client = AsyncClient(api_key=parse_model_api_key, base_url=parse_model_base_url)
self.client = AsyncClient(api_key=api_key, base_url=self.base_url)
self.tool_output_max_tokens = 2000
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=15))
async def parse_str_to_format(self, string_output:Optional[str], target_data_class: Type[T]) -> Optional[T]:
if not string_output:
return None
try:
# 对于解析模型的参数,使用默认参数
response = await self.parse_client.beta.chat.completions.parse(
model=self.parse_model,
messages=[{"role": "user", "content": string_output}],
response_format=target_data_class,
temperature=0.0,
)
result = response.choices[0].message.parsed
if result:
return result
except Exception as e:
print(f"Error parsing string to format: {e}")
return None
async def summarize_tool_use_output(self, question:str, tool_use_list:List[ToolUse]) -> list[ToolUse]:
"""If the tool use output is too long, summarize the tool use output to keep the important information"""
system_prompt = f"""
You are a helpful assistant that can summarize the tool use output. Your output format should be in the following format:"In order to solve <Task>, Invoked <tool_name> with <tool_input> and got <summarized_tool_output>"
NOTE:
1. Ignore the noise in the tool_output, only keep the important information that might help to solve/improve the possibility of solving the task.
2. If the tool_output is not related to the question, just summarize the tool_output to "No relevant information Found"
"""
async def process_tool_use(tool_use: ToolUse) -> ToolUse:
if count_tokens(tool_use.tool_output, self.parse_model) > self.tool_output_max_tokens:
user_prompt = f"""
Question: {question}
Tool use:
{tool_use.to_prompt()}
"""
response = await self.parse_client.chat.completions.create(
model=self.parse_model,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
**self.model_params
)
content = response.choices[0].message.content
if content:
tool_use.tool_output = content
else:
tool_use.tool_output = truncate_text(tool_use.tool_output, self.parse_model, self.tool_output_max_tokens)
return tool_use
# 并行处理所有tool_use
tasks = [process_tool_use(tool_use) for tool_use in tool_use_list]
tool_use_list = await asyncio.gather(*tasks)
return tool_use_list
async def evaluate_reasoning(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ReasoningEvaluateResult]]:
system_prompt = """You are a professional evaluator for AI assistants in the crypto domain. You need to score the assistant's reasoning ability based on the given evaluation criteria and reasoning process. Please follow these steps during evaluation:
1. Review the reasoning steps and understand whether each step's logic is relevant to the task and helps solve the problem.
2. If there are no explicit reasoning steps, treat tool calls as an alternative form of reasoning steps and consider the reasoning process represented by the tool usage.
3. Assess the completeness and rigor of the reasoning chain, judging whether each step is reasonable and accurate, and whether there are logical flaws or missing steps.
4. Consider the information references and tool calls in the reasoning process, judge whether the information sources are sufficient, whether the tool usage is appropriate, and analyze the connections and dependencies between each step in the reasoning chain.
5. According to the evaluation criteria, give a score for each criterion, with the score ranging from 0 to the maximum points for that criterion.
"""
reasoning_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.REASONING]
if not reasoning_items:
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated Reasoning Steps:
```
{"\n".join([step.to_prompt() for step in output_answer.reasoning_steps])}
```
In addition, the following function calls are also part of the reasoning steps. The choose of the tool use and the arguments should be taken into account:
```
{"\n".join([step.to_prompt(ignore_output=True) for step in output_answer.function_calls])}
```
Evaluation Rules:\n"""
for item in reasoning_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the reasoning steps based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
# print(prompt)
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
content = response.choices[0].message.content
result = await self.parse_str_to_format(content, ReasoningEvaluateResult)
if not result:
retry_count += 1
continue
if sum([item.score for item in result.items]) > sum([item.points for item in reasoning_items]):
retry_count += 1
continue
return sum([item.score for item in result.items]), result
except Exception as e:
print(f"Error evaluating reasoning (attempt {retry_count + 1}/{max_retries}): {e}")
retry_count += 1
if retry_count == max_retries:
return 0.0, None
await asyncio.sleep(1) # 添加重试间隔
return 0.0, None
async def evaluate_tool_use(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ToolUseEvaluateResult]]:
system_prompt = """You are a professional crypto AI assistant evaluator. You need to score the assistant's tools using ability according to the given criterias and the tool use output. When evaluating, you should follow the following steps:
1. Take a brief look at the tool using, descriptions and input args, to make sure the tool using is correct/related to solving the task.
2. Evaluate each step of the tool use to estimate the efficiency and accuracy of the tool use.
3. Consider the continuity of tool calls: The return result of the previous tool call may affect the input arguments of the next tool call.
"""
tool_use_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
if not tool_use_items:
print(f"No tool use items for task {benchmark_item.task_id}")
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated tool use:
```
{"\n".join([step.to_prompt() for step in output_answer.function_calls])}
```
Evaluation Rules:
"""
for item in tool_use_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the tool use based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
content = response.choices[0].message.content
result = await self.parse_str_to_format(content, ToolUseEvaluateResult)
if not result:
retry_count += 1
continue
if sum([item.score for item in result.items]) > sum([item.points for item in tool_use_items]):
retry_count += 1
continue
return sum([item.score for item in result.items]), result
except Exception as e:
print(f"Error evaluating tool use (attempt {retry_count + 1}/{max_retries}): {traceback.format_exc()}")
retry_count += 1
if retry_count == max_retries:
return 0.0, None
await asyncio.sleep(1) # 添加重试间隔
return 0.0, None
async def evaluate_answer(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[AnswerEvaluateResult]]:
system_prompt = """You are a professional evaluator for crypto AI assistant answers. You need to score the AI assistant's final answer according to the given evaluation criteria. Please follow these steps during evaluation:
1. Carefully read the task question and the AI assistant's final output, and determine whether the answer accurately and completely solves the task requirements and conforms to basic common sense.
2. Check whether the facts, data, and reasoning process in the answer are correct, and whether there are logical errors, numerical errors, or fabricated facts.
3. For specific numerical values, allow a certain range of error. If the criteria do not specify the error range, use a ±5% margin.
4. For each evaluation criterion, give a score for each item, with the score ranging from 0 to the full score for that criterion.
Please strictly follow the evaluation criteria to provide objective and fair scoring, and briefly explain your reasoning for the scores."""
evaluate_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.ANSWER]
if not evaluate_items:
return 0.0, None
prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated output:
```
{output_answer.to_prompt()}
```
Evaluation Rules:
"""
for item in evaluate_items:
prompt += f"{item.to_prompt()}\n"
prompt += f"Now evaluate the output answer based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
# print(prompt)
max_retry = 3
for _ in range(max_retry):
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
**self.model_params
)
result = await self.parse_str_to_format(response.choices[0].message.content, AnswerEvaluateResult)
if not result:
continue
if result.score > sum([item.points for item in evaluate_items]):
continue
return result.score, result
except Exception as e:
print(f"Error evaluating answer: {e}")
continue
return 0.0, None
async def a_evaluate(self, task_id:str, answer:Answer, to_evaluate_item: BenchmarkItem) -> EvaluateScore | None:
import asyncio
tasks = [
self.evaluate_answer(answer, to_evaluate_item),
self.evaluate_reasoning(answer, to_evaluate_item),
self.evaluate_tool_use(answer, to_evaluate_item),
]
[(answer_score, answer_evaulate_result), (reasoning_score, reasoning_evaulate_result), (tool_use_score, tool_use_evaulate_result)] = await asyncio.gather(*tasks)
analysis = await self.analyze_evaulate_result(answer_evaulate_result, reasoning_evaulate_result, tool_use_evaulate_result, to_evaluate_item)
return analysis
async def analyze_evaulate_result(self,
answer_evaulate_result:AnswerEvaluateResult,
reasoning_evaulate_result:ReasoningEvaluateResult,
tool_use_evaulate_result:ToolUseEvaluateResult,
to_evaluate_item:BenchmarkItem) -> EvaluateScore:
"""Analyze the evaulate result and give the analysis"""
benchmark_answer_item = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.ANSWER][0]
benchmark_reasoning_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.REASONING]
benchmark_tool_use_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
detail = ""
detail += f"Answer score: {answer_evaulate_result.score} / {benchmark_answer_item.points}\n"
detail += f"Reason: {answer_evaulate_result.reason}\n"
detail += f"Reasoning score: {sum([item.score for item in reasoning_evaulate_result.items])} / {sum([item.points for item in benchmark_reasoning_items])}\n"
for item in reasoning_evaulate_result.items:
detail += f"Reasoning step {item.step}: {item.reason} score: {item.score} / {benchmark_reasoning_items[item.step-1].points}\n"
detail += f"Tool use score: {sum([item.score for item in tool_use_evaulate_result.items])} / {sum([item.points for item in benchmark_tool_use_items])}\n"
for item in tool_use_evaulate_result.items:
detail += f"{item.reason}\n"
# print(detail)
return EvaluateScore(
model_name=self.model_name,
answer_score=answer_evaulate_result.score,
answer_total_score=benchmark_answer_item.points,
reasoning_score=sum([item.score for item in reasoning_evaulate_result.items]),
reasoning_total_score=sum([item.points for item in benchmark_reasoning_items]),
tool_use_score=sum([item.score for item in tool_use_evaulate_result.items]),
tool_use_total_score=sum([item.points for item in benchmark_tool_use_items]),
total_score=answer_evaulate_result.score + sum([item.score for item in reasoning_evaulate_result.items]) + sum([item.score for item in tool_use_evaulate_result.items]),
evaluate_detail=detail,
task_id=to_evaluate_item.task_id,
level=to_evaluate_item.level or 1,
category=to_evaluate_item.category
)
async def ensemble_evaluate(evaulator_list:list[Evaluator], answer:Answer, to_evaluate_item:BenchmarkItem) -> EnsembleEvaluateScore:
# for evaluator in evaulator_list:
# await evaluator.load_validate_data()
results:list[EvaluateScore|None] = await asyncio.gather(*[evaluator.a_evaluate(to_evaluate_item.task_id, answer, to_evaluate_item) for evaluator in evaulator_list])
results = [item for item in results if item]
return EnsembleEvaluateScore(
task_id=to_evaluate_item.task_id,
answer_total_score=mean([item.answer_total_score for item in results if item]),
reasoning_total_score=mean([item.reasoning_total_score for item in results if item]),
tool_use_total_score=mean([item.tool_use_total_score for item in results if item]),
total_score=mean([item.total_score for item in results if item]),
evaluate_detail="no detail",
answer_score=mean([item.answer_score for item in results if item]),
reasoning_score=mean([item.reasoning_score for item in results if item]),
tool_use_score=mean([item.tool_use_score for item in results if item]),
level=to_evaluate_item.level or 1,
category=to_evaluate_item.category,
model_name="ensemble result"
)
|