|
|
|
|
|
|
|
|
import os |
|
|
import re |
|
|
import shutil |
|
|
import asyncio |
|
|
import numpy as np |
|
|
from tqdm import tqdm |
|
|
from typing import List, Any |
|
|
from pydantic import Field |
|
|
|
|
|
from ..core.logging import logger |
|
|
from ..core.module import BaseModule |
|
|
from ..models.base_model import BaseLLM, LLMOutputParser |
|
|
from ..benchmark.benchmark import Benchmark |
|
|
from ..utils.aflow_utils.data_utils import DataUtils |
|
|
from ..utils.aflow_utils.experience_utils import ExperienceUtils |
|
|
from ..utils.aflow_utils.evaluation_utils import EvaluationUtils |
|
|
from ..utils.aflow_utils.graph_utils import GraphUtils, OPERATOR_MAP |
|
|
from ..utils.aflow_utils.convergence_utils import ConvergenceUtils |
|
|
|
|
|
|
|
|
class GraphOptimizeOutput(LLMOutputParser):
    """Structured output of one graph-optimization LLM call.

    Holds the three sections the optimizer prompt asks the LLM to emit;
    `AFlowOptimizer` parses the raw response into this model with
    ``GraphOptimizeOutput.parse(..., parse_mode="xml")`` and falls back to a
    regex-based parser when that fails.
    """

    # Natural-language description of the change applied in this round.
    modification: str = Field(default="", description="modification")
    # Source code of the updated workflow graph (contents of graph.py).
    graph: str = Field(default="", description="graph")
    # Prompt text used by the updated workflow (contents of prompt.py).
    prompt: str = Field(default="", description="prompt")
