"""Score agent outputs against benchmark items using an ensemble of LLM evaluators."""
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from typing import List

from evaluator import Evaluator, ensemble_evaluate
from schemas import AgentOutputItem, Answer, BenchmarkItem, EvaluateScore, EnsembleEvaluateScore


def init_evaluators(
    dataset: List[BenchmarkItem],
    llm_configs: dict,
    replicas: int = 3,
) -> list[Evaluator]:
    """Build one Evaluator per (evaluate-LLM config, replica) pair.

    Args:
        dataset: Benchmark items every evaluator scores against.
        llm_configs: Must contain "parse_llm_config" (a single config dict)
            and "evaluate_llm_configs" (a list of config dicts). Each config
            requires "model_name"; "api_key", "base_url" and "model_params"
            are optional.
        replicas: Evaluators created per evaluate config. Defaults to 3,
            matching the previously hard-coded fan-out.

    Returns:
        Flat list of configured Evaluator instances,
        len == len(evaluate_llm_configs) * replicas.
    """
    parse_llm_config = llm_configs["parse_llm_config"]
    evaluate_llm_configs = llm_configs["evaluate_llm_configs"]
    evaluator_list: list[Evaluator] = []
    for evaluate_llm_config in evaluate_llm_configs:
        # Multiple replicas per config so ensemble_evaluate can aggregate
        # several independent judgments from the same model.
        for _ in range(replicas):
            evaluator_list.append(
                Evaluator(
                    dataset=dataset,
                    parse_model=parse_llm_config["model_name"],
                    parse_model_api_key=parse_llm_config.get("api_key"),
                    parse_model_base_url=parse_llm_config.get("base_url"),
                    api_key=evaluate_llm_config.get("api_key"),
                    model_name=evaluate_llm_config["model_name"],
                    base_url=evaluate_llm_config.get("base_url"),
                    **evaluate_llm_config.get("model_params", {}),
                )
            )
    return evaluator_list


def load_agent_output_dataset(
    dataset_path: str = "dataset/example_agent_output.json",
) -> list[AgentOutputItem]:
    """Load a JSON array of agent outputs and validate each entry.

    Args:
        dataset_path: Path to a JSON file containing a list of objects whose
            keys match the AgentOutputItem schema.

    Returns:
        One AgentOutputItem per entry in the file.
    """
    with open(dataset_path, "r", encoding="utf-8") as f:
        agent_output_dataset = json.load(f)
    return [AgentOutputItem(**item) for item in agent_output_dataset]


async def score_item(
    evaluator_list: list[Evaluator],
    agent_output_item: AgentOutputItem,
    to_evaluate_item: BenchmarkItem,
) -> EnsembleEvaluateScore:
    """Score a single agent output against its benchmark item.

    Wraps the agent output in an Answer and runs the evaluator ensemble.

    Args:
        evaluator_list: Evaluators whose judgments are aggregated.
        agent_output_item: The agent's answer, reasoning and tool calls.
        to_evaluate_item: The benchmark item the answer is judged against.

    Returns:
        The aggregated ensemble score.
    """
    answer = Answer(
        answer=agent_output_item.answer,
        reasoning_steps=agent_output_item.reasoning_list,
        function_calls=agent_output_item.tool_use_list,
    )
    return await ensemble_evaluate(evaluator_list, answer, to_evaluate_item)


async def run_evaluate(
    evaluator_list: list[Evaluator],
    agent_output_item: AgentOutputItem,
    to_evaluate_item: BenchmarkItem,
):
    """Alias for score_item, kept for backward compatibility.

    The two functions were byte-identical duplicates; this now delegates so
    the scoring logic lives in exactly one place.
    """
    return await score_item(evaluator_list, agent_output_item, to_evaluate_item)


async def score_in_threadpool(
    evaluator_list: list[Evaluator],
    agent_output_list: list[AgentOutputItem],
    benchmark_data: list[BenchmarkItem],
) -> list[EnsembleEvaluateScore]:
    """Score every agent output against its matching benchmark item concurrently.

    Each scoring task runs ``asyncio.run(score_item(...))`` on its own worker
    thread (and therefore on its own private event loop), up to 5 threads.
    Agent outputs whose ``task_id`` has no matching benchmark item are
    silently skipped, preserving the original behavior.

    Args:
        evaluator_list: Evaluators shared by every scoring task.
        agent_output_list: Agent outputs to score.
        benchmark_data: Benchmark items, matched to outputs by ``task_id``.

    Returns:
        Ensemble scores for the matched outputs, in input order.
    """
    # Index benchmark items once instead of a linear scan per agent output.
    # setdefault keeps the FIRST item for a duplicate task_id, matching the
    # original next(...) first-match semantics.
    benchmark_by_id: dict = {}
    for item in benchmark_data:
        benchmark_by_id.setdefault(item.task_id, item)

    with ThreadPoolExecutor(max_workers=max(1, min(5, len(agent_output_list)))) as executor:
        pending: list[asyncio.Future] = []
        for agent_output_item in agent_output_list:
            to_evaluate_item = benchmark_by_id.get(agent_output_item.task_id)
            if to_evaluate_item is None:
                # NOTE(review): unmatched outputs are dropped without warning —
                # intentional per original code; consider logging upstream.
                continue
            future = executor.submit(
                asyncio.run,
                score_item(
                    evaluator_list=evaluator_list,
                    agent_output_item=agent_output_item,
                    to_evaluate_item=to_evaluate_item,
                ),
            )
            # wrap_future lets this coroutine await the thread results;
            # the original future.result() calls blocked the caller's loop.
            pending.append(asyncio.wrap_future(future))
        return list(await asyncio.gather(*pending))