nbaldwin committed on
Commit
c299ff8
·
1 Parent(s): d3ee82f

Clean Versions

Browse files
ChatWithDemonstrationsFlow.py CHANGED
@@ -4,7 +4,7 @@ from aiflows.base_flows import CompositeFlow
4
  from aiflows.utils import logging
5
  from aiflows.interfaces import KeyInterface
6
  logging.set_verbosity_debug()
7
-
8
  log = logging.get_logger(__name__)
9
 
10
 
@@ -50,11 +50,16 @@ class ChatWithDemonstrationsFlow(CompositeFlow):
50
  )
51
 
52
  def set_up_flow_state(self):
 
 
53
  super().set_up_flow_state()
54
- self.flow_state["last_flow_called"] = None
55
 
56
  def run(self,input_message):
57
-
 
 
 
58
  # #~~~~~~~~~~~Solution 1 - Blocking ~~~~~~~
59
  # future = self.subflows["demonstration_flow"].get_reply_future(input_message)
60
 
@@ -65,23 +70,23 @@ class ChatWithDemonstrationsFlow(CompositeFlow):
65
  # self.send_message(reply)
66
 
67
  # #~~~~~~~~~~~Solution 2 - Non-Blocking ~~~~~~~
68
- if self.flow_state["last_flow_called"] is None:
69
  self.flow_state["input_message"] = input_message
70
  self.subflows["demonstration_flow"].get_reply(
71
  input_message,
72
  self.get_instance_id(),
73
  )
74
- self.flow_state["last_flow_called"] = "demonstration_flow"
75
 
76
- elif self.flow_state["last_flow_called"] == "demonstration_flow":
77
  self.subflows["chat_flow"].get_reply(
78
  input_message,
79
  self.get_instance_id(),
80
  )
81
- self.flow_state["last_flow_called"] = "chat_flow"
82
 
83
  else:
84
- self.flow_state["last_flow_called"] = None
85
 
86
  reply = self.package_output_message(
87
  self.flow_state["input_message"],
 
4
  from aiflows.utils import logging
5
  from aiflows.interfaces import KeyInterface
6
  logging.set_verbosity_debug()
7
+ from aiflows.messages import FlowMessage
8
  log = logging.get_logger(__name__)
9
 
10
 
 
50
  )
51
 
52
  def set_up_flow_state(self):
53
+ """ Set up the flow state. It sets the last state of the flow to None.
54
+ """
55
  super().set_up_flow_state()
56
+ self.flow_state["last_state"] = None
57
 
58
  def run(self,input_message):
59
+ """ Runs the flow.
60
+ :param input_message: The input message of the flow.
61
+ :type input_message: FlowMessage
62
+ """
63
  # #~~~~~~~~~~~Solution 1 - Blocking ~~~~~~~
64
  # future = self.subflows["demonstration_flow"].get_reply_future(input_message)
65
 
 
70
  # self.send_message(reply)
71
 
72
  # #~~~~~~~~~~~Solution 2 - Non-Blocking ~~~~~~~
73
+ if self.flow_state["last_state"] is None:
74
  self.flow_state["input_message"] = input_message
75
  self.subflows["demonstration_flow"].get_reply(
76
  input_message,
77
  self.get_instance_id(),
78
  )
79
+ self.flow_state["last_state"] = "demonstration_flow"
80
 
81
+ elif self.flow_state["last_state"] == "demonstration_flow":
82
  self.subflows["chat_flow"].get_reply(
83
  input_message,
84
  self.get_instance_id(),
85
  )
86
+ self.flow_state["last_state"] = "chat_flow"
87
 
88
  else:
89
+ self.flow_state["last_state"] = None
90
 
