File size: 6,448 Bytes
6211d74 035821c 6211d74 035821c f2030ec 6211d74 f2030ec 6211d74 f2030ec 6211d74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction
# Module-level logger, named under the "aiflows." prefix followed by this module's name.
log = logging.get_logger(f"aiflows.{__name__}")
class ChatHumanFlowModule(CompositeFlow):
    r""" This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:

    - A User Flow: A flow that makes queries to the Assistant Flow. E.g. the user asks the assistant (LLM) a question.
    - An Assistant Flow: A flow that responds to queries made by the User Flow. E.g. the assistant (LLM) answers the user's question.

    To end the interaction, the user must type "\<END\>"

    An illustration of the flow is as follows:

            |------> User Flow -----------> |
            ^                               |
            |                               |
            |                               v
            |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
      Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds.
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction"
    - `regex_first_occurrence_extractor` (Dict[str,Any]): Keyword arguments passed to the `RegexFirstOccurrenceExtractor` applied to the user's input.
    - `end_of_interaction` (Dict[str,Any]): Keyword arguments passed to the `EndOfInteraction` transformation applied to the user's input.
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Assistant Flow`: The configuration of the Assistant Flow. By default, it is a ChatAtomicFlow. Its default parameters are defined in ChatAtomicFlowModule.
        - `User Flow`: The configuration of the User Flow. By default, it is a HumanStandardInputFlow. Its default parameters are defined in HumanStandardInputFlowModule.
    - `topology` (List[Dict[str,Any]]): The topology of the flow, which is "circular".
      By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml).

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    The reply built by `generate_reply` additionally carries `user_inputs` and `assistant_outputs`,
    the lists of user queries and assistant answers recorded in the flow state.

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both transformations are configured entirely from the flow config.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(
            **self.flow_config["end_of_interaction"]
        )

        # Interface applied to every message forwarded to the assistant:
        # rename `human_input` to `query`, then run the two transformations.
        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

    def set_up_flow_state(self):
        """ This method sets up the flow state. It is called when the flow is executed."""
        super().set_up_flow_state()
        # Name of the subflow that was messaged last ("User", "Assistant" or None before the first call).
        self.flow_state["last_flow_called"] = None
        self.flow_state["current_round"] = 0
        # Conversation history returned to the caller by `generate_reply`.
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        # The message that started the interaction; the final reply is addressed to it.
        self.flow_state["input_message"] = None
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls):
        """ This method returns the type of the flow."""
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self) -> bool:
        """ This method checks whether the configured maximum number of rounds has been reached.

        :return: True if `max_rounds` is set (not None) and `current_round` is greater than or equal to it, False otherwise.
        :rtype: bool
        """
        max_rounds = self.flow_config["max_rounds"]
        return max_rounds is not None and self.flow_state["current_round"] >= max_rounds

    def generate_reply(self) -> None:
        """ This method packages the recorded conversation and replies to the message that started the flow.

        The response contains `user_inputs`, `assistant_outputs` and `end_of_interaction`, all read from the flow state.
        """
        initial_message = self.flow_state["input_message"]

        reply = self._package_output_message(
            input_message=initial_message,
            response={
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs": self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"],
            },
        )

        self.reply_to_message(reply=reply, to=initial_message)

    def call_to_user(self, input_message: FlowMessage) -> None:
        """ This method handles a message coming back from the Assistant subflow.

        It records the assistant's `api_output`, then either ends the flow (if the maximum number of rounds
        has been reached) or forwards the message to the User subflow.

        :param input_message: The message received from the Assistant subflow. Its data must contain `api_output`.
        :type input_message: FlowMessage
        """
        self.flow_state["assistant_outputs"].append(input_message.data["api_output"])

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            # The user's answer is piped back to this flow, where `run` routes it to the assistant.
            self.subflows["User"].send_message_async(
                input_message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "User"

        # NOTE(review): the original indentation of this statement was lost in the source;
        # it is assumed to run unconditionally after the branch above -- confirm.
        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message: FlowMessage) -> None:
        """ This method handles the initial message of the flow or a message coming back from the User subflow.

        It applies the assistant input interface to the message, then either ends the flow (if the
        transformed message flags `end_of_interaction`) or forwards the transformed message to the Assistant subflow.

        :param input_message: The initial message of the flow, or the message received from the User subflow.
        :type input_message: FlowMessage
        """
        message = self.input_interface_assistant(input_message)

        if self.flow_state["last_flow_called"] is None:
            # First call: remember the initiating message so the final reply can be addressed to it.
            self.flow_state["input_message"] = input_message
        else:
            # NOTE(review): `query` is read from `input_message`, not from the transformed `message`;
            # presumably the interface renames `human_input` in place -- verify against KeyInterface.
            self.flow_state["user_inputs"].append(input_message.data["query"])

        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            # The assistant's answer is piped back to this flow, where `run` routes it to the user.
            self.subflows["Assistant"].send_message_async(
                message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "Assistant"

    def run(self, input_message: FlowMessage):
        """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.

        The message is routed according to the subflow that was called last: to the assistant on the first
        call or after the user, and to the user after the assistant.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_flow_called = self.flow_state["last_flow_called"]

        if last_flow_called is None or last_flow_called == "User":
            self.call_to_assistant(input_message=input_message)
        elif last_flow_called == "Assistant":
            self.call_to_user(input_message=input_message)
|