File size: 8,751 Bytes
e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 5af478e e9cd8d3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
import json
from copy import deepcopy
from typing import Any, Dict, List
from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
from dataclasses import dataclass
@dataclass
class Command:
    """A subordinate command the controller can instruct the coder flow to call.

    Instances are built by ``Controller_CoderFlow.instantiate_from_config`` from
    the ``commands`` section of the flow configuration and rendered into the
    system prompt's command manual.
    """
    name: str  # identifier the LLM must emit in its "command" field
    description: str  # human-readable explanation shown in the command manual
    input_args: List[str]  # names of the arguments the command expects
class Controller_CoderFlow(ChatAtomicFlow):
    """Controller of the coder flow: queries the LLM backend and parses its
    reply into a ``(command, command_args)`` pair for a subordinate flow.

    Refer to: https://huggingface.co/aiflows/JarvisFlowModule/blob/main/Controller_JarvisFlow.py
    This flow is used to control the coder flow.

    *Input Interface Non Initialized*:
    - `goal`
    - `plan`
    - `code_library`
    - `logs`
    - `memory_files`

    *Input Interface Initialized*:
    - `goal`
    - `plan`
    - `code_library`
    - `logs`
    - `memory_files`
    - `result`

    *Output Interface*:
    - `command`
    - `command_args`

    *Configuration Parameters*:
    - `Input Interface Non Initialized`: Input interface before the conversation is initialized.
    - `Input Interface Initialized`: Input interface after the conversation is initialized.
    - `Output Interface`: Output interface.
    - `backend`: The backend of the LLM.
    - `commands`: A mapping of available commands for the controller to call.
    - `system_message_prompt_template`: The template of the system message prompt.
    - `init_human_message_prompt_template`: The template of the initial human message prompt.
    - `human_message_prompt_template`: The template of the human message prompt.
    - `previous_messages`: The sliding window of previous messages.
    """

    def __init__(
            self,
            commands: List[Command],
            **kwargs):
        """Initialize the flow.

        :param commands: A list of available commands for the controller to call.
        :type commands: List[Command]
        :param kwargs: Refer to the configuration parameters.
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)
        # Pre-fill the system prompt with the command manual and placeholder
        # memory contents; the real contents are substituted on every run by
        # _update_prompts_and_input.
        self.system_message_prompt_template = self.system_message_prompt_template.partial(
            commands=self._build_commands_manual(commands),
            plan="no plans yet",
            plan_file_location="no location yet",
            code_library="no code library yet",
            code_library_location="no location yet",
            logs="no logs yet",
        )
        # Appended to `goal`/`result` to nudge the model toward pure-JSON output.
        self.hint_for_model = """
