File size: 18,969 Bytes
f3e6f32
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e23f952
 
 
 
 
 
 
f3e6f32
 
 
 
 
 
 
 
e23f952
f3e6f32
 
 
 
e23f952
f3e6f32
 
bebca7c
f3e6f32
 
 
e23f952
f3e6f32
 
 
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e23f952
 
 
 
 
f3e6f32
 
e23f952
f3e6f32
 
 
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
e23f952
 
 
 
 
 
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e23f952
f3e6f32
 
 
 
 
e23f952
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c26356
f3e6f32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
import json
import asyncio
import os
from statistics import mean
import traceback
from typing import List, Optional, Type, TypeVar
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
from pydantic import BaseModel
from schemas import (
    Answer, EnsembleEvaluateScore, EvaluateData, QuestionData, BenchmarkItem, 
    EvaluateTarget, AnswerEvaluateResult, ReasoningEvaluateResult, ReasoningStep, ToolUse, ToolUseEvaluateResult,
    EvaluateScore
)
from openai import AsyncClient
from utils import count_tokens, truncate_text



# Target type for parse_str_to_format: any pydantic model used as a
# structured-output schema.
T = TypeVar("T", bound=BaseModel)


# Default model configuration:
# - parse_llm_config: a cheap, deterministic (temperature 0) model used only
#   to convert free-form judge output into structured pydantic schemas.
# - evaluate_llm_configs: the judge models whose scores are averaged by
#   ensemble_evaluate. API keys/base URLs come from the environment; a
#   missing env var leaves the value as None.
llm_config = {
    "parse_llm_config": {
        "model_name": "gpt-4.1-mini-2025-04-14",
        "api_key": os.getenv("OPENAI_API_KEY", None),
        "model_params": {
            "temperature": 0
        }
    },
    "evaluate_llm_configs": [
        {
            "model_name": "o3-2025-04-16",
            "api_key": os.getenv("OPENAI_API_KEY", None),
            "model_params": {
                # o3 is a reasoning model: it takes reasoning_effort, not temperature.
                "reasoning_effort": "medium"
            }
        },
        {
            "model_name": "gpt-4.1",
            "api_key": os.getenv("OPENAI_API_KEY", None),
            "base_url": "https://api.openai.com/v1",
            "model_params": {
                "temperature": 0.2
            }
        },
        {
            "model_name": "deepseek-r1-250120",
            "api_key": os.getenv("DEEPSEEK_API_KEY", None),
            "base_url": os.getenv("DEEPSEEK_BASE_URL", None),
            "model_params": {
                "temperature": 0.2
            }
        }
    ]
}


