InteractivePlanGenFlowModule / InteractivePlanGenFlow.py
Tachi67's picture
Upload 3 files
5121f3a
raw
history blame
1.26 kB
from copy import deepcopy
from typing import Any, Dict
from flows.base_flows import SequentialFlow
from flows.utils import logging
logging.set_verbosity_debug()
log = logging.get_logger(__name__)
class InteractivePlanGenFlow(SequentialFlow):
REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]
def __init__(
self,
**kwargs
):
super().__init__(**kwargs)
@classmethod
def instantiate_from_config(cls, config):
flow_config = deepcopy(config)
kwargs = {"flow_config": flow_config}
# ~~~ Set up subflows ~~~
kwargs.update({"subflows": cls._set_up_subflows(flow_config)})
# ~~~ Instantiate flow ~~~
return cls(**kwargs)
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
# ~~~ sets the input_data in the flow_state dict ~~~
self._state_update_dict(update_data=input_data)
max_rounds = self.flow_config.get("max_rounds", 1)
if max_rounds is None:
log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")
self._sequential_run(max_rounds=max_rounds)
output = self._get_output_from_state()
return output