91
  reply = self.package_output_message(
92
  self.flow_state["input_message"],
ChatWithDemonstrationsFlow.yaml CHANGED
@@ -5,9 +5,15 @@ description: "A sequential flow that answers questions with demonstrations"
5
  subflows_config:
6
  demonstration_flow:
7
  _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
 
 
 
8
  name: "DemonstrationsAtomicFlow"
9
  description: "A flow that answers questions with demonstrations"
10
  chat_flow:
11
  _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
12
  name: "ChatAtomicFlow"
13
  description: "A flow that answers questions"
 
 
 
 
5
  subflows_config:
6
  demonstration_flow:
7
  _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
8
+ flow_class_name: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow
9
+ flow_endpoint: DemonstrationsAtomicFlow
10
+ user_id: local
11
  name: "DemonstrationsAtomicFlow"
12
  description: "A flow that answers questions with demonstrations"
13
  chat_flow:
14
  _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
15
  name: "ChatAtomicFlow"
16
  description: "A flow that answers questions"
17
+ flow_class_name: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow
18
+ flow_endpoint: ChatAtomicFlow
19
+ user_id: local
DemonstrationsAtomicFlow.py CHANGED
@@ -5,6 +5,7 @@ from aiflows.utils import general_helpers
5
  from typing import Dict,Any,Optional,List
6
  from aiflows.prompt_template import JinjaPrompt
7
  from copy import deepcopy
 
8
  import os
9
  import hydra
10
  log = logging.get_logger(__name__)
@@ -41,7 +42,7 @@ class DemonstrationsAtomicFlow(AtomicFlow):
41
 
42
  *Output Interface*:
43
 
44
- - The input interface expected by its successor flow (e.g. typically ChatAtomicFlow so the input interface expected by ChatAtomicFlow))
45
  - `demonstrations` (List[Dict[str, Any]]): A list of demonstrations. Each demonstration is a dictionary with the following keys:
46
  - idx (int): The index of the demonstration
47
  - query (str): The query of the demonstration
@@ -182,13 +183,11 @@ class DemonstrationsAtomicFlow(AtomicFlow):
182
  log.info("Loaded the demonstrations for %d datapoints from %s", len(self.data), self.params["data_dir"])
183
 
184
  def run(self,
185
- input_message):
186
- """ This method runs the flow. It returns the input data of the flow with the demonstrations added to it.
187
 
188
- :param input_data: The input data of the flow.
189
- :type input_data: Dict[str, Any]
190
- :return: The input data of the flow with the demonstrations added to it.
191
- :rtype: Dict[str, Any]
192
  """
193
  input_data = input_message.data