class Evaluator:
    def __init__(self, 
                 dataset:List[BenchmarkItem] = [],
                 api_key:Optional[str] = None,
                 model_name:str = "gpt-4.1",
                 base_url:Optional[str] = None,
                 parse_model:str = "gpt-4.1-mini",
                 parse_model_api_key:Optional[str] = None,
                 parse_model_base_url:Optional[str] = None,
                 **model_params):
        if not api_key or not parse_model_api_key:
            raise ValueError("api_key and parse_model_api_key are required")
        self.system_prompt = """
        You are a helpful assistant that can evaluate the quality of a given answer.
        """
        # self.dataset_path = dataset_path
        self.dataset = dataset
        self.benchmark_data:List[BenchmarkItem] = []
        self.model_name = model_name
        self.base_url = base_url
        self.parse_model = parse_model
        self.model_params = model_params or {"temperature": 0.0}  # 默认参数
        self.parse_client = AsyncClient(api_key=parse_model_api_key, base_url=parse_model_base_url)
        self.client = AsyncClient(api_key=api_key, base_url=self.base_url)
        self.tool_output_max_tokens = 2000

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=15))
    async def parse_str_to_format(self, string_output:Optional[str], target_data_class:  Type[T]) -> Optional[T]:
        if not string_output:
            return None
        try:
            # 对于解析模型的参数,使用默认参数
            response = await self.parse_client.beta.chat.completions.parse(
                model=self.parse_model,
                messages=[{"role": "user", "content": string_output}],
                response_format=target_data_class,
                temperature=0.0,
            )
            result = response.choices[0].message.parsed
            if result:
                return result
        except Exception as e:
            print(f"Error parsing string to format: {e}")
            return None


    
    async def summarize_tool_use_output(self, question:str, tool_use_list:List[ToolUse]) -> list[ToolUse]:
        """If the tool use output is too long, summarize the tool use output to keep the important information"""
        system_prompt = f"""
        You are a helpful assistant that can summarize the tool use output. Your output format should be in the following format:"In order to solve <Task>, Invoked <tool_name> with <tool_input> and got <summarized_tool_output>"
NOTE: 
1. Ignore the noise in the tool_output, only keep the important information that might help to solve/improve the possibility of solving the task. 
2. If the tool_output is not related to the question, just summarize the tool_output to "No relevant information Found"
        """
        async def process_tool_use(tool_use: ToolUse) -> ToolUse:
            if count_tokens(tool_use.tool_output, self.parse_model) > self.tool_output_max_tokens:
                user_prompt = f"""
                Question: {question}
                Tool use:
                {tool_use.to_prompt()}
                """
                
                response = await self.parse_client.chat.completions.create(
                    model=self.parse_model,
                    messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}],
                    **self.model_params
                )
                
                content = response.choices[0].message.content
                if content:
                    tool_use.tool_output = content
                else:
                    tool_use.tool_output = truncate_text(tool_use.tool_output, self.parse_model, self.tool_output_max_tokens)
            
            return tool_use

        # 并行处理所有tool_use
        tasks = [process_tool_use(tool_use) for tool_use in tool_use_list]
        tool_use_list = await asyncio.gather(*tasks)
        return tool_use_list


    async def evaluate_reasoning(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ReasoningEvaluateResult]]:
        system_prompt = """You are a professional evaluator for AI assistants in the crypto domain. You need to score the assistant's reasoning ability based on the given evaluation criteria and reasoning process. Please follow these steps during evaluation:
1. Review the reasoning steps and understand whether each step's logic is relevant to the task and helps solve the problem.
2. If there are no explicit reasoning steps, treat tool calls as an alternative form of reasoning steps and consider the reasoning process represented by the tool usage.
3. Assess the completeness and rigor of the reasoning chain, judging whether each step is reasonable and accurate, and whether there are logical flaws or missing steps.
4. Consider the information references and tool calls in the reasoning process, judge whether the information sources are sufficient, whether the tool usage is appropriate, and analyze the connections and dependencies between each step in the reasoning chain.
5. According to the evaluation criteria, give a score for each criterion, with the score ranging from 0 to the maximum points for that criterion.
"""
        reasoning_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.REASONING]
        if not reasoning_items:
            return 0.0, None
        prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated Reasoning Steps:
```
{"\n".join([step.to_prompt() for step in output_answer.reasoning_steps])}
```

In addition, the following function calls are also part of the reasoning steps. The choose of the tool use and the arguments should be taken into account:
```
{"\n".join([step.to_prompt(ignore_output=True) for step in output_answer.function_calls])}
```

Evaluation Rules:\n"""
        for item in reasoning_items:
            prompt += f"{item.to_prompt()}\n"
        prompt += f"Now evaluate the reasoning steps based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
        # print(prompt)
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                response = await self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
                    **self.model_params
                )
                content = response.choices[0].message.content
                result = await self.parse_str_to_format(content, ReasoningEvaluateResult)
                if not result:
                    retry_count += 1
                    continue
                if sum([item.score for item in result.items]) > sum([item.points for item in reasoning_items]):
                    retry_count += 1
                    continue
                return sum([item.score for item in result.items]), result
            except Exception as e:
                print(f"Error evaluating reasoning (attempt {retry_count + 1}/{max_retries}): {e}")
                retry_count += 1
                if retry_count == max_retries:
                    return 0.0, None
                await asyncio.sleep(1)  # 添加重试间隔
        return 0.0, None

    async def evaluate_tool_use(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[ToolUseEvaluateResult]]:
        system_prompt = """You are a professional crypto AI assistant evaluator. You need to score the assistant's tools using ability according to the given criterias and the tool use output. When evaluating, you should follow the following steps:
