File size: 6,136 Bytes
b7c0c96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c25e495
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b7c0c96
c25e495
b7c0c96
 
 
 
c25e495
 
 
 
 
 
 
b7c0c96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c25e495
 
 
 
 
 
 
b7c0c96
 
 
 
 
 
 
 
 
c25e495
 
 
 
 
 
 
b7c0c96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c25e495
 
 
 
 
 
 
b7c0c96
 
 
 
 
 
c25e495
 
 
 
 
 
 
b7c0c96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import json
from copy import deepcopy
from typing import Any, Dict, List

from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow

from dataclasses import dataclass


@dataclass
class Command:
    """Description of a single command that the controller LLM may invoke.

    Instances are rendered into a numbered "manual" string by
    PlanWriterCtrlFlow._build_commands_manual and baked into the system prompt.
    """

    # Command identifier presented to the LLM (e.g. the key from the
    # `commands` section of the flow config).
    name: str
    # Human-readable explanation of what the command does.
    description: str
    # Names of the arguments the command expects; shown to the LLM as a
    # JSON schema of placeholder values.
    input_args: List[str]

class PlanWriterCtrlFlow(ChatAtomicFlow):
    """Controller flow that drives the PlanWriterFlow.

    Refer to: https://huggingface.co/aiflows/JarvisFlowModule/blob/main/Controller_JarvisFlow.py

    *Input Interface Non Initialized*:
    - `goal`: str

    *Input Interface Initialized*:
    - `feedback`: str
    - `goal`: str
    - `plan`: str

    *Output Interface*:
    - `command`: str
    - `command_args`: Dict[str, Any]

    *Configuration Parameters*:
    - `input_interface_non_initialized`: List[str] = ["goal"]
    - `input_interface_initialized`: List[str] = ["feedback", "goal", "plan"]
    - `output_interface`: List[str] = ["command", "command_args"]
    - `backend`: Dict[str, Any] : backend of the LLM
    - `commands`: List[Dict[str, Any]] : commands that the LLM can execute
    - `system_message_prompt_template`: str : the template of the system message prompt
    - `init_human_message_prompt_template`: str : the template of the initial human message prompt
    - `human_message_prompt_template`: str : the template of the human message prompt
    - `previous_messages`: Dict[str, Any] : the previous messages of the conversation (sliding window)
    """

    def __init__(
            self,
            commands: List[Command],
            **kwargs):
        """Initialize the flow.

        :param commands: commands that the LLM can execute; rendered into a
            numbered "manual" and baked into the system prompt once here.
        :type commands: List[Command]
        :param kwargs: other parameters, forwarded to ChatAtomicFlow
        :type kwargs: Dict[str, Any]
        """
        super().__init__(**kwargs)
        # Fill the `commands` slot of the system prompt template once, at
        # construction time, so every conversation sees the same manual.
        self.system_message_prompt_template = self.system_message_prompt_template.partial(
            commands=self._build_commands_manual(commands),
        )
        # Appended to `goal`/`feedback` on every run() call (see
        # _update_prompts_and_input) to steer the model toward JSON output.
        self.hint_for_model = """
        Make sure your response is in the following format:
              Response Format:
              {
              "command": "call plan writer, or to finish with a summary",
              "command_args": {
                  "arg name": "value"
                  }
              }
        """

    @staticmethod
    def _build_commands_manual(commands: List[Command]) -> str:
        """Build the textual manual listing every available command.

        :param commands: commands that the LLM can execute
        :type commands: List[Command]
        :return: one numbered line per command, each ending with a JSON schema
            of placeholder values for the command's input arguments
        :rtype: str
        """
        manual_lines = []
        for index, command in enumerate(commands, start=1):
            # Placeholder schema such as {"arg": "YOUR_ARG"} shown to the LLM.
            command_input_json_schema = json.dumps(
                {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args})
            manual_lines.append(
                f"{index}. {command.name}: {command.description} "
                f"Input arguments (given in the JSON schema): {command_input_json_schema}\n")
        return "".join(manual_lines)

    @classmethod
    def instantiate_from_config(cls, config):
        """Instantiate the flow from its configuration dict.

        :param config: the config of the flow; its `commands` entry maps
            command names to {"description": ..., "input_args": [...]}.
        :type config: Dict[str, Any]
        :return: the instantiated flow
        :rtype: ChatAtomicFlow
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up prompts ~~~
        kwargs.update(cls._set_up_prompts(flow_config))

        # ~~~ Set up backend ~~~
        kwargs.update(cls._set_up_backend(flow_config))

        # ~~~ Set up commands ~~~
        # Convert the config mapping into Command dataclass instances.
        commands = flow_config["commands"]
        commands = [
            Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in
            commands.items()
        ]
        kwargs.update({"commands": commands})

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def _update_prompts_and_input(self, input_data: Dict[str, Any]):
        """Append the JSON-format hint to `goal` and `feedback`.

        NOTE: mutates `input_data` in place; the caller's dict is modified.

        :param input_data: the input data
        :type input_data: Dict[str, Any]
        :return: None
        :rtype: None
        """
        if 'goal' in input_data:
            input_data['goal'] += self.hint_for_model
        if 'feedback' in input_data:
            input_data['feedback'] += self.hint_for_model

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow: query the LLM until it returns parsable JSON.

        NOTE(review): there is no retry cap — a model that never emits valid
        JSON loops forever. Kept as-is to preserve behavior.

        :param input_data: the input data
        :type input_data: Dict[str, Any]
        :return: the parsed model response, expected to carry `command` and
            `command_args`
        :rtype: Dict[str, Any]
        """
        self._update_prompts_and_input(input_data)

        # ~~~ when conversation is initialized, append the updated system prompts to the chat history ~~~
        if self._is_conversation_initialized():
            updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
            self._state_update_add_chat_message(content=updated_system_message_content,
                                                role=self.flow_config["system_name"])

        while True:
            api_output = super().run(input_data)["api_output"].strip()
            try:
                response = json.loads(api_output)
                return response
            # json.JSONDecodeError IS json.decoder.JSONDecodeError (same class,
            # re-exported) — the original caught both names redundantly.
            except json.JSONDecodeError:
                # Re-inject the system prompt and ask the model to retry with
                # strictly parsable JSON as the new feedback.
                updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
                self._state_update_add_chat_message(content=updated_system_message_content,
                                                    role=self.flow_config["system_name"])
                new_goal = "The previous response cannot be parsed with json.loads. Next time, do not provide any comments or code blocks. Make sure your next response is purely json parsable."
                new_input_data = input_data.copy()
                new_input_data['feedback'] = new_goal
                input_data = new_input_data