from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow from typing import Dict, Any from aiflows.messages import UpdateMessage_Generic from aiflows.utils import logging log = logging.get_logger(f"aiflows.{__name__}") class PlanWriterAskUserFlow(HumanStandardInputFlow): """ Refer to: https://huggingface.co/aiflows/ExtendLibraryFlowModule/blob/main/ExtLibAskUserFlow.py This flow is used to ask the user a question and get a response. *Input Interface*: - `question` *Output Interface*: - `feedback` - `plan` *Configuration Parameters*: - `query_message_prompt_template`: The message template to prompt the user for input. - `request_multi_line_input_flag`: Whether to request multi-line input from the user. - `end_of_input_string`: The string to enter to indicate the end of input. """ def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """ Run the flow module. :param input_data: The input data to the flow module. :type input_data: Dict[str, Any] :return: The output data from the flow module. :rtype: Dict[str, Any] """ query_message = self._get_message(self.query_message_prompt_template, input_data) state_update_message = UpdateMessage_Generic( created_by=self.flow_config['name'], updated_flow=self.flow_config["name"], data={"query_message": query_message}, ) self._log_message(state_update_message) log.info(query_message) human_input = self._read_input() response = {} response["feedback"] = human_input response["plan"] = "no plan was written" return response