import logging
import os
from typing import Dict, Any

from aiflows.base_flows import CircularFlow
from flow_modules.aiflows.ContentWriterFlowModule import ContentWriterFlow


class PlanWriterFlow(ContentWriterFlow):
    """This flow inherits from ContentWriterFlow.

    In the subflow of the executor, we specify the InteractivePlanGenFlow
    (https://huggingface.co/aiflows/InteractivePlanGenFlowModule)

    *Input Interface*:
    - `goal`

    *Output Interface*:
    - `plan`
    - `result`
    - `summary`
    - `status`

    *Configuration Parameters*:
    - Also refer to superclass ContentWriterFlow (https://huggingface.co/aiflows/ContentWriterFlowModule)
    - `input_interface`: the input interface of the flow
    - `output_interface`: the output interface of the flow
    - `subflows_config`: the configuration of the subflows
    - `early_exit_key`: the key in the flow state that indicates the early exit condition
    - `topology`: the topology of the flow
    """

    def _on_reach_max_round(self):
        """This function is called when the flow reaches the maximum amount of rounds.

        It records a placeholder `plan` message and marks the `status` as
        "unfinished" in the flow state.
        """
        self._state_update_dict({
            "plan": "The maximum amount of rounds was reached before the model generated the plan.",
            "status": "unfinished"
        })

    @staticmethod
    def _remove_file_if_exists(file_location) -> None:
        """Delete the file at `file_location` if it exists; do nothing otherwise.

        :param file_location: the path of the file to delete
        """
        if os.path.exists(file_location):
            os.remove(file_location)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """This function processes the output payload of a subflow, dispatching on its `command`.

        - `finish`: deletes the temporary plan file (if present), writes the plan stored in
          the flow state to the plan memory file, and returns an early-exit payload with
          status "finished".
        - `manual_finish`: deletes the temporary plan file (if present) and returns an
          early-exit payload with status "unfinished".
        - `write_plan`: adds the plan memory file location to the command arguments and
          returns the payload.
        - any other command: returns the payload unchanged.

        :param output_payload: the output payload of the subflow
        :type output_payload: Dict[str, Any]
        :param src_flow: the subflow that generates the output payload
        :type src_flow: Flow
        :return: the processed output payload
        :rtype: Dict[str, Any]
        """
        command = output_payload["command"]

        if command == "finish":
            # Fetch everything up front so a missing state key fails before any file is touched.
            keys_to_fetch_from_state = ["temp_plan_file_location", "plan", "memory_files"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            plan_content = fetched_state["plan"]
            plan_file_location = fetched_state["memory_files"]["plan"]

            # Drop the temporary plan file, then write the final plan to the plan memory file.
            self._remove_file_if_exists(temp_plan_file_location)
            with open(plan_file_location, 'w') as file:
                file.write(plan_content)

            return {
                "EARLY_EXIT": True,
                "plan": plan_content,
                "summary": "ExtendLibrary/PlanWriter: " + output_payload["command_args"]["summary"],
                "status": "finished"
            }

        if command == "manual_finish":
            keys_to_fetch_from_state = ["temp_plan_file_location"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            self._remove_file_if_exists(fetched_state["temp_plan_file_location"])

            return {
                "EARLY_EXIT": True,
                "plan": "no plan was generated",
                "summary": "ExtendLibrary/PlanWriter: PlanWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished"
            }

        if command == "write_plan":
            keys_to_fetch_from_state = ["memory_files"]
            fetched_state = self._fetch_state_attributes_by_keys(keys=keys_to_fetch_from_state)
            plan_file_location = fetched_state["memory_files"]["plan"]
            # Mutates the incoming payload in place, as the caller receives the same dict back.
            output_payload["command_args"]["plan_file_location"] = plan_file_location
            return output_payload

        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """This function runs the flow.

        The input data is stored in the flow state, the subflows are run sequentially for
        up to `max_rounds` rounds (default 1 when the key is absent from the config), the
        output is read from the flow state, and the flow is then fully reset.

        :param input_data: the input data of the flow
        :type input_data: Dict[str, Any]
        :return: the output data of the flow
        :rtype: Dict[str, Any]
        """
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            # An explicit `max_rounds: None` is passed through to `_sequential_run` as-is.
            logging.getLogger(__name__).info(
                "Running %s without `max_rounds` until the early exit condition is met.",
                self.flow_config["name"],
            )

        self._sequential_run(max_rounds=max_rounds)

        # Read the output before resetting, since the reset below is a full reset.
        output = self._get_output_from_state()

        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output