import asyncio
import json
from concurrent.futures import ThreadPoolExecutor

from evaluator import Evaluator, ensemble_evaluate
from schemas import AgentOutputItem, Answer, BenchmarkItem, EnsembleEvaluateScore
def init_evaluators(dataset: list[BenchmarkItem], llm_configs: dict) -> list[Evaluator]:
    parse_llm_config = llm_configs["parse_llm_config"]
    evaluate_llm_configs = llm_configs["evaluate_llm_configs"]
    evaluator_list: list[Evaluator] = []
    for evaluate_llm_config in evaluate_llm_configs:
        # Build three evaluator instances per judge config so the ensemble
        # scores each answer multiple times per model.
        for _ in range(3):
            evaluator = Evaluator(
                dataset=dataset,
                parse_model=parse_llm_config["model_name"],
                parse_model_api_key=parse_llm_config.get("api_key"),
                parse_model_base_url=parse_llm_config.get("base_url"),
                api_key=evaluate_llm_config.get("api_key"),
                model_name=evaluate_llm_config["model_name"],
                base_url=evaluate_llm_config.get("base_url"),
                **evaluate_llm_config.get("model_params", {}),
            )
            evaluator_list.append(evaluator)
    return evaluator_list
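
# A hypothetical llm_configs value, inferred only from the keys read above
# ("parse_llm_config", "evaluate_llm_configs", "model_name", "api_key",
# "base_url", "model_params"). Model names, keys, URLs, and the
# "temperature" param are illustrative placeholders, not part of this repo.
EXAMPLE_LLM_CONFIGS = {
    "parse_llm_config": {
        "model_name": "gpt-4o-mini",
        "api_key": "sk-...",
        "base_url": "https://api.openai.com/v1",
    },
    "evaluate_llm_configs": [
        {
            "model_name": "gpt-4o",
            "api_key": "sk-...",
            "model_params": {"temperature": 0.0},
        },
    ],
}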
def load_agent_output_dataset(dataset_path: str = "dataset/example_agent_output.json") -> list[AgentOutputItem]:
    with open(dataset_path, "r", encoding="utf-8") as f:
        agent_output_dataset = json.load(f)
    return [AgentOutputItem(**item) for item in agent_output_dataset]
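
# A sketch of the expected dataset/example_agent_output.json shape, assuming
# AgentOutputItem exposes the fields accessed in this file (task_id, answer,
# reasoning_list, tool_use_list); the values and the tool-call structure are
# illustrative guesses only.
# [
#   {
#     "task_id": "task-001",
#     "answer": "42",
#     "reasoning_list": ["step 1 ...", "step 2 ..."],
#     "tool_use_list": [{"name": "search", "arguments": {"query": "..."}}]
#   }
# ]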
async def score_item(
    evaluator_list: list[Evaluator],
    agent_output_item: AgentOutputItem,
    to_evaluate_item: BenchmarkItem,
) -> EnsembleEvaluateScore:
    # Adapt the raw agent output into the Answer schema the evaluators expect.
    answer = Answer(
        answer=agent_output_item.answer,
        reasoning_steps=agent_output_item.reasoning_list,
        function_calls=agent_output_item.tool_use_list,
    )
    return await ensemble_evaluate(evaluator_list, answer, to_evaluate_item)
def score_in_threadpool(
    evaluator_list: list[Evaluator],
    agent_output_list: list[AgentOutputItem],
    benchmark_data: list[BenchmarkItem],
) -> list[EnsembleEvaluateScore]:
    # Plain (non-async) function: it only blocks on thread futures. Each
    # worker thread runs its coroutine on its own event loop via asyncio.run.
    with ThreadPoolExecutor(max_workers=max(1, min(5, len(agent_output_list)))) as executor:
        futures = []
        for agent_output_item in agent_output_list:
            # Match each agent output to its benchmark item by task_id;
            # outputs without a matching item are skipped.
            to_evaluate_item = next(
                (item for item in benchmark_data if item.task_id == agent_output_item.task_id),
                None,
            )
            if to_evaluate_item:
                futures.append(
                    executor.submit(
                        asyncio.run,
                        score_item(
                            evaluator_list=evaluator_list,
                            agent_output_item=agent_output_item,
                            to_evaluate_item=to_evaluate_item,
                        ),
                    )
                )
        return [future.result() for future in futures]
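
# A minimal end-to-end sketch, assuming a benchmark JSON file whose items
# match the BenchmarkItem schema. The file path and EXAMPLE_LLM_CONFIGS
# placeholder above are illustrative, not part of the original pipeline.
if __name__ == "__main__":
    with open("dataset/example_benchmark.json", "r", encoding="utf-8") as f:
        benchmark_data = [BenchmarkItem(**item) for item in json.load(f)]
    evaluator_list = init_evaluators(benchmark_data, EXAMPLE_LLM_CONFIGS)
    agent_output_list = load_agent_output_dataset()
    scores = score_in_threadpool(evaluator_list, agent_output_list, benchmark_data)
    print(f"Scored {len(scores)} tasks")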