from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.interfaces import KeyInterface

logging.set_verbosity_debug()

log = logging.get_logger(__name__)


class ChatWithDemonstrationsFlow(CompositeFlow):
    r"""A Chat with Demonstrations Flow.

    It is a composite flow (its parent class is CompositeFlow) whose sub-flows are
    called one after the other. It contains the following subflows:

    - A Demonstration Flow (subflow key ``demonstration_flow``): a flow that passes
      demonstrations to the Chat Flow.
    - A Chat Flow (subflow key ``chat_flow``): a flow that uses the demonstrations to
      answer queries asked by the user/human.

    An illustration of the flow is as follows:

        -------> Demonstration Flow -------> Chat Flow ------->

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatAtomic_Flow_with_Demonstrations"
    - `description` (str): A description of the flow. This description is used to
      generate the help message of the flow.
      Default: "A sequential flow that answers questions with demonstrations"
    - `subflows_config` (Dict[str,Any]): A dictionary of subflow configurations.
      Default:
        - `Demonstration Flow`: The configuration of the Demonstration Flow. By default,
          it is a DemonstrationsAtomicFlow. Its default parameters are defined in
          DemonstrationsAtomicFlow.
        - `Chat Flow`: The configuration of the Chat Flow. By default, it is a
          ChatAtomicFlow. Its default parameters are defined in ChatAtomicFlowModule
          (see the Flowcard, i.e. README.md, of ChatAtomicFlowModule).
    - `topology` (str): The topology of the flow, which is "sequential". By default, the
      topology is the one shown in the illustration above (the topology is also
      described in ChatWithDemonstrationsFlow.yaml).
    - `flow_ref`: Read by ``run`` and passed as ``pipe_to`` when messaging the subflows.

    *Input Interface*:

    - `query` (str): A query asked to the flow (e.g. "What is the capital of France?")

    *Output Interface*:

    - `answer` (str): The answer of the flow to the query

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    """

    def __init__(self, **kwargs):
        """Build the flow and the interface applied to the final reply.

        :param kwargs: Forwarded unchanged to the parent class constructor.
        """
        super().__init__(**kwargs)
        # The last message's "api_output" key is exposed to the caller as "answer".
        self.output_interface = KeyInterface(
            keys_to_rename={"api_output": "answer"}
        )

    def set_up_flow_state(self):
        """Initialise the flow state; no subflow has been called yet."""
        super().set_up_flow_state()
        self.flow_state["last_flow_called"] = None

    def run(self, input_message):
        """Advance the demonstration -> chat -> reply sequence by one step.

        The method is non-blocking and keeps its position in
        ``flow_state["last_flow_called"]``:

        1. ``None``: remember ``input_message`` as the original request and send it
           to the ``demonstration_flow`` subflow.
        2. ``"demonstration_flow"``: send ``input_message`` to the ``chat_flow``
           subflow.
        3. anything else: reset the state and reply to the remembered original
           request with ``input_message`` passed through ``output_interface``.

        :param input_message: The message to process at the current step.
        """
        last_flow_called = self.flow_state["last_flow_called"]

        if last_flow_called is None:
            # Keep the original request so the final reply can be addressed to it.
            self.flow_state["input_message"] = input_message
            # NOTE(review): pipe_to presumably routes the subflow's reply back into
            # this flow's run(); confirm against the aiflows messaging API.
            self.subflows["demonstration_flow"].send_message_async(
                input_message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "demonstration_flow"

        elif last_flow_called == "demonstration_flow":
            self.subflows["chat_flow"].send_message_async(
                input_message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "chat_flow"

        else:
            # Reset first so the flow is ready for the next request.
            self.flow_state["last_flow_called"] = None
            self.reply_to_message(
                reply=self.output_interface(input_message),
                to=self.flow_state["input_message"],
            )