|
|
|
|
|
|
|
|
class AFlowOptimizer(BaseModule):
    """
    AFlow Optimizer for workflow optimization.

    This optimizer iteratively improves workflows through multiple rounds of optimization
    using large language models. It evaluates workflow performance, identifies improvement
    opportunities, and applies optimizations based on experience and convergence metrics.

    Attributes:
        question_type: Type of task to optimize for (e.g., qa, match, code)
        graph_path: Path to the workflow graph directory (must contain graph.py and prompt.py)
        optimized_path: Path to save optimized workflows (defaults to graph_path)
        initial_round: Starting round number for optimization
        optimizer_llm: LLM used for generating optimizations
        executor_llm: LLM used for executing the workflow
        operators: List of operators available for optimization
        sample: Number of rounds to sample from for optimization
        max_rounds: Maximum number of optimization rounds to perform
        validation_rounds: Number of validation runs per optimization round
        eval_rounds: Number of evaluation runs for test mode
        check_convergence: Whether to check for optimization convergence
    """
    question_type: str = Field(description="The type of question to optimize the workflow for, e.g., qa, match, code, etc.")
    graph_path: str = Field(description="The folder of the workflow graph. This folder must contain a `graph.py` file that defines the workflow structure, and a `prompt.py` file that defines the prompt for the workflow.")
    optimized_path: str = Field(default=None, description="The path to save the optimized workflow. If not provided, the optimized path will be the same as the graph path.")
    initial_round: int = Field(default=0, description="The round number to start or continue optimization from. If not provided, will start from round 0 using the `graph.py` file in `graph_path`.")
    optimizer_llm: BaseLLM = Field(default=None, description="The LLM to use for optimization.")
    executor_llm: BaseLLM = Field(default=None, description="The LLM to use for execution.")

    operators: List[str] = Field(default_factory=lambda: list(OPERATOR_MAP.keys()), description="The operators to use for optimization. If not provided, will use all operators in OPERATOR_MAP.")
    sample: int = Field(default=4, description="The number of rounds to sample from the top scores.")
    max_rounds: int = Field(default=20, description="The maximum number of rounds to optimize the workflow.")
    validation_rounds: int = Field(default=5, description="Run the workflow for `validation_rounds` times to evaluate the performance on the validation set.")
    eval_rounds: int = Field(default=3, description="Run the workflow for `eval_rounds` times to evaluate the performance on the test set.")
    check_convergence: bool = Field(default=True, description="Whether to check for convergence.")

    def init_module(self, **kwargs):
        """Prepare the workspace, helper utilities and LLMs.

        When starting from round 0, seeds ``round_0`` with a copy of the
        user-supplied ``graph.py``/``prompt.py`` and rewires the prompt import.

        Raises:
            ValueError: If the starting round directory does not exist, or no
                optimizer LLM was provided.
        """
        self.root_path = self.optimized_path or self.graph_path
        os.makedirs(self.root_path, exist_ok=True)

        # All helper utilities share the same workspace root.
        self.graph_utils = GraphUtils(self.root_path)
        self.data_utils = DataUtils(self.root_path)
        self.evaluation_utils = EvaluationUtils(self.root_path)
        self.experience_utils = ExperienceUtils(self.root_path)
        self.convergence_utils = ConvergenceUtils(self.root_path)

        self.graph = None
        self.round = self.initial_round
        if self.round == 0:
            # Snapshot the initial workflow as round 0.
            round_zero_path = os.path.join(self.root_path, f"round_{self.round}")
            os.makedirs(round_zero_path, exist_ok=True)
            shutil.copy2(os.path.join(self.graph_path, "graph.py"), os.path.join(round_zero_path, "graph.py"))
            shutil.copy2(os.path.join(self.graph_path, "prompt.py"), os.path.join(round_zero_path, "prompt.py"))
            self.graph_utils.update_prompt_import(os.path.join(round_zero_path, "graph.py"), round_zero_path)

        if not os.path.exists(os.path.join(self.root_path, f"round_{self.round}")):
            raise ValueError(f"Round {self.round} does not exist in {self.root_path}")

        if self.optimizer_llm is None:
            raise ValueError("optimizer_llm is not provided")
        if self.executor_llm is None:
            # Fall back to the optimizer LLM for workflow execution.
            self.executor_llm = self.optimizer_llm

    def optimize(self, benchmark: Benchmark):
        """Run the optimization process on the workflow.

        Performs multiple rounds of optimization, evaluating each round against
        the benchmark and checking for convergence. Continues until convergence
        is detected or the maximum number of rounds is reached.

        Args:
            benchmark: The benchmark to evaluate the workflow against
        """
        self.benchmark = benchmark
        for _ in range(self.max_rounds):
            # Each round runs in its own event loop; close it afterwards so
            # loop resources are not leaked across rounds.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                score = loop.run_until_complete(self._execute_with_retry(self._optimize_graph))
            finally:
                loop.close()
            self.round += 1
            logger.info(f"Score for round {self.round}: {score}")
            if self._check_convergence():
                break
            if self.round >= self.max_rounds:
                logger.info(f"Max rounds reached: {self.max_rounds}, stopping optimization.")
                break

    def test(self, benchmark: Benchmark, test_rounds: List[int] = None):
        """Run the test evaluation on optimized workflows.

        Evaluates specified rounds (or the best round if none specified) against
        the benchmark multiple times and logs the results.

        Args:
            benchmark: The benchmark to evaluate against
            test_rounds: Specific round numbers to test, or None to use the best round
        """
        self.benchmark = benchmark
        if test_rounds is None:
            best_round = self._load_best_round()
            logger.info(f"No test rounds provided, using best round: {best_round}")
            test_rounds = [best_round]
        for _ in tqdm(range(self.eval_rounds)):
            # Fresh event loop per evaluation pass; close it to avoid leaks.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._run_test(test_rounds))
            finally:
                loop.close()

    async def _execute_with_retry(self, func: callable, max_retries: int = 3) -> Any:
        """Await ``func()`` with retries and linear backoff.

        Args:
            func: Zero-argument coroutine function to execute.
            max_retries: Maximum number of attempts before giving up.

        Returns:
            The result of ``func()``, or None if every attempt raised.
        """
        retry_count = 0
        while retry_count < max_retries:
            try:
                return await func()
            except Exception as e:
                retry_count += 1
                logger.info(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
                if retry_count == max_retries:
                    logger.info("Max retries reached.")
                    return None
                # Back off progressively longer with each failed attempt.
                await asyncio.sleep(5 * retry_count)

        return None

    def _check_convergence(self) -> bool:
        """Return True when optimization has converged (and log the details)."""
        if not self.check_convergence:
            return False

        converged, convergence_round, final_round = self.convergence_utils.check_convergence(top_k=3)
        if converged:
            logger.info(
                f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}"
            )
            self.convergence_utils.print_results()
            return True
        return False

    async def _optimize_graph(self) -> float:
        """Optimize the graph for one round and return its validation score."""
        validation_n = self.validation_rounds
        graph_path = self.root_path
        data = self.data_utils.load_results(graph_path)

        if self.round == 0:
            # Establish the baseline score of the unmodified workflow first.
            self.avg_score = await self._handle_initial_round(graph_path, validation_n, data)

        return await self._handle_optimization_round(graph_path, validation_n, data)

    async def _handle_initial_round(self, graph_path: str, validation_n: int, data: list) -> float:
        """Handle the initial round: load round 0 and score it on validation."""
        self.graph_utils.create_round_directory(graph_path, self.round)
        self.graph = self.graph_utils.load_graph(self.round, graph_path)
        return await self.evaluation_utils.evaluate_graph_async(self, validation_n, data, initial=True)

    async def _handle_optimization_round(self, graph_path: str, validation_n: int, data: list) -> float:
        """Generate, validate and persist one optimization attempt.

        Samples a high-scoring previous round, asks the optimizer LLM for a
        modified graph/prompt, and keeps re-sampling until the proposed
        modification is accepted as novel by the experience store.

        Returns:
            Average validation score of the newly generated round.
        """
        directory = self.graph_utils.create_round_directory(graph_path, self.round + 1)

        while True:
            sample = self._get_optimization_sample()
            prompt, graph_load = self.graph_utils.read_graph_files(sample["round"], graph_path)
            graph = self.graph_utils.extract_solve_graph(graph_load)
            processed_experience = self.experience_utils.load_experience()
            experience = self.experience_utils.format_experience(processed_experience, sample["round"])
            operator_description = self.graph_utils.load_operators_description(self.operators, self.optimizer_llm)
            log_data = self.data_utils.load_log(sample["round"])
            graph_optimize_prompt = self.graph_utils.create_graph_optimize_prompt(
                experience, sample["score"], graph[0], prompt, operator_description, self.question_type, log_data
            )

            response = await self.optimizer_llm.async_generate(prompt=graph_optimize_prompt, parse_mode="str")
            # Raw LLM output is diagnostic only — log it instead of print().
            logger.debug(response.content)
            try:
                parsed_response = GraphOptimizeOutput.parse(response.content, parse_mode="xml")
                response = parsed_response.get_structured_data()
            except Exception:
                # Structured parse failed; fall back to lenient regex parsing.
                response = self._parse_optimizer_llm_output(response.content, orig_graph=graph[0], orig_prompt=prompt)

            # Only accept a modification that differs from those already tried.
            if self.experience_utils.check_modification(processed_experience, response['modification'], sample["round"]):
                break

        avg_score = await self._evaluate_and_save_optimization_results(directory, response, sample, data, validation_n)
        return avg_score

    def _get_optimization_sample(self) -> dict:
        """Select one round (with its score) from the top-`sample` rounds."""
        top_rounds = self.data_utils.get_top_rounds(self.sample)
        return self.data_utils.select_round(top_rounds)

    def _parse_optimizer_llm_output(self, content: str, orig_graph: str, orig_prompt: str) -> dict:
        """Best-effort parse of a free-form optimizer LLM response.

        Extracts the ``<modification>`` section and fenced code blocks from
        ``content``. A code block that looks like a workflow definition becomes
        the new graph; any other code block is treated as the new prompt.
        Each missing piece falls back to the original so that a valid file is
        never overwritten with empty content.

        Args:
            content: Raw LLM response text.
            orig_graph: Graph source used as fallback when none is parsed.
            orig_prompt: Prompt text used as fallback when none is parsed.

        Returns:
            Dict with "modification", "graph" and "prompt" keys.
        """
        response = {"modification": "", "graph": "", "prompt": ""}

        modification_match = re.search(r'<modification>(.*?)</modification>', content, re.DOTALL)
        if modification_match:
            response["modification"] = modification_match.group(1).strip()

        for block in re.finditer(r'```(?:python)?(.*?)```', content, re.DOTALL):
            code = block.group(1).strip()
            # Heuristic: workflow definitions contain a class or mention
            # "workflow"; everything else is assumed to be prompt text.
            if 'class' in code or 'workflow' in code.lower():
                response["graph"] = code
            else:
                response["prompt"] = code

        if not response["graph"] and not response["prompt"]:
            # Nothing usable was extracted; record the failure explicitly.
            response["modification"] = "No modification due to error in LLM output"
        # Fall back field-by-field so a missing graph (or prompt) alone does
        # not cause an empty file to be written later.
        if not response["graph"]:
            response["graph"] = orig_graph
        if not response["prompt"]:
            response["prompt"] = orig_prompt

        return response

    async def _evaluate_and_save_optimization_results(self, directory: str, response: dict, sample: dict, data: list, validation_n: int):
        """Write the proposed graph/prompt, evaluate it, and record experience.

        Returns:
            Average validation score of the new round.
        """
        self.graph_utils.write_graph_files(directory, response)

        experience = self.experience_utils.create_experience_data(sample, response['modification'])

        self.graph = self.graph_utils.load_graph(self.round + 1, self.root_path)

        avg_score = await self.evaluation_utils.evaluate_graph_async(self, validation_n, data, initial=False)
        self.experience_utils.update_experience(directory, experience, avg_score)

        return avg_score

    def _load_best_round(self) -> int:
        """Return the round number with the best recorded score."""
        ranked_scores = self.data_utils._load_scores()
        return ranked_scores[0]["round"]

    async def _run_test(self, test_rounds: List[int]):
        """Run test evaluation for each requested round and persist results.

        Args:
            test_rounds: Round numbers to evaluate on the test set.

        Returns:
            Mean score across the evaluated rounds.
        """
        logger.info("Running test evaluation...")

        graph_path = self.root_path
        data = self.data_utils.load_results(graph_path)
        json_file_path = self.data_utils.get_results_file_path(graph_path)
        scores = []

        # `round_num` (not `round`) avoids shadowing the builtin.
        for round_num in test_rounds:
            logger.info(f"Running test for round {round_num}...")
            self.graph = self.graph_utils.load_graph(round_num, graph_path)
            score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test_async(self)
            scores.append(score)

            new_data = self.data_utils.create_result_data(round_num, score, avg_cost, total_cost)
            data.append(new_data)

            logger.info(f"Test round {round_num} score: {score}, avg_cost: {avg_cost}, total_cost: {total_cost}")
            # Persist after every round so partial progress survives a crash.
            self.data_utils.save_results(json_file_path, data)

        logger.info(f"Test round {round_num} avg_score: {np.mean(scores)}")
        return np.mean(scores)