|
|
from copy import deepcopy |
|
|
from typing import Dict, Any |
|
|
|
|
|
from aiflows.base_flows import SequentialFlow |
|
|
from aiflows.utils import logging |
|
|
from abc import ABC |
|
|
|
|
|
|
|
|
# Switch the aiflows logging utility to debug verbosity. This runs at import
# time, so it is a module-level side effect of importing this file.
logging.set_verbosity_debug()

# Module-level logger named after this module; AbstractBossFlow.run logs through it.
log = logging.get_logger(__name__)
|
|
|
|
|
class AbstractBossFlow(SequentialFlow, ABC):
    """Abstract memory-planner-controller-executor flow.

    Seen from a higher level, this is an abstract agent backed by several language
    models and follow-up tools such as code interpreters. It is meant to work
    together with memory management mechanisms, an LM-powered planner and
    controller, and arbitrary executors.

    *Configuration Parameters*

    - `name` (str): Name of the flow.
    - `description` (str): Description of the flow.
    - `memory_files` (dict): Mapping from memory file names to the paths of those
    files. Typical memory files include plan, logs, code library.
    - `subflows_config`:
        - MemoryReading: reads the content of the memory files into the flow state for later use.
        - Planner: makes a step-by-step plan based on the current goal.
        - CtrlExMem: controller-executor agent with memory reading and memory writing; it
        executes the plan generated by the planner.
    - `max_rounds`: Forwarded to the sequential run. Falls back to 1 when the key is
    absent; when it is `None` the flow logs that it runs until the early exit condition is met.
    - `early_exit_key` (str): The key in the flow state that indicates the early exit condition.
    - `topology` (list): The topology of the flow.

    *Input Interface (expected input)*

    - `goal` (str): The goal from the caller (source flow).

    *Output Interface (expected output)*

    - `result` (str): The result of the flow; it is returned to the caller.
    - `summary` (str): The summary of the flow; it is logged into the logs of the caller flow.

    :param memory_files: Mapping from memory file names to the paths of those files.
    :type memory_files: dict
    """

    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(self, memory_files: Dict[str, Any], **kwargs):
        """Initialise the parent flow and remember the memory files.

        :param memory_files: Mapping from memory file names to their paths.
        :type memory_files: dict
        :param kwargs: Remaining keyword arguments, forwarded unchanged to the parent constructor.
        """
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration dictionary.

        The configuration is deep-copied first, so the caller's dictionary is not
        the object handed to the new flow.

        :param config: The configuration dictionary; must contain the key `memory_files`.
        :type config: dict
        :return: A new instance of `cls`.
        """
        config_copy = deepcopy(config)

        # Keyword arguments are evaluated left to right, so `memory_files` is looked up
        # (and a missing key raises) before the subflows are set up.
        return cls(
            flow_config=config_copy,
            memory_files=config_copy["memory_files"],
            subflows=cls._set_up_subflows(config_copy),
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow and return the output taken from the flow state.

        :param input_data: The input data; it is supposed to contain 'goal'.
        :type input_data: dict
        :return: The output produced by `_get_output_from_state`.
        :rtype: dict
        """
        # Put the caller's input into the flow state first ...
        self._state_update_dict(update_data=input_data)

        # ... then the memory files, in a separate update.
        memory_state = {"memory_files": self.memory_files}
        self._state_update_dict(update_data=memory_state)

        rounds = self.flow_config.get("max_rounds", 1)
        if rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=rounds)

        return self._get_output_from_state()