# CAIA-evaluate/score.py
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor

from evaluator import Evaluator, ensemble_evaluate
from schemas import AgentOutputItem, Answer, BenchmarkItem, EnsembleEvaluateScore

def init_evaluators(dataset: list[BenchmarkItem], llm_configs: dict) -> list[Evaluator]:
    """Build the evaluator ensemble: three Evaluator instances per evaluate-LLM config."""
    parse_llm_config = llm_configs["parse_llm_config"]
    evaluate_llm_configs = llm_configs["evaluate_llm_configs"]

    evaluator_list: list[Evaluator] = []
    for evaluate_llm_config in evaluate_llm_configs:
        # Three replicas per model so the ensemble can aggregate over repeated runs.
        for _ in range(3):
            evaluator = Evaluator(
                dataset=dataset,
                parse_model=parse_llm_config["model_name"],
                parse_model_api_key=parse_llm_config.get("api_key"),
                parse_model_base_url=parse_llm_config.get("base_url"),
                api_key=evaluate_llm_config.get("api_key"),
                model_name=evaluate_llm_config["model_name"],
                base_url=evaluate_llm_config.get("base_url"),
                **evaluate_llm_config.get("model_params", {}),
            )
            evaluator_list.append(evaluator)
    return evaluator_list
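
# A minimal sketch of the llm_configs shape init_evaluators expects, inferred
# from the keys it reads above. The model names, keys, and URLs below are
# placeholders, not values from this repo.
EXAMPLE_LLM_CONFIGS = {
    "parse_llm_config": {
        "model_name": "gpt-4o-mini",                # model used to parse agent output
        "api_key": "sk-...",                        # optional; defaults to None
        "base_url": "https://api.example.com/v1",   # optional; defaults to None
    },
    "evaluate_llm_configs": [
        {
            "model_name": "gpt-4o",
            "api_key": "sk-...",
            "model_params": {"temperature": 0.0},   # forwarded as **kwargs to Evaluator
        },
    ],
}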

def load_agent_output_dataset(dataset_path: str = "dataset/example_agent_output.json") -> list[AgentOutputItem]:
    """Load agent outputs from JSON and validate each record against the schema."""
    with open(dataset_path, "r", encoding="utf-8") as f:
        agent_output_dataset = json.load(f)
    return [AgentOutputItem(**item) for item in agent_output_dataset]
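
# Expected layout of the agent-output JSON loaded above, inferred from the
# AgentOutputItem fields this module touches (task_id, answer, reasoning_list,
# tool_use_list). Values and the exact shape of tool_use_list entries are
# illustrative:
# [
#   {
#     "task_id": "task-001",
#     "answer": "42",
#     "reasoning_list": ["step 1 ...", "step 2 ..."],
#     "tool_use_list": [ ... tool-call records consumed as Answer.function_calls ... ]
#   }
# ]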

async def score_item(evaluator_list: list[Evaluator], agent_output_item: AgentOutputItem, to_evaluate_item: BenchmarkItem) -> EnsembleEvaluateScore:
    """Score one agent output against its benchmark item via the ensemble."""
    # Repackage the agent output into the Answer schema the evaluators consume.
    answer = Answer(
        answer=agent_output_item.answer,
        reasoning_steps=agent_output_item.reasoning_list,
        function_calls=agent_output_item.tool_use_list,
    )
    return await ensemble_evaluate(evaluator_list, answer, to_evaluate_item)

async def score_in_threadpool(evaluator_list: list[Evaluator], agent_output_list: list[AgentOutputItem], benchmark_data: list[BenchmarkItem]) -> list[EnsembleEvaluateScore]:
    """Score all agent outputs concurrently, one event loop per worker thread."""
    with ThreadPoolExecutor(max_workers=max(1, min(5, len(agent_output_list)))) as executor:
        futures = []
        for agent_output_item in agent_output_list:
            task_id = agent_output_item.task_id
            # Pair each agent output with its benchmark item; skip unmatched task_ids.
            to_evaluate_item = next((item for item in benchmark_data if item.task_id == task_id), None)
            if to_evaluate_item:
                # Each worker thread runs its own event loop via asyncio.run.
                future = executor.submit(
                    asyncio.run,
                    score_item(
                        evaluator_list=evaluator_list,
                        agent_output_item=agent_output_item,
                        to_evaluate_item=to_evaluate_item,
                    ),
                )
                # Wrap the thread-pool future so it can be awaited without
                # blocking the caller's event loop.
                futures.append(asyncio.wrap_future(future))
        return await asyncio.gather(*futures)
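
# A minimal driver sketch wiring the pieces together. The benchmark path, the
# config file name, and the assumption that benchmark items are stored as a
# JSON list of BenchmarkItem fields are all illustrative; adapt them to the
# actual repo layout.
if __name__ == "__main__":
    with open("dataset/example_benchmark.json", "r", encoding="utf-8") as f:  # assumed path
        benchmark_data = [BenchmarkItem(**item) for item in json.load(f)]
    with open("llm_configs.json", "r", encoding="utf-8") as f:  # assumed config file
        llm_configs = json.load(f)

    evaluator_list = init_evaluators(benchmark_data, llm_configs)
    agent_output_list = load_agent_output_dataset()
    scores = asyncio.run(score_in_threadpool(evaluator_list, agent_output_list, benchmark_data))
    for score in scores:
        print(score)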