Tachi67 commited on
Commit
e971031
·
1 Parent(s): 18afe02

Update ControllerFlow_ExtLib.py

Browse files
Files changed (1) hide show
  1. ControllerFlow_ExtLib.py +65 -127
ControllerFlow_ExtLib.py CHANGED
@@ -1,127 +1,65 @@
1
- import json
2
- from copy import deepcopy
3
- from typing import Any, Dict, List
4
-
5
- from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow
6
-
7
- from dataclasses import dataclass
8
-
9
-
10
- @dataclass
11
- class Command:
12
- name: str
13
- description: str
14
- input_args: List[str]
15
-
16
- class ControllerFlow_ExtLib(ChatAtomicFlow):
17
- def __init__(
18
- self,
19
- commands: List[Command],
20
- **kwargs):
21
- super().__init__(**kwargs)
22
- self.system_message_prompt_template = self.system_message_prompt_template.partial(
23
- commands=self._build_commands_manual(commands),
24
- plan_file_location="no location yet",
25
- plan="no plan yet",
26
- logs="no logs yet",
27
- )
28
- self.hint_for_model = """
29
- Make sure your response is in the following format:
30
- Response Format:
31
- {
32
- "command": "call one of the commands you have e.g. `write_code`",
33
- "command_args": {
34
- "arg name": "value"
35
- }
36
- }
37
- """
38
-
39
- @staticmethod
40
- def _build_commands_manual(commands: List[Command]) -> str:
41
- ret = ""
42
- for i, command in enumerate(commands):
43
- command_input_json_schema = json.dumps(
44
- {input_arg: f"YOUR_{input_arg.upper()}" for input_arg in command.input_args})
45
- ret += f"{i + 1}. {command.name}: {command.description} Input arguments (given in the JSON schema): {command_input_json_schema}\n"
46
- return ret
47
-
48
- def _get_plan_file_location(self, input_data: Dict[str, Any]):
49
- assert "memory_files" in input_data, "memory_files not passed to Extlib/Controller"
50
- assert "plan" in input_data["memory_files"], "plan not in memory files"
51
- return input_data["memory_files"]["plan"]
52
-
53
- def _get_plan_content(self, input_data: Dict[str, Any]):
54
- assert "plan" in input_data, "plan not passed to Extlib/Controller"
55
- plan_content = input_data["plan"]
56
- if len(plan_content) == 0:
57
- plan_content = 'No plan yet'
58
- return plan_content
59
-
60
- def _get_logs_content(self, input_data: Dict[str, Any]):
61
- assert "logs" in input_data, "logs not passed to Extlib/Controller"
62
- logs_content = input_data["logs"]
63
- if len(logs_content) == 0:
64
- logs_content = "No logs yet"
65
- return logs_content
66
-
67
- @classmethod
68
- def instantiate_from_config(cls, config):
69
- flow_config = deepcopy(config)
70
-
71
- kwargs = {"flow_config": flow_config}
72
-
73
- # ~~~ Set up prompts ~~~
74
- kwargs.update(cls._set_up_prompts(flow_config))
75
-
76
- # ~~~Set up backend ~~~
77
- kwargs.update(cls._set_up_backend(flow_config))
78
-
79
- # ~~~ Set up commands ~~~
80
- commands = flow_config["commands"]
81
- commands = [
82
- Command(name, command_conf["description"], command_conf["input_args"]) for name, command_conf in
83
- commands.items()
84
- ]
85
- kwargs.update({"commands": commands})
86
-
87
- # ~~~ Instantiate flow ~~~
88
- return cls(**kwargs)
89
-
90
- def _update_prompts_and_input(self, input_data: Dict[str, Any]):
91
- if 'goal' in input_data:
92
- input_data['goal'] += self.hint_for_model
93
- if 'result' in input_data:
94
- input_data['result'] += self.hint_for_model
95
- plan_file_location = self._get_plan_file_location(input_data)
96
- plan_content = self._get_plan_content(input_data)
97
- logs_content = self._get_logs_content(input_data)
98
- self.system_message_prompt_template = self.system_message_prompt_template.partial(
99
- plan_file_location=plan_file_location,
100
- plan=plan_content,
101
- logs=logs_content
102
- )
103
-
104
- def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
105
- self._update_prompts_and_input(input_data)
106
-
107
- # ~~~when conversation is initialized, append the updated system prompts to the chat history ~~~
108
- if self._is_conversation_initialized():
109
- updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
110
- self._state_update_add_chat_message(content=updated_system_message_content,
111
- role=self.flow_config["system_name"])
112
-
113
- while True:
114
- api_output = super().run(input_data)["api_output"].strip()
115
- try:
116
- start = api_output.index("{")
117
- end = api_output.rindex("}") + 1
118
- json_str = api_output[start:end]
119
- return json.loads(json_str)
120
- except (json.decoder.JSONDecodeError, json.JSONDecodeError):
121
- updated_system_message_content = self._get_message(self.system_message_prompt_template, input_data)
122
- self._state_update_add_chat_message(content=updated_system_message_content,
123
- role=self.flow_config["system_name"])
124
- new_goal = "The previous respond cannot be parsed with json.loads. Next time, do not provide any comments or code blocks. Make sure your next response is purely json parsable."
125
- new_input_data = input_data.copy()
126
- new_input_data['result'] = new_goal
127
- input_data = new_input_data
 