1. Take a brief look at the tool using, descriptions and input args, to make sure the tool using is correct/related to solving the task.
2. Evaluate each step of the tool use to estimate the efficiency and accuracy of the tool use.
3. Consider the continuity of tool calls: The return result of the previous tool call may affect the input arguments of the next tool call.
"""
        tool_use_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
        if not tool_use_items:
            print(f"No tool use items for task {benchmark_item.task_id}")
            return 0.0, None
        prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated tool use:
```
{"\n".join([step.to_prompt() for step in output_answer.function_calls])}
```

Evaluation Rules:
"""
        for item in tool_use_items:
            prompt += f"{item.to_prompt()}\n"
        prompt += f"Now evaluate the tool use based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                response = await self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
                    **self.model_params
                )
                content = response.choices[0].message.content
                result = await self.parse_str_to_format(content, ToolUseEvaluateResult)
                if not result:
                    retry_count += 1
                    continue
                if sum([item.score for item in result.items]) > sum([item.points for item in tool_use_items]):
                    retry_count += 1
                    continue
                return sum([item.score for item in result.items]), result
            except Exception as e:
                print(f"Error evaluating tool use (attempt {retry_count + 1}/{max_retries}): {traceback.format_exc()}")
                retry_count += 1
                if retry_count == max_retries:
                    return 0.0, None
                await asyncio.sleep(1)  # 添加重试间隔
        return 0.0, None

    
    async def evaluate_answer(self, output_answer:Answer, benchmark_item:BenchmarkItem) -> tuple[float, Optional[AnswerEvaluateResult]]:
        system_prompt = """You are a professional evaluator for crypto AI assistant answers. You need to score the AI assistant's final answer according to the given evaluation criteria. Please follow these steps during evaluation:
1. Carefully read the task question and the AI assistant's final output, and determine whether the answer accurately and completely solves the task requirements and conforms to basic common sense.
2. Check whether the facts, data, and reasoning process in the answer are correct, and whether there are logical errors, numerical errors, or fabricated facts.
3. For specific numerical values, allow a certain range of error. If the criteria do not specify the error range, use a ±5% margin.
4. For each evaluation criterion, give a score for each item, with the score ranging from 0 to the full score for that criterion.
Please strictly follow the evaluation criteria to provide objective and fair scoring, and briefly explain your reasoning for the scores."""
        evaluate_items = [item for item in benchmark_item.evaluate.items if item.target == EvaluateTarget.ANSWER]
        if not evaluate_items:
            return 0.0, None

        prompt = f"""
Task ID: {benchmark_item.task_id}
Question: {benchmark_item.question}
To be evaluated output:
```
{output_answer.to_prompt()}
```

Evaluation Rules:
"""
        for item in evaluate_items:
            prompt += f"{item.to_prompt()}\n"
        prompt += f"Now evaluate the output answer based on the evaluation criteria, and give the score for each item in the range of 0 to the point the criteria worth."
        # print(prompt)
        max_retry = 3
        for _ in range(max_retry):
            try:
                response = await self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": prompt}],
                    **self.model_params
                )


                result = await self.parse_str_to_format(response.choices[0].message.content, AnswerEvaluateResult)
                if not result:
                    continue
                if result.score > sum([item.points for item in evaluate_items]):
                    continue
                return result.score, result
            except Exception as e:
                print(f"Error evaluating answer: {e}")
                continue
        return 0.0, None

    async def a_evaluate(self, task_id:str, answer:Answer, to_evaluate_item: BenchmarkItem) -> EvaluateScore | None:
        import asyncio
        tasks = [
            self.evaluate_answer(answer, to_evaluate_item),
            self.evaluate_reasoning(answer, to_evaluate_item),
            self.evaluate_tool_use(answer, to_evaluate_item),
        ]
        [(answer_score, answer_evaulate_result), (reasoning_score, reasoning_evaulate_result), (tool_use_score, tool_use_evaulate_result)] = await asyncio.gather(*tasks)
        
        analysis = await self.analyze_evaulate_result(answer_evaulate_result, reasoning_evaulate_result, tool_use_evaulate_result, to_evaluate_item)
        return analysis
    

    async def analyze_evaulate_result(self, 
                                      answer_evaulate_result:AnswerEvaluateResult, 
                                      reasoning_evaulate_result:ReasoningEvaluateResult, 
                                      tool_use_evaulate_result:ToolUseEvaluateResult, 
                                      to_evaluate_item:BenchmarkItem) -> EvaluateScore:
        """Analyze the evaulate result and give the analysis"""
        benchmark_answer_item = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.ANSWER][0]
        benchmark_reasoning_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.REASONING]
        benchmark_tool_use_items = [item for item in to_evaluate_item.evaluate.items if item.target == EvaluateTarget.TOOL_USE]
        detail = ""
        detail += f"Answer score: {answer_evaulate_result.score} / {benchmark_answer_item.points}\n"
        detail += f"Reason: {answer_evaulate_result.reason}\n"
        detail += f"Reasoning score: {sum([item.score for item in reasoning_evaulate_result.items])} / {sum([item.points for item in benchmark_reasoning_items])}\n"
        for item in reasoning_evaulate_result.items:
            detail += f"Reasoning step {item.step}: {item.reason} score: {item.score} / {benchmark_reasoning_items[item.step-1].points}\n"
        detail += f"Tool use score: {sum([item.score for item in tool_use_evaulate_result.items])} / {sum([item.points for item in benchmark_tool_use_items])}\n"
        for item in tool_use_evaulate_result.items:
            detail += f"{item.reason}\n"
        # print(detail)
        return EvaluateScore(
            model_name=self.model_name,
            answer_score=answer_evaulate_result.score,
            answer_total_score=benchmark_answer_item.points,
            reasoning_score=sum([item.score for item in reasoning_evaulate_result.items]),
            reasoning_total_score=sum([item.points for item in benchmark_reasoning_items]),
            tool_use_score=sum([item.score for item in tool_use_evaulate_result.items]),
            tool_use_total_score=sum([item.points for item in benchmark_tool_use_items]),
            total_score=answer_evaulate_result.score + sum([item.score for item in reasoning_evaulate_result.items]) + sum([item.score for item in tool_use_evaulate_result.items]),
            evaluate_detail=detail,
            task_id=to_evaluate_item.task_id,
            level=to_evaluate_item.level or 1,
            category=to_evaluate_item.category
        )
        

