# evoagentx/evaluators/aflow_evaluator.py
# Acknowledgement: Modified from AFlow (https://github.com/geekan/MetaGPT/blob/main/metagpt/ext/aflow/scripts/evaluator.py) under MIT License
import asyncio
from tqdm.asyncio import tqdm_asyncio
from typing import Tuple, Optional, Callable
from ..benchmark.benchmark import Benchmark
from ..models.base_model import BaseLLM
from ..core.logging import logger
from ..models.model_utils import cost_manager
# To support custom tasks, add a task type and provide a corresponding evaluation function in the benchmark implementation.
class AFlowEvaluator:
    """
    AFlow-specific evaluator for workflow performance assessment.

    This evaluator measures the performance of AFlow workflow graphs against
    benchmarks by running every example through the configured graph and
    aggregating benchmark scores and accumulated LLM costs.

    Attributes:
        llm: The language model used to configure the workflow graph, if the
            graph requires one.
    """

    def __init__(self, llm: Optional["BaseLLM"] = None):
        self.llm = llm

    def _configure_graph(self, graph, benchmark):
        """Instantiate `graph` with the benchmark name, LLM config, and benchmark.

        Args:
            graph: A workflow graph class/factory accepting `name`, `llm_config`
                and `benchmark` keyword arguments.
            benchmark: The benchmark the graph will be evaluated on.

        Returns:
            The configured graph instance.

        Raises:
            ValueError: If no LLM was provided at construction time, since the
                graph requires an LLM config.
        """
        if self.llm is None:
            raise ValueError("AFlowEvaluator requires an `llm` to configure a workflow graph.")
        return graph(name=benchmark.name, llm_config=self.llm.config, benchmark=benchmark)

    async def graph_evaluate_async(self, benchmark: "Benchmark", graph: Callable, is_test: bool = False, max_concurrent_tasks: int = 20) -> Tuple[float, float, float, bool]:
        """Asynchronously evaluate a workflow graph against a benchmark.

        Configures the graph with benchmark settings, processes all examples in
        the dataset concurrently (up to `max_concurrent_tasks` at a time), and
        calculates performance metrics including average score, cost per
        example, and total cost.

        Args:
            benchmark: The benchmark to evaluate against.
            graph: The workflow graph to evaluate.
            is_test: Whether to use test data (True) or validation data (False).
            max_concurrent_tasks: Maximum number of concurrent evaluation tasks.

        Returns:
            A tuple containing:
                - avg_metrics (float): Mean performance score across all
                  examples; failed evaluations count as 0.0.
                - avg_cost (float): Average cost per example.
                - total_cost (float): Total cost for all examples.
                - all_failed (bool): True if every evaluation failed.
        """
        configured_graph = self._configure_graph(graph=graph, benchmark=benchmark)

        # Select the evaluation split.
        data = benchmark.get_test_data() if is_test else benchmark.get_dev_data()
        if not data:
            logger.warning("No data to evaluate. Returning zeros.")
            return (0.0, 0.0, 0.0, True)

        # Snapshot the accumulated cost so the delta below is attributable to this run only.
        cost_before = cost_manager.get_total_cost()

        # Shared semaphore bounds the number of in-flight evaluations.
        semaphore = asyncio.Semaphore(max_concurrent_tasks)

        async def evaluate_with_semaphore(example):
            async with semaphore:
                try:
                    return await benchmark.async_evaluate(configured_graph, example)
                except Exception as e:
                    # One failing example must not abort the whole run;
                    # record it as a failure (None) and keep going.
                    logger.warning(f"Evaluation failed: {str(e)}")
                    return None

        # Launch all evaluations concurrently (semaphore-limited) with a progress bar.
        tasks = [evaluate_with_semaphore(example) for example in data]
        results = await tqdm_asyncio.gather(
            *tasks,
            desc=f"Evaluating {benchmark.name} problems",
            total=len(data)
        )

        all_failed = all(r is None for r in results)
        # Failed evaluations (None) count as a score of 0.0.
        valid_results = [0.0 if r is None else r for r in results]

        # `data` is non-empty here (early return above), so these divisions are safe.
        total_cost = cost_manager.get_total_cost() - cost_before
        avg_cost = total_cost / len(data)
        avg_metrics = sum(valid_results) / len(valid_results)

        return avg_metrics, avg_cost, total_cost, all_failed