File size: 1,256 Bytes
51219b7 5121f3a 51219b7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
from copy import deepcopy
from typing import Any, Dict
from flows.base_flows import SequentialFlow
from flows.utils import logging
# NOTE: `logging` here is `flows.utils.logging` (imported above), not the
# standard-library module. The call below runs at import time, so importing
# this module changes logging verbosity as a side effect — presumably for the
# whole `flows` logging setup rather than just this module; TODO confirm.
logging.set_verbosity_debug()
# Module-level logger named after this module.
log = logging.get_logger(__name__)
class InteractivePlanGenFlow(SequentialFlow):
    """A `SequentialFlow` subclass driven by a `max_rounds` config entry.

    `run` stores the incoming data in the flow state, hands the configured
    `max_rounds` value to `_sequential_run`, and returns whatever
    `_get_output_from_state` produces. Those helpers are not defined in this
    class; presumably they are inherited from `SequentialFlow`.
    """

    # Config keys this flow declares as required. Presumably validated by the
    # base class — the check itself is not visible in this file.
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"]

    def __init__(self, **kwargs):
        """Forward every keyword argument unchanged to the parent constructor."""
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from a configuration mapping.

        Args:
            config: The flow configuration. It is deep-copied first, so the
                instance never shares the caller's object.

        Returns:
            An instance of `cls` constructed with `flow_config` (the copy) and
            `subflows` (the result of `cls._set_up_subflows` on that copy).
        """
        config_copy = deepcopy(config)

        # Subflows are derived from the copied config, never from the
        # caller's original object.
        return cls(
            flow_config=config_copy,
            subflows=cls._set_up_subflows(config_copy),
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow on `input_data` and return the output read from state.

        Args:
            input_data: Data written into the flow state before running.

        Returns:
            The value returned by `self._get_output_from_state()`.
        """
        # Record the inputs in the flow state before anything executes.
        self._state_update_dict(update_data=input_data)

        # A missing key falls back to 1; an explicit None is passed through
        # as-is after being logged.
        round_limit = self.flow_config.get("max_rounds", 1)
        if round_limit is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=round_limit)

        return self._get_output_from_state()
|