|
|
|
|
|
|
|
|
import asyncio |
|
|
from tqdm.asyncio import tqdm_asyncio |
|
|
from typing import Tuple, Optional, Callable |
|
|
from ..benchmark.benchmark import Benchmark |
|
|
from ..models.base_model import BaseLLM |
|
|
from ..core.logging import logger |
|
|
from ..models.model_utils import cost_manager |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AFlowEvaluator:
    """
    AFlow-specific evaluator for workflow performance assessment.

    This evaluator measures the performance of AFlow workflow graphs against benchmarks.

    Attributes:
        llm: The language model to use for evaluation, if needed by the graph
    """

    def __init__(self, llm: Optional[BaseLLM] = None):
        self.llm = llm

    def _configure_graph(self, graph, benchmark):
        """Instantiate ``graph`` with the benchmark's name, this evaluator's
        LLM configuration, and the benchmark itself.

        Args:
            graph: Workflow graph class/factory to instantiate.
            benchmark: Benchmark providing the name passed to the graph.

        Returns:
            The configured graph instance.

        Raises:
            ValueError: If no LLM was supplied at construction time, since
                configuring the graph requires ``self.llm.config``.
        """
        # Fail fast with a clear message instead of an opaque AttributeError
        # on `None.config` when the evaluator was built without an LLM.
        if self.llm is None:
            raise ValueError("AFlowEvaluator requires an LLM to configure the workflow graph.")
        return graph(name=benchmark.name, llm_config=self.llm.config, benchmark=benchmark)

    async def graph_evaluate_async(
        self,
        benchmark: Benchmark,
        graph: Callable,
        is_test: bool = False,
        max_concurrent_tasks: int = 20,
    ) -> Tuple[float, float, float, bool]:
        """Asynchronously evaluate a workflow graph against a benchmark.

        Configures the graph with benchmark settings, processes all examples in the
        dataset concurrently (up to max_concurrent_tasks), and calculates
        performance metrics including average score, cost per example, and total cost.

        Args:
            benchmark: The benchmark to evaluate against
            graph: The workflow graph to evaluate
            is_test: Whether to use test data (True) or validation data (False)
            max_concurrent_tasks: Maximum number of concurrent evaluation tasks

        Returns:
            A tuple containing:
                - average_metrics: Mean performance score across all examples
                  (failed evaluations are scored as 0.0)
                - avg_cost: Average cost per example
                - total_cost: Total cost for all examples
                - all_failed: Boolean indicating if all evaluations failed
        """
        configured_graph = self._configure_graph(graph=graph, benchmark=benchmark)

        data = benchmark.get_test_data() if is_test else benchmark.get_dev_data()
        if not data:
            logger.warning("No data to evaluate. Returning zeros.")
            return (0.0, 0.0, 0.0, True)

        # Snapshot the global cost counter so only the cost incurred by this
        # evaluation run is attributed to it.
        cost_before = cost_manager.get_total_cost()

        # Bound concurrency so we do not overwhelm the model backend.
        semaphore = asyncio.Semaphore(max_concurrent_tasks)

        async def evaluate_with_semaphore(example):
            async with semaphore:
                try:
                    return await benchmark.async_evaluate(configured_graph, example)
                except Exception as e:
                    # Best-effort: one failed example must not abort the whole
                    # run; it is scored as 0.0 below.
                    logger.warning(f"Evaluation failed: {str(e)}")
                    return None

        tasks = [evaluate_with_semaphore(example) for example in data]

        results = await tqdm_asyncio.gather(
            *tasks,
            desc=f"Evaluating {benchmark.name} problems",
            total=len(data),
        )

        # Failed evaluations (None) count as 0.0 so the average reflects the
        # full dataset size rather than only the successful subset.
        valid_results = [0.0 if r is None else r for r in results]
        all_failed = all(r is None for r in results)

        total_cost = cost_manager.get_total_cost() - cost_before
        # `data` is guaranteed non-empty here (early return above), so both
        # divisions below are safe and `valid_results` cannot be empty.
        avg_cost = total_cost / len(data)
        avg_metrics = sum(valid_results) / len(valid_results)

        return avg_metrics, avg_cost, total_cost, all_failed
|
|
|
|
|
|
|
|
|
|
|
|