1
+ from typing import Dict, Any
2
+
3
+ from flow_modules.Tachi67.AbstractBossFlowModule import CtrlExMemFlow
4
+ from aiflows.base_flows import CircularFlow
5
+
6
+ class CtrlExMem_ExtLib(CtrlExMemFlow):
7
+ """This class inherits from the CtrlExMemFlow class from AbstractBossFlowModule.
8
+ See: https://huggingface.co/Tachi67/AbstractBossFlowModule/blob/main/CtrlExMemFlow.py
9
+ *Input Interface*:
10
+ - `plan`
11
+ - `logs`
12
+ - `memory_files`
13
+ - `goal`
14
+ *Output Interface*
15
+ - `result` (str): The result of the flow, the result will be returned to the caller.
16
+ - `summary` (str): The summary of the flow, the summary will be logged into the logs of the caller flow.
17
+
18
+ """
19
+ def _on_reach_max_round(self):
20
+ self._state_update_dict({
21
+ "result": "the maximum amount of rounds was reached before the extend library flow has done the job",
22
+ "summary": "ExtendLibraryFlow: the maximum amount of rounds was reached before the flow has done the job",
23
+ "status": "unfinished"
24
+ })
25
+
26
+ @CircularFlow.output_msg_payload_processor
27
+ def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
28
+ command = output_payload["command"]
29
+ if command == "finish":
30
+ return {
31
+ "EARLY_EXIT": True,
32
+ "result": output_payload["command_args"]["summary"],
33
+ "summary": "ExtendLibrary: " + output_payload["command_args"]["summary"],
34
+ "status": "finished"
35
+ }
36
+ elif command == "manual_finish":
37
+ # ~~~ return the manual quit status ~~~
38
+ return {
39
+ "EARLY_EXIT": True,
40
+ "result": "ExtendLibraryFlow was terminated explicitly by the user, process is unfinished",
41
+ "summary": "ExtendLibrary: process terminated by the user explicitly, nothing generated",
42
+ "status": "unfinished"
43
+ }
44
+ elif command == "save_code":
45
+ keys_to_fetch_from_state = ["code", "memory_files"]
46
+ fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
47
+ output_payload["command_args"]["code"] = fetched_state["code"]
48
+ output_payload["command_args"]["memory_files"] = fetched_state["memory_files"]
49
+ return output_payload
50
+
51
+ elif command == "update_plan":
52
+ keys_to_fetch_from_state = ["memory_files"]
53
+ fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
54
+ output_payload["command_args"]["memory_files"] = fetched_state["memory_files"]
55
+ return output_payload
56
+
57
+ elif command == "re_plan":
58
+ keys_to_fetch_from_state = ["plan", "memory_files"]
59
+ fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
60
+ output_payload["command_args"]["plan_file_location"] = fetched_state["memory_files"]["plan"]
61
+ output_payload["command_args"]["plan"] = fetched_state["plan"]
62
+ return output_payload
63
+
64
+ else:
65
+ return output_payload