from copy import deepcopy from typing import Any, Dict from flows.base_flows import SequentialFlow from flows.utils import logging logging.set_verbosity_debug() log = logging.get_logger(__name__) class InteractivePlanGenFlow(SequentialFlow): REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology"] def __init__( self, **kwargs ): super().__init__(**kwargs) @classmethod def instantiate_from_config(cls, config): flow_config = deepcopy(config) kwargs = {"flow_config": flow_config} # ~~~ Set up subflows ~~~ kwargs.update({"subflows": cls._set_up_subflows(flow_config)}) # ~~~ Instantiate flow ~~~ return cls(**kwargs) def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: # ~~~ sets the input_data in the flow_state dict ~~~ self._state_update_dict(update_data=input_data) max_rounds = self.flow_config.get("max_rounds", 1) if max_rounds is None: log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.") self._sequential_run(max_rounds=max_rounds) output = self._get_output_from_state() return output