|
|
import json |
|
|
from copy import deepcopy |
|
|
from typing import Any, Dict, List |
|
|
|
|
|
from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow |
|
|
|
|
|
from dataclasses import dataclass |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class Command: |
|
|
name: str |
|
|
description: str |
|
|
input_args: List[str] |
|
|
|
|
|
class PlanWriterCtrlFlow(ChatAtomicFlow): |
|
|
"""Refer to: https://huggingface.co/aiflows/JarvisFlowModule/blob/main/Controller_JarvisFlow.py |
|
|
This flow is a controller flow that controls the PlanWriterFlow. |
|
|
|
|
|
*Input Interface Non Initialized*: |
|
|
- `goal`: str |
|
|
|
|
|
*Input Interface Initialized*: |
|
|
- `feedback`: str |
|
|
- `goal`: str |
|
|
- `plan`: str |
|
|
|
|
|
*Output Interface*: |
|
|
- `command`: str |
|
|
- `command_args`: Dict[str, Any] |
|
|
|
|
|
*Configuration Parameters*: |
|
|
- `input_interface_non_initialized`: List[str] = ["goal"] |
|
|
- `input_interface_initialized`: List[str] = ["feedback", "goal", "plan"] |
|
|
- `output_interface`: List[str] = ["command", "command_args"] |
|
|
- `backend`: Dict[str, Any] : backend of the LLM |
|
|
- `commands`: List[Dict[str, Any]] : commands that the LLM can execute |
|
|
- `system_message_prompt_template`: str : the template of the system message prompt |
|
|
- `init_human_message_prompt_template`: str : the template of the initial human message prompt |
|
|
- `human_message_prompt_template`: str : the template of the human message prompt |
|
|
- `previous_messages`: Dict[str, Any] : the previous messages of the conversation (sliding window) |
|
|
|
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
commands: List[Command], |
|
|
**kwargs): |
|
|
""" |
|
|
This function initializes the flow. |
|
|
:param commands: List[Command] : commands that the LLM can execute |
|
|
:type commands: List[Command] |
|
|
:param kwargs: other parameters |
|
|
:type kwargs: Dict[str, Any] |
|
|
""" |
|
|
super().__init__(**kwargs) |
|
|
self.system_message_prompt_template = self.system_message_prompt_template.partial( |
|
|
commands=self._build_commands_manual(commands), |
|
|
) |
|
|
self.hint_for_model = """ |
|
|
Make sure your response is in the following format: |
|
|
Response Format: |
|
|
{ |
|
|
"command": "call plan writer, or to finish with a summary", |
|
|
"command_args": { |
|
|
"arg name": "value" |
|
|
} |
|
|
} |
|
|
""" |
|
|
|
|
|
@staticmethod |
|
|
def _build_commands_manual(commands: List[Command]) -> str: |
|
|
""" |
|
|
This function builds the manual for the commands. |
|
|
:param commands: List[Command] : commands that the LLM can execute |
|
|
:type commands: List[Command] |
|
|
:return: the manual for the commands |
|
|
:rtype: str |
|
|
""" |
|
|
ret = "" |
|
|
for i, command in enumerate(commands): |
|
|
command_input_json_schema = json.dumps( |
|
|
{input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args}) |
|
|
ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n" |
|
|
return ret |
|
|
|
|
|
@classmethod |
|
|
def instantiate_from_config(cls, config): |
|
|
""" |
|
|
This function instantiates the flow from the config. |
|
|
:param config: the config of the flow |
|
|
:type config: Dict[str, Any] |
|
|
:return: the instantiated flow |
|
|
:rtype: ChatAtomicFlow |
|
|
""" |
|
|
flow_config = deepcopy(config) |
|
|
|
|
|
kwargs = {"flow_config": flow_config} |
|
|
|
|
|
|
|
|
kwargs.update(cls._set_up_prompts(flow_config)) |
|
|
|
|
|
|
|
|
kwargs.update(cls._set_up_backend(flow_config)) |
|
|
|
|
|
|
|
|
commands = flow_config["commands"] |
|
|
commands = [ |
|
|
Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in |
|
|
commands.items() |
|
|
] |
|
|
kwargs.update({"commands": commands}) |
|
|
|
|
|
|
|
|
return cls(**kwargs) |
|
|
|
|
|
def _update_prompts_and_input(self, input_data: Dict[str, Any]): |
|
|
""" |
|
|
This function updates the prompts and input data. |
|
|
:param input_data: the input data |
|
|
:type input_data: Dict[str, Any] |
|
|
:return: None |
|
|
:rtype: None |
|
|
""" |
|
|
if 'goal' in input_data: |
|
|
input_data['goal'] += self.hint_for_model |
|
|
if 'feedback' in input_data: |
|
|
input_data['feedback'] += self.hint_for_model |
|
|
|
|
|
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: |
|
|
""" |
|
|
This function runs the flow. |
|
|
:param input_data: the input data |
|
|
:type input_data: Dict[str, Any] |
|
|
:return: the output of the flow |
|
|
:rtype: Dict[str, Any] |
|
|
""" |
|
|
self._update_prompts_and_input(input_data) |
|
|
|
|
|
|
|
|
if self._is_conversation_initialized(): |
|
|
updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data) |
|
|
self._state_update_add_chat_message(content=updated_system_message_content, |
|
|
role=self.flow_config["system_name"]) |
|
|
|
|
|
while True: |
|
|
api_output = super().run(input_data)["api_output"].strip() |
|
|
try: |
|
|
response = json.loads(api_output) |
|
|
return response |
|
|
except (json.decoder.JSONDecodeError, json.JSONDecodeError): |
|
|
updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data) |
|
|
self._state_update_add_chat_message(content=updated_system_message_content, |
|
|
role=self.flow_config["system_name"]) |
|
|
new_goal = "The previous respond cannot be parsed with json.loads. Next time, do not provide any comments or code blocks. Make sure your next response is purely json parsable." |
|
|
new_input_data = input_data.copy() |
|
|
new_input_data['feedback'] = new_goal |
|
|
input_data = new_input_data |
|
|
|