File size: 4,189 Bytes
5374a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Acknowledgement: Modified from AFlow (https://github.com/geekan/MetaGPT/blob/main/metagpt/ext/aflow/scripts/evaluator.py) under MIT License

import asyncio
from tqdm.asyncio import tqdm_asyncio
from typing import Tuple, Optional, Callable
from ..benchmark.benchmark import Benchmark
from ..models.base_model import BaseLLM
from ..core.logging import logger
from ..models.model_utils import cost_manager


# To support custom tasks, add the task types here and provide the corresponding evaluation functions.

class AFlowEvaluator:
    """Evaluate AFlow workflow graphs against benchmarks.

    The evaluator instantiates a workflow graph for a benchmark, runs every
    example of the chosen data split through ``benchmark.async_evaluate`` with
    bounded concurrency, and reports the mean score together with the cost
    accumulated by ``cost_manager`` during the run.

    Attributes:
        llm: Optional language model. Its ``config`` attribute is passed to the
            graph as ``llm_config`` when the graph is instantiated.
    """

    def __init__(self, llm: Optional[BaseLLM] = None):
        self.llm = llm

    def _configure_graph(self, graph, benchmark):
        """Instantiate ``graph`` for ``benchmark``.

        Args:
            graph: Callable accepting the keyword arguments ``name``,
                ``llm_config`` and ``benchmark``.
            benchmark: Object exposing a ``name`` attribute.

        Returns:
            Whatever ``graph(...)`` returns.
        """
        # ``llm`` is optional in ``__init__``; forward ``None`` instead of
        # failing with AttributeError on ``None.config``.
        # NOTE(review): graphs that require an LLM config will still have to
        # reject ``None`` themselves -- confirm against the graph classes.
        llm_config = self.llm.config if self.llm is not None else None
        return graph(name=benchmark.name, llm_config=llm_config, benchmark=benchmark)

    async def graph_evaluate_async(
        self,
        benchmark: Benchmark,
        graph: Callable,
        is_test: bool = False,
        max_concurrent_tasks: int = 20,
    ) -> Tuple[float, float, float, bool]:
        """Asynchronously evaluate a workflow graph against a benchmark.

        Configures the graph with benchmark settings, processes all examples in
        the dataset concurrently (up to ``max_concurrent_tasks``), and
        calculates the average score, the cost per example, and the total cost.

        Args:
            benchmark: The benchmark to evaluate against.
            graph: The workflow graph (callable) to evaluate.
            is_test: Whether to use test data (True) or dev data (False).
            max_concurrent_tasks: Maximum number of concurrent evaluation
                tasks. Values below 1 are treated as 1.

        Returns:
            A tuple ``(avg_metrics, avg_cost, total_cost, all_failed)``:
              - avg_metrics: Mean score across all examples, where an example
                whose evaluation raised or returned ``None`` counts as 0.0.
              - avg_cost: ``total_cost`` divided by the number of examples.
              - total_cost: Change in ``cost_manager.get_total_cost()`` over
                the evaluation.
              - all_failed: True when there was no data or every evaluation
                produced ``None``.
        """
        configured_graph = self._configure_graph(graph=graph, benchmark=benchmark)

        # Get evaluation data
        data = benchmark.get_test_data() if is_test else benchmark.get_dev_data()
        if not data:
            logger.warning("No data to evaluate. Returning zeros.")
            return (0.0, 0.0, 0.0, True)

        # Snapshot the running cost so only this evaluation is attributed.
        cost_before = cost_manager.get_total_cost()

        # A semaphore initialised with 0 would block every task forever and a
        # negative value raises ValueError, so always allow at least one task.
        semaphore = asyncio.Semaphore(max(1, max_concurrent_tasks))

        async def _evaluate_one(example):
            """Evaluate one example; return ``None`` if the evaluation raises."""
            async with semaphore:
                try:
                    return await benchmark.async_evaluate(configured_graph, example)
                except Exception as e:
                    # Best effort by design: a failing example is logged and
                    # scored as 0 rather than aborting the whole run.
                    logger.warning(f"Evaluation failed: {str(e)}")
                    return None

        tasks = [_evaluate_one(example) for example in data]

        # Wait for all tasks to complete
        results = await tqdm_asyncio.gather(
            *tasks,
            desc=f"Evaluating {benchmark.name} problems",
            total=len(data)
        )

        # Failed evaluations (None results) contribute a score of 0.
        all_failed = all(r is None for r in results)
        scores = [0.0 if r is None else r for r in results]

        total_cost = cost_manager.get_total_cost() - cost_before
        avg_cost = total_cost / len(data)

        # ``data`` is non-empty here and there is one score per example, so
        # the mean is always well defined.
        avg_metrics = sum(scores) / len(scores)

        return avg_metrics, avg_cost, total_cost, all_failed