async def ensemble_evaluate(evaulator_list: list[Evaluator], answer: Answer, to_evaluate_item: BenchmarkItem) -> EnsembleEvaluateScore:
    """Evaluate one answer with every Evaluator concurrently and average the scores.

    Evaluators whose a_evaluate returned None (failed sub-evaluations) are
    dropped before averaging.

    Raises:
        ValueError: if every evaluator failed — previously this fell through
            to mean([]) and raised an opaque StatisticsError.
    """
    raw_results = await asyncio.gather(
        *[evaluator.a_evaluate(to_evaluate_item.task_id, answer, to_evaluate_item) for evaluator in evaulator_list]
    )
    # Filter failures once, so the comprehensions below don't each re-check.
    results: list[EvaluateScore] = [item for item in raw_results if item]
    if not results:
        raise ValueError(f"All evaluators failed for task {to_evaluate_item.task_id}; nothing to average")
    return EnsembleEvaluateScore(
        task_id=to_evaluate_item.task_id,
        answer_total_score=mean([item.answer_total_score for item in results]),
        reasoning_total_score=mean([item.reasoning_total_score for item in results]),
        tool_use_total_score=mean([item.tool_use_total_score for item in results]),
        total_score=mean([item.total_score for item in results]),
        evaluate_detail="no detail",
        answer_score=mean([item.answer_score for item in results]),
        reasoning_score=mean([item.reasoning_score for item in results]),
        tool_use_score=mean([item.tool_use_score for item in results]),
        level=to_evaluate_item.level or 1,
        category=to_evaluate_item.category,
        model_name="ensemble result"
    )