Make sure your response is in the following format:
Response Format:
{
"command": "call one of the subordinates",
"command_args": {
"arg name": "value"
}
}
"""

    @staticmethod
    def _build_commands_manual(commands: List[Command]) -> str:
        """Build the numbered manual for the commands shown in the system prompt.

        :param commands: A list of available commands for the controller to call.
        :type commands: List[Command]
        :return: The manual for the commands.
        :rtype: str
        """
        ret = ""
        for i, command in enumerate(commands):
            # Render each command's expected arguments as a JSON schema example.
            command_input_json_schema = json.dumps(
                {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args})
            ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n"
        return ret

    def _get_content_file_location(self, input_data: Dict[str, Any], content_name: str) -> str:
        """Get the location of the file that contains the content: plan, logs, code_library.

        :param input_data: The input data.
        :type input_data: Dict[str, Any]
        :param content_name: The name of the content.
        :type content_name: str
        :raises AssertionError: If the content is not in the memory files.
        :return: The location of the file that contains the content.
        :rtype: str
        """
        # NOTE(review): asserts are stripped under `python -O`; kept here because
        # the documented contract (AssertionError) may be relied on by callers.
        assert "memory_files" in input_data, "memory_files not passed to Coder/Controller"
        assert content_name in input_data["memory_files"], f"{content_name} not in memory files"
        return input_data["memory_files"][content_name]

    def _get_content(self, input_data: Dict[str, Any], content_name: str) -> str:
        """Get the content itself: plan, logs, code_library.

        :param input_data: The input data.
        :type input_data: Dict[str, Any]
        :param content_name: The name of the content.
        :type content_name: str
        :raises AssertionError: If the content is not in the input data.
        :return: The content, or a "No <name> yet" placeholder when empty.
        :rtype: str
        """
        assert content_name in input_data, f"{content_name} not passed to Coder/Controller"
        content = input_data[content_name]
        if len(content) == 0:
            # Substitute a readable placeholder so the prompt never shows an
            # empty field.
            content = f'No {content_name} yet'
        return content

    @classmethod
    def instantiate_from_config(cls, config):
        """Instantiate the flow from the configuration.

        :param config: The configuration.
        :type config: Dict[str, Any]
        :return: The instantiated flow.
        :rtype: Controller_CoderFlow
        """
        flow_config = deepcopy(config)
        kwargs = {"flow_config": flow_config}
        # ~~~ Set up prompts ~~~
        kwargs.update(cls._set_up_prompts(flow_config))
        # ~~~ Set up backend ~~~
        kwargs.update(cls._set_up_backend(flow_config))
        # ~~~ Set up commands ~~~
        # `commands` in the config is a mapping: name -> {description, input_args}.
        commands = flow_config["commands"]
        commands = [
            Command(name, command_conf["description"], command_conf["input_args"])
            for name, command_conf in commands.items()
        ]
        kwargs.update({"commands": commands})
        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def _update_prompts_and_input(self, input_data: Dict[str, Any]):
        """Refresh the system prompt with current memory contents and append the
        JSON-format hint to `goal`/`result`.

        NOTE(review): mutates `input_data` in place (the caller's dict sees the
        appended hint).

        :param input_data: The input data.
        :type input_data: Dict[str, Any]
        :raises AssertionError: If the input data is not valid.
        """
        if 'goal' in input_data:
            input_data['goal'] += self.hint_for_model
        if 'result' in input_data:
            input_data['result'] += self.hint_for_model
        plan_file_location = self._get_content_file_location(input_data, "plan")
        plan_content = self._get_content(input_data, "plan")
        code_library_location = self._get_content_file_location(input_data, "code_library")
        code_library_content = self._get_content(input_data, "code_library")
        logs_content = self._get_content(input_data, "logs")
        self.system_message_prompt_template = self.system_message_prompt_template.partial(
            plan_file_location=plan_file_location,
            plan=plan_content,
            code_library_location=code_library_location,
            code_library=code_library_content,
            logs=logs_content
        )

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run one controller step: refresh prompts, query the LLM, and parse
        the reply into the output interface.

        :param input_data: The input data.
        :type input_data: Dict[str, Any]
        :return: The parsed LLM reply (expected keys: `command`, `command_args`).
        :rtype: Dict[str, Any]
        """
        self._update_prompts_and_input(input_data)
        # ~~~ when conversation is initialized, append the updated system prompt
        # to the chat history so the model sees the current memory contents ~~~
        if self._is_conversation_initialized():
            updated_system_message_content = self._get_message(
                self.system_message_prompt_template, input_data)
            self._state_update_add_chat_message(
                content=updated_system_message_content,
                role=self.flow_config["system_name"])
        # Retry until the model produces a JSON-parsable reply.
        # NOTE(review): no retry cap — a persistently malformed model loops
        # forever; consider bounding the number of attempts.
        while True:
            api_output = super().run(input_data)["api_output"].strip()
            try:
                # Extract the outermost {...} span to tolerate surrounding prose.
                start = api_output.index("{")
                end = api_output.rindex("}") + 1
                return json.loads(api_output[start:end])
            except ValueError:
                # json.JSONDecodeError subclasses ValueError, so this single
                # handler covers index/rindex failures and malformed JSON (the
                # original redundantly listed JSONDecodeError twice).
                updated_system_message_content = self._get_message(
                    self.system_message_prompt_template, input_data)
                self._state_update_add_chat_message(
                    content=updated_system_message_content,
                    role=self.flow_config["system_name"])
                new_goal = "The previous response cannot be parsed with json.loads. Next time, do not provide any comments or code blocks. Make sure your next response is purely json parsable."
                # Copy so the caller's dict does not accumulate retry goals.
                new_input_data = input_data.copy()
                new_input_data['result'] = new_goal
                input_data = new_input_data
|