194
  reply = self.package_output_message(
 
5
  from typing import Dict,Any,Optional,List
6
  from aiflows.prompt_template import JinjaPrompt
7
  from copy import deepcopy
8
+ from aiflows.messages import FlowMessage
9
  import os
10
  import hydra
11
  log = logging.get_logger(__name__)
 
42
 
43
  *Output Interface*:
44
 
45
+ - Whichever data was passed in the input_message (e.g. typically the input interface expected by its successor flow, such as ChatAtomicFlow)
46
  - `demonstrations` (List[Dict[str, Any]]): A list of demonstrations. Each demonstration is a dictionary with the following keys:
47
  - idx (int): The index of the demonstration
48
  - query (str): The query of the demonstration
 
183
  log.info("Loaded the demonstrations for %d datapoints from %s", len(self.data), self.params["data_dir"])
184
 
185
  def run(self,
186
+ input_message: FlowMessage):
187
+ """ This method runs the flow. It returns the data of the input_message with the demonstrations added to it.
188
 
189
+ :param input_message: The input message of the flow.
190
+ :type input_message: FlowMessage
 
 
191
  """
192
  input_data = input_message.data
193
  reply = self.package_output_message(
DemonstrationsAtomicFlow.yaml CHANGED
@@ -1,6 +1,6 @@
1
  name: "DemonstrationAtomicFlow"
2
  description: "A flow which returns Demonstrations"
3
-
4
  data: ??? #e.g. [{"query_data": {"query": "What is the capital of France?"}, "response_data": {"response": "Paris, my sir."}}]
5
  params:
6
  data_dir: ???
 
1
  name: "DemonstrationAtomicFlow"
2
  description: "A flow which returns Demonstrations"
3
+ _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
4
  data: ??? #e.g. [{"query_data": {"query": "What is the capital of France?"}, "response_data": {"response": "Paris, my sir."}}]
5
  params:
6
  data_dir: ???
demo.yaml CHANGED
@@ -9,7 +9,11 @@ output_interface: # Connector between the Flow's output and the caller
9
 
10
  subflows_config:
11
  demonstration_flow:
12
- _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
 
 
 
 
13
  data:
14
  - query_data:
15
  query: "What is the capital of Turkey?"
@@ -32,7 +36,10 @@ subflows_config:
32
 
33
  chat_flow:
34
  _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
35
- name: "SimpleQA_Flow"
 
 
 
36
  # ~~~ Input interface specification ~~~
37
  input_interface_non_initialized:
38
  - "question"
 
9
 
10
  subflows_config:
11
  demonstration_flow:
12
+ _target_: flow_modules.aiflows.ChatWithDemonstrationsFlowModule.DemonstrationsAtomicFlow.instantiate_from_default_config
13
+ name: "proxy of DemonstrationsAtomicFlow"
14
+ flow_endpoint: DemonstrationsAtomicFlow
15
+ user_id: local
16
+ description: "A flow that answers questions with demonstrations"
17
  data:
18
  - query_data:
19
  query: "What is the capital of Turkey?"
 
36
 
37
  chat_flow:
38
  _target_: flow_modules.aiflows.ChatFlowModule.ChatAtomicFlow.instantiate_from_default_config
39
+ name: "proxy SimpleQA_Flow"
40
+ description: "A flow that answers questions"
41
+ flow_endpoint: ChatAtomicFlow
42
+ user_id: local
43
  # ~~~ Input interface specification ~~~
44
  input_interface_non_initialized:
45
  - "question"
run.py CHANGED
@@ -41,17 +41,6 @@ if __name__ == "__main__":
41
  cfg_path = os.path.join(root_dir, "demo.yaml")
42
  cfg = read_yaml_file(cfg_path)
43
 
44
- #2.1 ~~~ Set the API information ~~~
45
- # OpenAI backend
46
- api_information = [ApiInfo(backend_used="openai",
47
- api_key = os.getenv("OPENAI_API_KEY"))]
48
- # # Azure backend
49
- # api_information = ApiInfo(backend_used = "azure",
50
- # api_base = os.getenv("AZURE_API_BASE"),
51
- # api_key = os.getenv("AZURE_OPENAI_KEY"),
52
- # api_version = os.getenv("AZURE_API_VERSION") )
53
-
54
-
55
  #2.1 ~~~ Set the API information ~~~
56
  # OpenAI backend
57
  api_information = [ApiInfo(backend_used="openai",
@@ -66,26 +55,25 @@ if __name__ == "__main__":
66
 
67
 
68
  #3. ~~~~ Serve The Flow ~~~~
 
69
  serve_utils.recursive_serve_flow(
70
  cl = cl,
71
- flow_type="ChatWithDemonstrationFlowModule",
72
- default_config=cfg,
73
- default_state=None,
74
- default_dispatch_point="coflows_dispatch"
75
  )
 
76
 
77
  #4. ~~~~~Start A Worker Thread~~~~~
78
- run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
79
 
80
  #5. ~~~~~Mount the flow and get its proxy~~~~~~
81
- proxy_flow = serve_utils.recursive_mount(
82
  cl=cl,
83
- client_id="local",
84
- flow_type="ChatWithDemonstrationFlowModule",
85
- config_overrides=None,
86
- initial_state=None,
87
- dispatch_point_override=None,
88
- )
89
 
90
  #6. ~~~ Get the data ~~~
91
  data = {"id": 0, "question": "What is the capital of France?"} # This can be a list of samples
 
41
  cfg_path = os.path.join(root_dir, "demo.yaml")
42
  cfg = read_yaml_file(cfg_path)
43
 
 
 
 
 
 
 
 
 
 
 
 
44
  #2.1 ~~~ Set the API information ~~~
45
  # OpenAI backend
46
  api_information = [ApiInfo(backend_used="openai",
 
55
 
56
 
57
  #3. ~~~~ Serve The Flow ~~~~
58
+
59
  serve_utils.recursive_serve_flow(
60
  cl = cl,
61
+ flow_class_name="flow_modules.aiflows.ChatWithDemonstrationsFlowModule.ChatWithDemonstrationsFlow",
62
+ flow_endpoint="ChatWithDemonstrationsFlow",
 
 
63
  )
64
+
65
 
66
  #4. ~~~~~Start A Worker Thread~~~~~
67
+ run_dispatch_worker_thread(cl)
68
 
69
  #5. ~~~~~Mount the flow and get its proxy~~~~~~
70
+ proxy_flow= serve_utils.get_flow_instance(
71
  cl=cl,
72
+ flow_endpoint="ChatWithDemonstrationsFlow",
73
+ user_id="local",
74
+ config_overrides = cfg
75
+ )
76
+
 
77
 
78
  #6. ~~~ Get the data ~~~
79
  data = {"id": 0, "question": "What is the capital of France?"} # This can be a list of samples